Hey All, I have a case where it would be useful t...
# ask-the-community
v
Hey all, I have a case where it would be useful to dynamically pass different functions to a Flyte workflow, but I'm running into issues when trying to run it. Minimal code examples inside, thanks
This fails at runtime but registers successfully:
def sometask(inputs: dict):
    for item in inputs:
        print(f"Found {item.__class__.__name__}: {item}")

decorated_task = task(sometask)

@workflow
def generic_wf(
    inputs: dict
):
    decorated_task(inputs=inputs)
This works:
@task
def sometask(inputs: dict):
    for item in inputs:
        print(f"Found {item.__class__.__name__}: {item}")


@workflow
def generic_wf(
    inputs: dict
):
    sometask(inputs=inputs)
The failing code fails with:
tar: Removing leading `/' from member names

╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /root/anaconda3/envs/trueself/bin/pyflyte-execute:8 in <module>              │
│                                                                              │
│ ❱ 8 │   sys.exit(execute_task_cmd())                                         │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/click/core.py:113 │
│ 0 in __call__                                                                │
│                                                                              │
│ ❱ 1130 │   │   return self.main(*args, **kwargs)                             │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/click/core.py:105 │
│ 5 in main                                                                    │
│                                                                              │
│ ❱ 1055 │   │   │   │   │   rv = self.invoke(ctx)                             │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/click/core.py:140 │
│ 4 in invoke                                                                  │
│                                                                              │
│ ❱ 1404 │   │   │   return ctx.invoke(self.callback, **ctx.params)            │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/click/core.py:760 │
│ in invoke                                                                    │
│                                                                              │
│ ❱  760 │   │   │   │   return __callback(*args, **kwargs)                    │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/bin/entr │
│ ypoint.py:480 in execute_task_cmd                                            │
│                                                                              │
│ ❱ 480 │   _execute_task(                                                     │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/exceptio │
│ ns/scopes.py:160 in system_entry_point                                       │
│                                                                              │
│ ❱ 160 │   │   │   │   return wrapped(*args, **kwargs)                        │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/bin/entr │
│ ypoint.py:362 in _execute_task                                               │
│                                                                              │
│ ❱ 362 │   │   _handle_annotated_task(ctx, _task_def, inputs, output_prefix)  │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/bin/entr │
│ ypoint.py:304 in _handle_annotated_task                                      │
│                                                                              │
│ ❱ 304 │   _dispatch_execute(ctx, task_def, inputs, output_prefix)            │
│                                                                              │
│ /root/anaconda3/envs/trueself/lib/python3.10/site-packages/flytekit/bin/entr │
│ ypoint.py:80 in _dispatch_execute                                            │
│                                                                              │
│ ❱  80 │   logger.debug(f"Starting _dispatch_execute for {task_def.name}")    │
╰──────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'function' object has no attribute 'name'
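One possible reading of this error, sketched without flytekit: assume the runtime finds the task by importing the module and fetching the attribute named after the underlying function. That lookup rule is an assumption made for illustration, and `FakeTask`, `fake_task` and `resolve` are hypothetical stand-ins, not flytekit APIs. Under that assumption, the failing layout hands back the plain function, which has `__name__` but no `name`:

```python
import types


class FakeTask:
    """Hypothetical stand-in for a task object; it only carries a name and the function."""

    def __init__(self, fn):
        self.fn = fn
        self.name = f"{fn.__module__}.{fn.__name__}"


def fake_task(fn):
    """Hypothetical stand-in for a task decorator: returns a new object, leaves fn as-is."""
    return FakeTask(fn)


def resolve(module, task_name):
    """Assumed runtime lookup: fetch the module attribute named after the function."""
    return getattr(module, task_name)


def sometask(inputs: dict):
    for item in inputs:
        print(f"Found {item.__class__.__name__}: {item}")


# Failing layout: the task object lives under a different name than the function.
failing = types.ModuleType("failing")
failing.sometask = sometask
failing.decorated_task = fake_task(sometask)

# Working layout: the decorator rebinds the function's own name to the task object.
working = types.ModuleType("working")
working.sometask = fake_task(sometask)

found_failing = resolve(failing, "sometask")
found_working = resolve(working, "sometask")

print(type(found_failing).__name__, hasattr(found_failing, "name"))  # function False
print(type(found_working).__name__, hasattr(found_working, "name"))  # FakeTask True
```

If that is what happens, binding the result of `task(...)` back to the function's own name at module level would be the simplest thing to try.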
Found this test in flytekit repo:
def test_py_func_task_get_container():
    def foo(i: int):
        pass

    default_img = Image(name="default", fqn="xyz.com/abc", tag="tag1")
    other_img = Image(name="other", fqn="xyz.com/other", tag="tag-other")
    cfg = ImageConfig(default_image=default_img, images=[default_img, other_img])

    settings = SerializationSettings(project="p", domain="d", version="v", image_config=cfg, env={"FOO": "bar"})

    pytask = PythonFunctionTask(None, foo, None, environment={"BAZ": "baz"})
    c = pytask.get_container(settings)
    assert c.image == "xyz.com/abc:tag1"
    assert c.env == {"FOO": "bar", "BAZ": "baz"}
It seems like I can use PythonFunctionTask to achieve this. In the test, they use this class to wrap an undecorated Python function (or so it seems). From my initial attempt, it does not seem to recognize the task_config, but I need to double-check that I am doing it correctly and that I can rely on this example.
p
Interesting, it looks like @task either adds an attr or calling the decorator directly strips it. I imagine there's a way to hack it and get it to work, but perhaps the cleaner way would be to use Flyte's builtin conditionals? https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/control_flow/conditions.html
y
there are also dynamic tasks.
could you elaborate a bit more on the goal?
v
Developers asked me to create a wrapper with Flyte that runs arbitrary code on the cloud in Kubernetes. It seems like a convenient way to run code because it can scale beyond our office machines' capacity, is tracked in the UI, and has various useful features and integrations. A few months ago I managed to achieve this with Prefect v2 (another ML pipelines solution like Flyte), but Prefect did not allow as much flexibility with the scheduling and pod configurations, so I ended up looking for a more Kubernetes-native solution, and that's how I found Flyte. (Note: this is not our main use case; we also run full ML pipelines in addition to this, and Flyte has been doing really well so far.)
The developers expect that, ideally, they should have to deal with Flyte-specific concepts as little as possible, and are asking me to abstract away executions of single functions on Flyte. I'd like to achieve this using a @workflow or a dynamic rather than a @task, because that would allow me to add additional nodes to each execution for sending notifications, preparing inputs, etc. But the main task should be something that is dynamically specified by passing a Python function to some Flyte wrapper, registering it, and executing it. I currently use FlyteRemote with register_script() and execute(), if that's relevant.
After some trial and error and looking into it, I got this minimal code which successfully registers and executes. It receives a dict, which I tested with both strings and ints, so it can have multiple values of different types as a single input/output:
from typing import Any, Dict
from flytekit import task, workflow
import functools

# My own custom helper to make it easier to configure tasks
from flyte_helpers.k8s import task_config

decorator_kwargs = task_config(
    cache=True,
    node_pool='t4-memory-pool',
    image='us-central1-docker.pkg.dev/mygcpproject/garus1/mycondaimage:v1',
    cpu='4', memory='16Gi', gpu='1')

def convert_to_flyte_task(local_function, decorator_kwargs):
    @task(**decorator_kwargs)
    @functools.wraps(local_function)
    def flyte_task_function(inputs: Dict[str, Any]) -> Any:
        return local_function(inputs)

    return flyte_task_function

def generic_task(inputs: dict) -> None:
    print("Received inputs:", inputs)

generic_task = convert_to_flyte_task(generic_task, decorator_kwargs)

@workflow
def my_workflow(inputs: dict) -> None:
    generic_task(inputs=inputs)
Now this doesn't fail at runtime because the "task" and the function behind it have exactly the same name. I'm looking into having the task dynamically named after the function so that this supports any function name. I'll also test it with functions that do real work and with real inputs/outputs to see if it still works, and see whether I can get it to support files as well.
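A rough sketch of the dynamic-naming idea, written with stand-ins so it runs without flytekit: wrap the function, then bind the wrapped object into a namespace under the function's own name. `FakeTask`, `fake_task` and `register_as_task` are hypothetical names used only for illustration. In real use the stand-in would be replaced by `task(**decorator_kwargs)` and the namespace would presumably be the `globals()` of the module that gets registered; both of those are assumptions, not something verified against flytekit.

```python
import functools


class FakeTask:
    """Hypothetical stand-in for a task object produced by a task decorator."""

    def __init__(self, fn, config):
        self.fn = fn
        self.config = config
        self.name = fn.__name__


def fake_task(**config):
    """Hypothetical stand-in for task(**decorator_kwargs)."""

    def decorate(fn):
        return FakeTask(fn, config)

    return decorate


def register_as_task(local_function, namespace, **config):
    """Wrap local_function and bind the result under the function's own name."""

    # functools.wraps copies __name__, __qualname__, __module__, __doc__ and
    # __annotations__, so the decorator sees the original function's name and type hints.
    @functools.wraps(local_function)
    def wrapper(inputs):
        return local_function(inputs)

    wrapped = fake_task(**config)(wrapper)
    # Bind under the original name so a name-based lookup finds the task object.
    namespace[local_function.__name__] = wrapped
    return wrapped


def train_model(inputs: dict) -> None:
    print("Received inputs:", inputs)


namespace = {}  # would be globals() of the registered module in real use (assumption)
registered = register_as_task(train_model, namespace, cache=True)

print(registered.name)                       # train_model
print(namespace["train_model"] is registered)  # True
```

Because `functools.wraps` copies `__annotations__`, any type hints written on the inner wrapper are replaced by those of the wrapped function, which matters if the decorator derives the task interface from the annotations.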