curved-petabyte-84246
04/07/2024, 6:43 AM
def tracked_workflow(fn: Callable[P, T]) -> Callable[P, T]:
    wrapped = setup_teardown(before=report_wf_started, after=report_wf_completed)(fn)
    return flytekit.workflow(wrapped)

# AND THIS IS HOW I USE IT
@tracked_workflow
def wf(name: str = "union") -> typing.Tuple[str, int]:
    greeting = say_hello(name=name)
    greeting_len = greeting_length(greeting=greeting)
    return greeting, greeting_len
I want to add a "report_failure" task automatically to each tracked workflow.
The reporting function is simple:
def report_wf_failed() -> None:
    send_exec_event("workflow-finish", status=cac.ExecutionStatus.FAILED)
But Flytekit requires the signature to be the same as the workflow entry method.
I tried to wrap the report_wf_failed function and then change the signature (I reduced some code bits a little):
# wrap with a task and modify the "interface"
def tracked_workflow(fn):
    def wrapped(*args, **kwargs):
        report_wf_failed()
    wrapped.__signature__ = inspect.signature(fn)
    report_failed_task = task(wrapped)
    ...
But that failed because Flytekit checks whether the function is nested. Naturally, I don't want to put the function at the module level and manipulate its signature, as I'm afraid it might hurt other tasks using the same tracked_workflow decorator.
I then attempted to add the inner function to the module scope, but that didn't do the trick (from Flytekit's code it seems to be using a property of the function which is probably being set when it's defined).
I have a few more ideas to try but I would love to get some input:
1. Try to somehow create a new Python function that calls the report function on the fly. It seems rather hacky to me, but I can go that route.
2. From `PythonFunctionTask`'s code, it looks like it's using the transform_function_to_interface function to generate the Interface object. I can temporarily patch this function to return a different interface.
3. Actually define the function at the module level, change the signature in the tracked_workflow decorator, and register the task. That's a solution I'm slightly "scared" of because I don't know Flyte's internals well enough to evaluate what will happen if several workflows use the same decorator (so the signature keeps changing after it was registered).
All solutions are a little hacky. Any other ideas?
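A minimal sketch of idea 1, in plain Python only (no Flytekit calls). It assumes the "is nested" check looks at the `CO_NESTED` flag on the function's code object, which the thread does not confirm. The names `_failure_template` and `make_failure_handler` are hypothetical, and `report_wf_failed` here is a stand-in that prints instead of calling `send_exec_event`. Each workflow gets its own function object built from a module-level code object, so changing one handler's signature should not touch the others.

```python
import inspect
import types
from typing import Callable, Tuple


def report_wf_failed() -> None:
    """Stand-in for the real reporting function from the message above."""
    print("workflow-finish: FAILED")


def _failure_template(*args, **kwargs) -> None:
    # Module-level body shared by every generated handler. `_report` is not a
    # real module global: it is looked up in the globals dict given to each copy.
    _report()  # noqa: F821


def make_failure_handler(fn: Callable, report: Callable[[], None]) -> Callable:
    """Build a fresh, non-nested function that advertises fn's parameters."""
    sig = inspect.signature(fn)
    handler = types.FunctionType(
        _failure_template.__code__,  # defined at module level, so not CO_NESTED
        {"_report": report, "__name__": fn.__module__},
        name=f"{fn.__name__}_report_failure",
    )
    handler.__qualname__ = handler.__name__
    handler.__module__ = fn.__module__
    # Same parameters as the workflow, but the handler itself returns nothing.
    handler.__signature__ = sig.replace(return_annotation=None)
    handler.__annotations__ = {
        p.name: p.annotation
        for p in sig.parameters.values()
        if p.annotation is not inspect.Parameter.empty
    }
    handler.__annotations__["return"] = None
    return handler


# Two plain functions standing in for workflow entry points.
def wf(name: str = "union") -> Tuple[str, int]:
    return name, len(name)


def other_wf(a: int, b: int) -> int:
    return a + b


wf_handler = make_failure_handler(wf, report_wf_failed)
other_handler = make_failure_handler(other_wf, report_wf_failed)

print(inspect.signature(wf_handler))
print(inspect.signature(other_handler))
print(bool(wf_handler.__code__.co_flags & inspect.CO_NESTED))
```

Whether `task(...)` accepts such a function, and whether it also has to be reachable as an attribute of its module for Flyte to load it at run time, is not tested here.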
Thanks!
curved-petabyte-84246
04/07/2024, 9:16 AM
Error 0: Code: TaskReferenceNotFound, Node Id: fn0, Description: Referenced Task [resource_type:TASK project:"flyte-tasks" domain:"development" ...] not found.
The weird thing is the name of the task.
The report_wf_failed decorator is in module_a but is being used by a workflow defined in module_b. The name of the task becomes module_b.module_a.report_wf_failed. However, a task with the same name and version exists, so I'm not sure why it's claiming it cannot find it. I used FlyteRemote to fetch a task by these parameters and was successful.
freezing-airport-6809
curved-petabyte-84246
04/07/2024, 3:29 PM
module_a but is being registered as module_b.module_a. But it does exist, with the name module_b.module_a.report_wf_failed. Still, Flyte won't run the workflow, claiming it cannot find it.
I'm able to fetch the task with the same name and version using FlyteRemote. When examining the container arguments I can see the module name is correct.
tall-lock-23197