# announcements
m
Hey, everyone. I updated flytekit and pyflyte to test v1.0.0, and the new serialization process broke my code. The problem is that I have a decorator called `@task_setup` that, as the name suggests, executes some setup/teardown code before and after each task. This decorator is defined inside a submodule, `tasks._utils.flyte`. My tasks live in the `tasks` package, separated into different modules. When I packaged the tasks before, their names were `tasks.<module>.<task_name>`; now their names are `tasks._utils.flyte.<task_name>`. Basically, the path of the decorator is being used as the "prefix" for the task. This is not ideal, and the old behavior makes way more sense... What should I do?
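For context, the layout implied by the packaging output further down is roughly this (file names are inferred from the task names, not confirmed):
```
flyte_example/
├── tasks/
│   ├── _utils/
│   │   └── flyte.py        # defines @task_setup
│   ├── evaluation.py
│   ├── ingestion.py
│   ├── serving.py
│   ├── training.py
│   └── transformation.py
└── workflows/
```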
y
could you give me an example?
of a task, like just the signature
and could you paste the `task_setup` function? (you can remove all the business logic, just a print is fine)
my cursory attempt at repro’ing this isn’t working
m
Ok, so, here's an example task:
```python
@task(cache=True, cache_version='1.0')
@task_setup
def generate_regression_dataset(
    n_samples: int, n_features: int, n_informative: int, bias: float,
    random_state: int, order: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    ...
```
It is inside `tasks/ingestion.py`, but when I package the code, you can see that its resolved path is actually `tasks._utils.flyte.generate_regression_dataset`:
```
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 11 flyte objects
  Packaging tasks.evaluation.score_prediction -> 00_tasks.evaluation.score_prediction_1.pb
  Packaging tasks._utils.flyte.generate_regression_dataset -> 01_tasks._utils.flyte.generate_regression_dataset_1.pb
  ...
```
Here's the `@task_setup` function. It sets up integrations such as Google Cloud and MLflow, and converts column names to strings so that no errors occur when Flyte saves a DataFrame as a Parquet file.
```python
import functools
from typing import Callable, List, Literal, Optional


def task_setup(
    function: Optional[Callable] = None, *,
    integration_requests: Optional[List[Literal['gcloud', 'mlflow']]] = None
) -> Callable:
    # Called with arguments, e.g. @task_setup(integration_requests=[...]):
    # return a decorator that captures them and waits for the function.
    if function is None:
        return functools.partial(
            task_setup, integration_requests=integration_requests)

    integration_requests = integration_requests or []

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # Preprocessing: configure the requested integrations
        __configure_integrations(integration_requests)

        # Execute the wrapped task function
        output = function(*args, **kwargs)

        # Postprocessing: convert the output's column names to strings
        output = __convert_columns_names(output)

        return output

    return wrapper
```
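The conditional `functools.partial` return is what lets the decorator work both with and without arguments; a quick usage sketch (the task names and bodies here are placeholders, not from the project):
```python
# Bare form: task_setup receives the function directly and returns wrapper.
@task_setup
def plain_task(x: int) -> int:
    return x

# Parameterized form: task_setup is first called with function=None and
# returns a partial; the partial then receives the function.
@task_setup(integration_requests=['mlflow'])
def mlflow_task(x: int) -> int:
    return x
```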
huh, check this out. I created a new task setup function, called `new_task_setup`. It is exactly the same, but with the `@task` decorator inside it:
```python
def new_task_setup(
    function: Optional[Callable] = None, *,
    integration_requests: Optional[List[Literal['gcloud', 'mlflow']]] = None
) -> Callable:
    integration_requests = integration_requests or []

    @task
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        ...
```
I avoided doing that before because the documentation explicitly says that the `@task` decorator must be the last one, but it did work, and with the desired path result:
```python
@new_task_setup
def test_task(x: int) -> int:
    return x
```
packaging with `new_task_setup`:
```
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
  ...
  Packaging tasks.transformation.test_task -> 05_tasks.transformation.test_task_1.pb
  ...
```
so, problem solved, I guess? I will use only the `@task_setup` decorator (maybe renaming it to something else, like `@expanded_task`)
y
hmm
yeah i guess that works for now, but this is something we should fix. i will continue working on this
are you seeing just the utils name?
or are you seeing two tasks, one with the name you expect and one with the utils name?
m
only the utils
never mind, I'm trying to pass kwargs to Flyte's `@task` decorator and it still doesn't work 😞
wait... something's definitely wrong. check this out.
`tasks/serving.py`:
```python
from tasks._utils.flyte import extended_task

@extended_task
def pack_model_uri(model_uri: str) -> dict:
    """Pack a model URI into a model data specification."""
    return {'model_uri': model_uri}
```
`tasks/transformation.py`:
```python
from tasks._utils.flyte import extended_task

@extended_task
def test_task(x: int) -> int:
    return x
```
but when i register the tasks...
```
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
  ...
  Packaging tasks._utils.flyte.pack_model_uri -> 02_tasks._utils.flyte.pack_model_uri_1.pb
  ...
  Packaging tasks.transformation.test_task -> 05_tasks.transformation.test_task_1.pb
  ...
```
same structure, but one of them is registered with the `tasks._utils.flyte` prefix and the other with `tasks.transformation`. what is happening?! does pyflyte have some sort of cache?
this is my `extended_task` right now:
```python
def extended_task(
    function: Optional[Callable] = None, *,
    integrations: Optional[List[Literal['gcloud', 'mlflow']]] = None,
    **task_kwargs
) -> Callable:
    # Called with arguments: capture them and wait for the function.
    if function is None:
        return functools.partial(
            extended_task, integrations=integrations, **task_kwargs)

    integrations = integrations or []
    # ... some irrelevant 'integrations' processing ...

    @task(**task_kwargs)
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # ... some processing ...
        output = function(*args, **kwargs)
        return output

    return wrapper
```
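Since `**task_kwargs` is forwarded to the inner `@task(...)` call, the parameterized form can carry Flyte's own task options too; a usage sketch (the task name and option values are just examples):
```python
# Flyte task options such as cache settings pass straight through
# to the inner @task(**task_kwargs) call.
@extended_task(cache=True, cache_version='1.0', integrations=['mlflow'])
def cached_task(x: int) -> int:
    return x
```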
y
😞
it does have a cache… it caches the compilation step, keyed by Python object.
we’ve seen issues with it in the past - sometimes the same bit of code gets loaded twice with different import paths.
but I don’t think that’s what’s happening here.
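roughly what that failure mode looks like (a sketch, assuming the layout above with `__init__.py` files and run from the project root; whether it triggers depends on `sys.path`):
```python
import importlib
import sys

# The same source file imported under two dotted paths yields two
# distinct module objects, so anything defined in it exists twice.
m1 = importlib.import_module('tasks._utils.flyte')

sys.path.insert(0, 'tasks')  # makes the subpackage importable top-level
m2 = importlib.import_module('_utils.flyte')

assert m1 is not m2  # same file on disk, two different import paths
```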
m
yeah, I don't think it's the cache either... I just used the clear command and manually deleted the `__pycache__` directories, and it still doesn't work
y
i am able to repro… but i can’t repro in PyCharm
only from the command line. still looking
m
ok, let me know if you need anything
y
can you try something for me real quick?
in `tasks.transformation.test_task`, can you rename the folder “transformation” to just “t”?
only if this is easy to do
m
yeah one second
sorry for taking so long
```
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
  ...
  Packaging tasks._utils.flyte.test_task -> 04_tasks._utils.flyte.test_task_1.pb
```
huh, it did affect it
y
can you try a fix for me tomorrow?
that comparison there is a hack, but it’s a hack that should work
it’s failing because the `getfile` call is returning something else.
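a minimal illustration of the mismatch being described (my own sketch, not flytekit code; the module names assume the layout above):
```python
import functools
import inspect

# imagine this decorator lives in tasks/_utils/flyte.py
def passthrough(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapper

# and the decorated function lives in tasks/ingestion.py:
#
# functools.wraps copies __module__ from the decorated function, so
#   wrapper.__module__       == 'tasks.ingestion'
# but the wrapper's code object was compiled in the decorator's file,
# which is what inspect.getfile reports:
#   inspect.getfile(wrapper) == '.../tasks/_utils/flyte.py'
```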
m
yeah, what can I do?
y
and try it again
i’m still working on the unit tests but looking at the mod should be enough
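the direction, as i understand it: resolve the task's module from `__module__` (which `functools.wraps` preserves) rather than from the file path. a guess at the shape of the fix, not actual flytekit code:
```python
import inspect

def resolve_task_module(fn) -> str:
    # Hypothetical helper: prefer the module name, which survives
    # wrapping, over inspect.getfile, which points at whatever file
    # the wrapper function was defined in.
    module = inspect.getmodule(fn)
    return module.__name__ if module is not None else fn.__module__
```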
m
```
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 11 flyte objects
  Packaging tasks.evaluation.score_prediction -> 00_tasks.evaluation.score_prediction_1.pb
  Packaging tasks.ingestion.generate_regression_dataset -> 01_tasks.ingestion.generate_regression_dataset_1.pb
  Packaging tasks.serving.pack_model_uri -> 02_tasks.serving.pack_model_uri_1.pb
  Packaging tasks.serving.predict_from_mlflow_model -> 03_tasks.serving.predict_from_mlflow_model_1.pb
  Packaging tasks.training.fit_polynomial_regression -> 04_tasks.training.fit_polynomial_regression_1.pb
```
It worked!! 😄
k
ohh so the module name worked?
this is interesting
what could have changed?