Victor Churikov
07/31/2023, 9:07 AMdef sometask(inputs: dict):
for item in inputs:
print(f"Found {item.__class__.__name__}: {item}")
decorated_task = task(sometask)
@workflow
def generic_wf(
inputs: dict
):
decorated_task(inputs=inputs)
This works:
@task
def sometask(inputs: dict):
for item in inputs:
print(f"Found {item.__class__.__name__}: {item}")
@workflow
def generic_wf(
inputs: dict
):
sometask(inputs=inputs)
The failing code fails with:
tar: Removing leading `/' from member names
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /root/anaconda3/envs/trueself/bin/pyflyte-execute:8 in <module> │
│ │
│ ❱ 8 │ sys.exit(execute_task_cmd()) │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/click/core.py:113 │
│ 0 in __call__ │
│ │
│ ❱ 1130 │ │ return self.main(*args, **kwargs) │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/click/core.py:105 │
│ 5 in main │
│ │
│ ❱ 1055 │ │ │ │ │ rv = self.invoke(ctx) │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/click/core.py:140 │
│ 4 in invoke │
│ │
│ ❱ 1404 │ │ │ return ctx.invoke(self.callback, **ctx.params) │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/click/core.py:760 │
│ in invoke │
│ │
│ ❱ 760 │ │ │ │ return __callback(*args, **kwargs) │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/bin/entr │
│ ypoint.py:480 in execute_task_cmd │
│ │
│ ❱ 480 │ _execute_task( │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/exceptio │
│ ns/scopes.py:160 in system_entry_point │
│ │
│ ❱ 160 │ │ │ │ return wrapped(*args, **kwargs) │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/bin/entr │
│ ypoint.py:362 in _execute_task │
│ │
│ ❱ 362 │ │ _handle_annotated_task(ctx, _task_def, inputs, output_prefix) │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/bin/entr │
│ ypoint.py:304 in _handle_annotated_task │
│ │
│ ❱ 304 │ _dispatch_execute(ctx, task_def, inputs, output_prefix) │
│ │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/bin/entr │
│ ypoint.py:80 in _dispatch_execute │
│ │
│ ❱ 80 │ logger.debug(f"Starting _dispatch_execute for {task_def.name}") │
╰──────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'function' object has no attribute 'name'
def test_py_func_task_get_container():
def foo(i: int):
pass
default_img = Image(name="default", fqn="<http://xyz.com/abc|xyz.com/abc>", tag="tag1")
other_img = Image(name="other", fqn="<http://xyz.com/other|xyz.com/other>", tag="tag-other")
cfg = ImageConfig(default_image=default_img, images=[default_img, other_img])
settings = SerializationSettings(project="p", domain="d", version="v", image_config=cfg, env={"FOO": "bar"})
pytask = PythonFunctionTask(None, foo, None, environment={"BAZ": "baz"})
c = pytask.get_container(settings)
assert c.image == "<http://xyz.com/abc:tag1|xyz.com/abc:tag1>"
assert c.env == {"FOO": "bar", "BAZ": "baz"}
It seems like I can use PythonFunctionTask to achieve this. In the test, they use this class to run an undecorated python function (or so it seems). From my initial effort it seems that it is not recognizing the task_config, but I need to double check that I am doing it correctly and rely on this examplePryce
07/31/2023, 2:51 PMYee
Victor Churikov
08/01/2023, 8:25 AM@workflow
or a dynamic rather than a @task
because that would allow me to add additional nodes to each execution for sending notifications, preparing inputs, etc. But the main task should be something that is dynamically specified by passing a python function to some Flyte wrapper, registering it, and executing it. I currently use FlyteRemote with register_script() and execute() if that’s relevantfrom typing import Any, Dict
from flytekit import task, workflow
import functools
# My own custom helper to make it easier to configure tasks
from flyte_helpers.k8s import task_config
decorator_kwargs = task_config(
cache=True,
node_pool='t4-memory-pool',
image='us-central1-docker.pkg.dev/mygcpproject/garus1/mycondaimage:v1',
cpu='4', memory='16Gi', gpu='1')
def convert_to_flyte_task(local_function, decorator_kwargs):
@task(**decorator_kwargs)
@functools.wraps(local_function)
def flyte_task_function(inputs: Dict[str, Any]) -> Any:
return local_function(inputs)
return flyte_task_function
def generic_task(inputs: dict) -> None:
print("Received inputs:", inputs)
generic_task = convert_to_flyte_task(generic_task, decorator_kwargs)
@workflow
def my_workflow(inputs: dict) -> None:
generic_task(inputs=inputs)
Now this doesn’t fail during runtime because the “task” and the function behind it have the exact same name. I’m looking into having the task dynamically named after the function so I can support any function name with this, and I’ll also test it with actual functions that do work and with real inputs/outputs to see if it still works, and see if I can get it to support files as well