Matheus Moreno
05/04/2022, 9:06 PM
I have a decorator, @task_setup, that, as the name suggests, executes some startup/teardown code before and after each task. This decorator is defined inside a submodule, tasks._utils.flyte. My tasks are inside the tasks module, separated into different modules. When I packaged the tasks before, their names were `tasks.<module>.<task_name>`; now, their names are `tasks._utils.flyte.<task_name>`. Basically, the path of the decorator is being used as the "prefix" for the task. This is not ideal, and the old behavior makes much more sense... What should I do?
Yee
Matheus Moreno
05/05/2022, 5:22 PM
from typing import Tuple

import pandas as pd
from flytekit import task

from tasks._utils.flyte import task_setup

@task(cache=True, cache_version='1.0')
@task_setup
def generate_regression_dataset(
    n_samples: int, n_features: int, n_informative: int, bias: float,
    random_state: int, order: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    ...
It is inside tasks/ingestion.py, but when I package the code, you can see that its resolved path is actually `tasks._utils.flyte.generate_regression_dataset`:
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 11 flyte objects
Packaging tasks.evaluation.score_prediction -> 00_tasks.evaluation.score_prediction_1.pb
Packaging tasks._utils.flyte.generate_regression_dataset -> 01_tasks._utils.flyte.generate_regression_dataset_1.pb
...
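A possible factor, not confirmed anywhere in this thread: `functools.wraps` copies metadata such as `__module__`, `__name__`, `__qualname__` and `__doc__` onto the wrapper and sets `__wrapped__`, but the wrapper keeps its own code object, whose `co_name` and `co_filename` still belong to the decorator defined in `tasks/_utils/flyte.py`. A tool that locates a function through its code object or source file, rather than through `__module__`, could therefore report the decorator's location. Whether flytekit's resolver does this is an assumption. The sketch below uses a hypothetical minimal `task_setup` with no flytekit involved:

```python
import functools
import inspect


def task_setup(function):
    """Hypothetical minimal decorator standing in for the real @task_setup."""
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        return function(*args, **kwargs)
    return wrapper


@task_setup
def generate_regression_dataset(n_samples: int) -> int:
    return n_samples


# Metadata copied by functools.wraps points at the original function...
print(generate_regression_dataset.__name__)      # generate_regression_dataset
print(generate_regression_dataset.__qualname__)  # generate_regression_dataset
# ...but the code object is still the one defined inside the decorator.
print(generate_regression_dataset.__code__.co_name)  # wrapper
# inspect.unwrap follows __wrapped__ back to the undecorated function.
original = inspect.unwrap(generate_regression_dataset)
print(original.__code__.co_name)  # generate_regression_dataset
```

In a real project, `generate_regression_dataset.__code__.co_filename` would likewise point at the decorator's file, while `inspect.unwrap(...)` recovers the function defined in the task module.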
The @task_setup function sets up integrations such as Google Cloud and MLflow, and converts column names to strings so that no errors occur when Flyte saves a DataFrame as a Parquet file.
import functools
from typing import Callable, List, Literal, Optional


def task_setup(
    function: Optional[Callable] = None, *,
    integration_requests: Optional[List[Literal['gcloud', 'mlflow']]] = None
) -> Callable:
    integration_requests = integration_requests or []

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # Preprocessing of task
        # (__configure_integrations and __convert_columns_names are not shown)
        __configure_integrations(integration_requests)
        # Execute function
        output = function(*args, **kwargs)
        # Postprocessing of output
        output = __convert_columns_names(output)
        return output

    # Support both @task_setup and @task_setup(integration_requests=[...])
    return (
        functools.partial(
            task_setup, integration_requests=integration_requests)
        if function is None else wrapper
    )
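The column-name conversion described above can be sketched as follows. `convert_column_names` is a hypothetical stand-in for the author's `__convert_columns_names`, whose body is not shown; it assumes a task output is either a DataFrame-like object, a tuple of them (as `generate_regression_dataset` returns), or something to pass through unchanged. It relies on pandas' `DataFrame.rename(columns=...)` accepting a callable that is applied to each label.

```python
from typing import Any


def convert_column_names(output: Any) -> Any:
    """Hypothetical helper: stringify the column labels of DataFrame-like outputs."""
    # Recurse into tuple outputs, e.g. Tuple[pd.DataFrame, pd.DataFrame].
    if isinstance(output, tuple):
        return tuple(convert_column_names(item) for item in output)
    # Only touch objects that look like DataFrames (have columns and rename).
    if hasattr(output, "columns") and hasattr(output, "rename"):
        # rename applies the callable `str` to every column label.
        return output.rename(columns=str)
    # Scalars, dicts, etc. are returned unchanged.
    return output
```

For example, with pandas, `convert_column_names(pd.DataFrame([[1, 2]]))` returns a frame whose columns are the strings `'0'` and `'1'` instead of the integers 0 and 1.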
new_task_setup is exactly the same, but with the @task decorator inside it:
from flytekit import task


def new_task_setup(
    function: Optional[Callable] = None, *,
    integration_requests: Optional[List[Literal['gcloud', 'mlflow']]] = None
) -> Callable:
    integration_requests = integration_requests or []

    # @task is now applied to the wrapper inside the decorator
    @task
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        ...
I avoided doing that before because the documentation explicitly says that the task decorator must be the last one, but it did work, and with the desired path result:
@new_task_setup
def test_task(x: int) -> int:
    return x
packaging with new_task_setup:
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
...
Packaging tasks.transformation.test_task -> 05_tasks.transformation.test_task_1.pb
...
@task_setup decorator (maybe renaming it to something else, like @expanded_task)
Yee
Matheus Moreno
05/05/2022, 8:15 PM
from tasks._utils.flyte import extended_task

@extended_task
def pack_model_uri(model_uri: str) -> dict:
    """Pack a model URI into a model data specification."""
    return {'model_uri': model_uri}
tasks/transformation.py:
from tasks._utils.flyte import extended_task

@extended_task
def test_task(x: int) -> int:
    return x
But when I register the tasks...
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
...
Packaging tasks._utils.flyte.pack_model_uri -> 02_tasks._utils.flyte.pack_model_uri_1.pb
...
Packaging tasks.transformation.test_task -> 05_tasks.transformation.test_task_1.pb
...
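import functools
from typing import Callable, List, Literal, Optional

from flytekit import task


def extended_task(
    function: Optional[Callable] = None, *,
    integrations: Optional[List[Literal['gcloud', 'mlflow']]] = None,
    **task_kwargs
) -> Callable:
    # Called with arguments, e.g. @extended_task(cache=True): return a decorator
    if function is None:
        return functools.partial(
            extended_task, integrations=integrations, **task_kwargs)
    integrations = integrations or []
    # ... some irrelevant 'integrations' processing ...

    # Apply @task to the wrapper, forwarding task options such as cache
    @task(**task_kwargs)
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # ... some processing ...
        return output

    return wrapper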
Yee
Matheus Moreno
05/05/2022, 9:59 PM
Yee
Matheus Moreno
05/05/2022, 10:11 PM
Yee
tasks.transformation.test_task
Matheus Moreno
05/05/2022, 11:50 PM
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 12 flyte objects
...
Packaging tasks._utils.flyte.test_task -> 04_tasks._utils.flyte.test_task_1.pb
Yee
Matheus Moreno
05/06/2022, 12:24 AM
Yee
Matheus Moreno
05/06/2022, 12:38 AM
(base) matheus@HUNB866:~/Codes/dsc-notebooks/flyte_example$ pyflyte --pkgs tasks --pkgs workflows package --image flyte-example:1.0 --in-container-source-path /home/flyte --force
Loading packages ('tasks', 'workflows') under source root /home/matheus/Codes/dsc-notebooks/flyte_example
Successfully serialized 11 flyte objects
Packaging tasks.evaluation.score_prediction -> 00_tasks.evaluation.score_prediction_1.pb
Packaging tasks.ingestion.generate_regression_dataset -> 01_tasks.ingestion.generate_regression_dataset_1.pb
Packaging tasks.serving.pack_model_uri -> 02_tasks.serving.pack_model_uri_1.pb
Packaging tasks.serving.predict_from_mlflow_model -> 03_tasks.serving.predict_from_mlflow_model_1.pb
Packaging tasks.training.fit_polynomial_regression -> 04_tasks.training.fit_polynomial_regression_1.pb
It worked!! 😄
Ketan (kumare3)