# announcements
Hey, everyone. I updated flytekit and pyflyte to test v1.0.0, and the new serialization process broke my code. The problem is that I have a decorator called
that, as the name suggests, executes some startup/teardown code before and after each task. This decorator is defined inside a submodule
. My tasks are inside the
module, separated by different modules. When I packaged the tasks before, their names were `tasks.<module>.<task_name>`; now, their names are
. Basically, the path for the decorator is being used as the "prefix" for the task. This is not ideal and the old method makes way more sense... What should I do?
could you give me an example?
of a task, like just the signature
and could you paste the task_setup function? (can remove all the business logic just a print is fine)
my cursory attempt at repro’ing this isn’t working
Ok, so, here's an example task:
@task(cache=True, cache_version='1.0')
def generate_regression_dataset(
    n_samples: int, n_features: int, n_informative: int, bias: float,
    random_state: int, order: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
It is inside
, but when I package the code, you can see that its resolved path is actually `tasks._utils.flyte.generate_regression_dataset`:
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 11 flyte objects
  Packaging tasks.evaluation.score_prediction -> 00_tasks.evaluation.score_prediction_1.pb
  Packaging tasks._utils.flyte.generate_regression_dataset -> 01_tasks._utils.flyte.generate_regression_dataset_1.pb
Here's the
function. It sets up integrations such as Google Cloud and MLflow, and converts column names to strings so that no errors occur when Flyte saves a DataFrame as a Parquet file.
import functools
from typing import List, Literal


def task_setup(
    function: callable = None, *,
    integration_requests: List[Literal['gcloud', 'mlflow']] = None
) -> None:
    integration_requests = integration_requests or []

    def wrapper(*args, **kwargs):
        # Preprocessing of task

        # Execute function
        output = function(*args, **kwargs)

        # Postprocessing of output
        # (__convert_columns_names is a helper defined elsewhere in the same module)
        output = __convert_columns_names(output)

        return output

    # Called with keyword arguments only: return a decorator that remembers them.
    return (
        functools.partial(
            task_setup, integration_requests=integration_requests)
        if function is None else wrapper
    )
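A minimal sketch of why the decorator's module can end up in a dotted name. It is plain Python, not flytekit's actual name resolution (which the thread does not show), and the two modules are in-memory stand-ins whose names are borrowed from this thread. `plain_setup`, `wrapping_setup` and `dotted_name` are hypothetical helpers for illustration only.

```python
import types

DECORATOR_SOURCE = '''
import functools

def plain_setup(function):
    # No functools.wraps: the wrapper keeps its own __module__ and __name__.
    def wrapper(*args, **kwargs):
        return function(*args, **kwargs)
    return wrapper

def wrapping_setup(function):
    # functools.wraps copies __module__, __name__, __doc__, ... from function.
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        return function(*args, **kwargs)
    return wrapper
'''

TASK_SOURCE = '''
@plain_setup
def plain_task(x):
    return x

@wrapping_setup
def wrapped_task(x):
    return x
'''

# Stand-in for the module that defines the decorators.
utils = types.ModuleType("tasks._utils.flyte")
exec(DECORATOR_SOURCE, utils.__dict__)

# Stand-in for a module that defines tasks and uses the decorators.
ingestion = types.ModuleType("tasks.ingestion")
ingestion.plain_setup = utils.plain_setup
ingestion.wrapping_setup = utils.wrapping_setup
exec(TASK_SOURCE, ingestion.__dict__)


def dotted_name(function):
    """Build a module-qualified name from the function's own metadata."""
    return f"{function.__module__}.{function.__name__}"


plain_name = dotted_name(ingestion.plain_task)
wrapped_name = dotted_name(ingestion.wrapped_task)
print(plain_name)    # tasks._utils.flyte.wrapper
print(wrapped_name)  # tasks.ingestion.wrapped_task
```

Without `functools.wraps`, the object bound to `plain_task` is really the inner `wrapper`, so any tool that reads its metadata sees the decorator's module.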
huh, check this out. I created a new task setup function, called
. It is exactly the same, but with the
decorator inside it:
def new_task_setup(
    function: callable = None, *,
    integration_requests: List[Literal['gcloud', 'mlflow']] = None
) -> None:
    integration_requests = integration_requests or []

    def wrapper(*args, **kwargs):
I avoided doing that before because the documentation explicitly says that the task decorator must be the last one, but it did work, and with the desired path result:
def test_task(x: int) -> int:
    return x
packaging with new_task_setup:
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
  Packaging tasks.transformation.test_task -> 05_tasks.transformation.test_task_1.pb
so, problem solved, I guess? I will use only the
decorator (maybe renaming it to something else, like
yeah i guess that works for now, but this is something we should fix. i will continue working on this
are you seeing just the utils name?
or are you seeing two tasks, one with the name you expect and one with the utils name?
only the utils
nevermind, I'm trying to pass kwargs to flyte's task decorator and it still doesn't work 😞
wait... something's definitely wrong. check this out.
from tasks._utils.flyte import extended_task

def pack_model_uri(model_uri: str) -> dict:
    """Pack a model URI into a model data specification."""
    return {'model_uri': model_uri}
from tasks._utils.flyte import extended_task

def test_task(x: int) -> int:
    return x
but when i register the tasks...
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
  Packaging tasks._utils.flyte.pack_model_uri -> 02_tasks._utils.flyte.pack_model_uri_1.pb
  Packaging tasks.transformation.test_task -> 05_tasks.transformation.test_task_1.pb
same structure, but one of them is registered with tasks._utils.flyte, and the other one with the tasks.transformation prefix. what is happening?! does pyflyte have some sort of cache?
this is my extended_task right now:
def extended_task(
    function: callable = None, *,
    integrations: List[Literal['gcloud', 'mlflow']] = None,
    **task_kwargs,  # extra keyword arguments, forwarded by the partial() call below
) -> None:
    if function is None:
        return functools.partial(
            extended_task, integrations=integrations, **task_kwargs)

    integrations = integrations or []
    # ... some irrelevant 'integrations' processing ...

    def wrapper(*args, **kwargs):
        # ... some processing ...
        return output

    return wrapper
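For reference, a minimal sketch of the same "optional keyword arguments" decorator pattern with `functools.wraps` applied to the wrapper. It is deliberately free of flytekit so it runs on its own. `extended_setup`, `log`, `bare` and `configured` are hypothetical names, and the setup/teardown steps are placeholders rather than the real integration code.

```python
import functools
from typing import Callable, List, Optional

# Records what the wrapper did, so the behaviour is easy to inspect.
log: List[tuple] = []


def extended_setup(
    function: Optional[Callable] = None, *,
    integrations: Optional[List[str]] = None,
):
    # Used as @extended_setup(integrations=[...]): no function yet, so return
    # a decorator that remembers the keyword arguments.
    if function is None:
        return functools.partial(extended_setup, integrations=integrations)

    active = tuple(integrations or [])

    @functools.wraps(function)  # keep the task's own __module__ and __name__
    def wrapper(*args, **kwargs):
        log.append(("setup", function.__name__, active))
        output = function(*args, **kwargs)
        log.append(("teardown", function.__name__, active))
        return output

    return wrapper


@extended_setup
def bare(x: int) -> int:
    return x


@extended_setup(integrations=["mlflow"])
def configured(x: int) -> int:
    return x + 1


print(bare(1), configured(1))
print(bare.__name__, configured.__name__)
```

Both spellings produce a wrapper that still reports the decorated function's name and module.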
it does have a cache… it caches the compilation step by python object.
we’ve seen issues with it in the past - sometimes the same bit of code gets loaded twice with different import paths.
but I don’t think that’s what’s happening here.
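A small sketch of the "loaded twice with different import paths" situation mentioned above, in plain Python. It only illustrates why a cache keyed by Python object can miss in that case; it does not use flytekit's cache. The file name, the module names and the `load_as` helper are hypothetical.

```python
import importlib.util
import pathlib
import tempfile


def load_as(name, path):
    """Execute the file at `path` as a fresh module called `name`."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "transformation.py"
    path.write_text("def test_task(x):\n    return x\n")

    # Same source file, two different import paths.
    first = load_as("tasks.transformation", path)
    second = load_as("transformation", path)

    same_object = first.test_task is second.test_task
    module_names = (first.test_task.__module__, second.test_task.__module__)

    # A cache keyed by the function object only knows the first copy.
    cache = {first.test_task: "compiled"}
    second_is_cached = second.test_task in cache

print(same_object)       # False
print(module_names)      # ('tasks.transformation', 'transformation')
print(second_is_cached)  # False
```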
yeah, I don't think it's the cache either... I just used the clear command and manually deleted the pycache directories and it still doesn't work
i am able to repro… but i can’t repro in pycharm
only from the command line. still looking
ok, let me know if you need anything
can you try something for me real quick?
the folder “transformation” can you rename it to just “t”
only if this is easy to do
yeah one second
sorry for taking so long
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
  Packaging tasks._utils.flyte.test_task -> 04_tasks._utils.flyte.test_task_1.pb
huh, it did affect it
can you try a fix for me tomorrow?
that comparison there is a hack, but it’s a hack that should work
it’s failing because the getfile call is returning something else.
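A minimal sketch of how `inspect.getfile` behaves on a decorated function, assuming that is the call being referred to. It is plain Python and says nothing about what flytekit does with the result. The file paths are fake labels passed to `compile`, and the simplified `task_setup` here is a stand-in for the real one.

```python
import inspect

DECORATOR_SOURCE = '''
import functools

def task_setup(function):
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        return function(*args, **kwargs)
    return wrapper
'''

TASK_SOURCE = '''
@task_setup
def generate_regression_dataset(n_samples):
    return n_samples
'''

# Compile each snippet under a different (fake) file name.
utils_ns = {"__name__": "tasks._utils.flyte"}
exec(compile(DECORATOR_SOURCE, "tasks/_utils/flyte.py", "exec"), utils_ns)

task_ns = {"__name__": "tasks.ingestion", "task_setup": utils_ns["task_setup"]}
exec(compile(TASK_SOURCE, "tasks/ingestion.py", "exec"), task_ns)

fn = task_ns["generate_regression_dataset"]

# functools.wraps fixes the metadata...
reported_module = fn.__module__
# ...but getfile reads the code object, which still belongs to the wrapper.
file_of_wrapper = inspect.getfile(fn)
# inspect.unwrap follows __wrapped__ back to the original function.
file_of_original = inspect.getfile(inspect.unwrap(fn))

print(reported_module)   # tasks.ingestion
print(file_of_wrapper)   # tasks/_utils/flyte.py
print(file_of_original)  # tasks/ingestion.py
```

So even a decorator that uses `functools.wraps` can make a file-based lookup point at the decorator's module unless the function is unwrapped first.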
yeah, what can I do?
and try it again
i’m still working on the unit tests but looking at the mod should be enough
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 11 flyte objects
  Packaging tasks.evaluation.score_prediction -> 00_tasks.evaluation.score_prediction_1.pb
  Packaging tasks.ingestion.generate_regression_dataset -> 01_tasks.ingestion.generate_regression_dataset_1.pb
  Packaging tasks.serving.pack_model_uri -> 02_tasks.serving.pack_model_uri_1.pb
  Packaging tasks.serving.predict_from_mlflow_model -> 03_tasks.serving.predict_from_mlflow_model_1.pb
  Packaging tasks.training.fit_polynomial_regression -> 04_tasks.training.fit_polynomial_regression_1.pb
It worked!! 😄
ohh so the module name worked?
this is interesting
what could have changed