Hi all, I'm using the <setup_teardown example>, an...
# flyte-support
c
Hi all, I'm using the setup_teardown example, and created another decorator around it, so I can reuse it easily with multiple workflows. This is my decorator:
```python
def tracked_workflow(fn: Callable[P, T]) -> Callable[P, T]:
    wrapped = setup_teardown(before=report_wf_started, after=report_wf_completed)(fn)
    return flytekit.workflow(wrapped)

# AND THIS IS HOW I USE IT

@tracked_workflow
def wf(name: str = "union") -> typing.Tuple[str, int]:
    greeting = say_hello(name=name)
    greeting_len = greeting_length(greeting=greeting)
    return greeting, greeting_len
```
I want to add a "report_failure" task automatically to each tracked workflow. The reporting function is simple:
```python
def report_wf_failed() -> None:
    send_exec_event("workflow-finish", status=cac.ExecutionStatus.FAILED)
```
But Flytekit requires the signature to be the same as the workflow entry method. I tried to wrap the `report_wf_failed` function and then change the signature (I reduced some code bits a little):
```python
# wrap with a task and modify the "interface"
def tracked_workflow(fn):
    def wrapped(*args, **kwargs):
        report_wf_failed()
    wrapped.__signature__ = inspect.signature(fn)
    report_failed_task = task(wrapped)
    ...
```
But that failed because Flytekit checks whether the function is nested. Naturally, I don't want to put the function at the module level and manipulate its signature, as I'm afraid it might hurt other tasks using the same `tracked_workflow` decorator. I then attempted to add the inner function to the module scope, but that didn't do the trick (from Flytekit's code, it seems to use a property of the function that is probably set when it's defined). I have a few more ideas to try, but I would love to get some input:
1. Try to somehow create a new Python function on the fly that calls the report function. It seems rather hacky to me, but I can go that route.
2. From `PythonFunctionTask`'s code, it looks like it uses the `transform_function_to_interface` function to generate the `Interface` object. I can temporarily patch this function to return a different interface.
3. Actually define the function at the module level, change the signature in the `tracked_workflow` decorator, and register the task. That's a solution I'm slightly "scared" of because I don't know Flyte's internals well enough to evaluate what will happen if several workflows use the same decorator (so the signature keeps changing after it was registered).
All solutions are a little hacky. Any other ideas? Thanks!
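For reference, the Python mechanics behind the nested-function problem and idea 1 can be shown without Flytekit. This is a minimal sketch under stated assumptions: that Flytekit's nested check relies on the function's `__qualname__` (fixed at definition time), and that it may read type hints from `__annotations__` in addition to the signature. Neither is verified here. The names `nested_handler_for`, `generated_handler_for`, and `wf_on_failure` are hypothetical and only serve the illustration.

```python
import inspect
import typing


def wf(name: str = "union") -> str:
    # Stand-in for a workflow entry function.
    return name


def nested_handler_for(fn):
    # Same approach as in the question: a closure with a borrowed signature.
    def handler(*args, **kwargs):
        return None

    handler.__signature__ = inspect.signature(fn)
    return handler


def generated_handler_for(fn, handler_name):
    # Hypothetical variant of idea 1: compile the function from source so it is
    # created at the top level of its own namespace instead of as a closure.
    namespace = {}
    exec(f"def {handler_name}(*args, **kwargs):\n    return None\n", namespace)
    handler = namespace[handler_name]
    handler.__module__ = fn.__module__
    handler.__signature__ = inspect.signature(fn)
    # Copy the annotations too, since __signature__ does not update them.
    handler.__annotations__ = dict(fn.__annotations__)
    return handler


nested = nested_handler_for(wf)
generated = generated_handler_for(wf, "wf_on_failure")

print(inspect.signature(nested))         # signature borrowed from wf
print(nested.__qualname__)               # still contains "<locals>"
print(typing.get_type_hints(nested))     # empty: annotations were not copied
print(generated.__qualname__)            # plain name, no "<locals>"
print(typing.get_type_hints(generated))  # hints copied from wf
```

Setting `__signature__` changes what `inspect.signature` reports, but it leaves `__qualname__` and `__annotations__` untouched, which may explain why moving the inner function into the module scope afterwards did not help. Whether Flytekit accepts the generated function as a task, and whether it can resolve that task at execution time, is untested here.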
UPDATE: I attempted solution #2. The workflow definition seems to work, but workflow execution fails while "collecting" the tasks. The error looks like this:
```
Error 0: Code: TaskReferenceNotFound, Node Id: fn0, Description: Referenced Task [resource_type:TASK project:"flyte-tasks" domain:"development" ...] not found.
```
The weird thing is the name of the task. The `report_wf_failed` decorator is in `module_a` but is being used by a workflow defined in `module_b`. The name of the task becomes `module_b.module_a.report_wf_failed`. However, a task with the same name and version exists, so I'm not sure why it's claiming it cannot find it. I used `FlyteRemote` to fetch a task by these parameters and was successful.
f
I think we should make it possible to pass no inputs to the on_failure handler. It's not a bug, but I think it's an oversight in flytekit. As for the failure: the task exists? With the weird module? This seems like some resolution issue. We'll need a reproducible example, and we probably won't be able to get to it for a bit.
c
The task name should be `module_a` but is being registered as `module_b.module_a`. But it does exist, with the name `module_b.module_a.report_wf_failed`. Still, Flyte won't run the workflow, claiming it cannot find it. I'm able to fetch the task with the same name and version using FlyteRemote. When examining the container arguments, I can see the module name is correct.
t
mind sharing a UI screenshot of the task being present in the project-domain? if you could also share the full error log, that'd be great!