# ask-the-community

Lee Ning Jie Leon

12/18/2023, 5:44 AM
Hello, questions about designing workflows/tasks on Flyte for some challenges we are trying to work out.

1. Allowing a failed task to proceed / changing the state of a task in a workflow or marking it as succeeded. These are usually validation / data-quality tasks that run and send notifications, e.g. data freshness, checks on values, distributions, number of recurring unique values, running DBT tests. Often the faults are tolerable after manual assessment, and the team wishes to proceed with the downstream tasks (continue the next stage of ETL or train the model). Currently they remove the validation task (after being notified), rerun/recompute a new workflow version without the validation, and then put it back. This can be repeated if the workflow has multi-stage transformations.
2. Updating a task in a failed workflow without rerunning the entire workflow. Happens when there is a bug or an input that needs to be tweaked at the task level. We have to change the code and rerun a new workflow with the updated task. Currently users copy all the outputs from the upstream tasks (of the failed workflow) and run tasks individually to avoid re-running the workflow, but this gets cumbersome if there are many downstream / parallel tasks that require this execution at the task level instead of the workflow level. Identical pain point as https://flyte-org.slack.com/archives/CP2HDHKE1/p1700212383517249

For (2), caching is an option, but it adds a lot of complexity to design/maintenance for huge workflows. As discussed in that thread, there seems to be no alternative for now. For (1), I did see that failure nodes were merged recently, but from my understanding this runs a task upon workflow failure and isn't applicable to task-level failure. Also, it runs a new chain instead of continuing the workflow, which might not fit our use case. We are thinking of building a plugin along the lines of extending `ShellTask` to catch the error and pass the result (succeed or fail) to a conditional node that proceeds as usual on success, or raises an approve node that a user can manually approve to continue downstream. There are limitations to this approach, but it might just work for our use cases. Are there any suggestions for alternatives, or are any of these on the roadmap?

Ketan (kumare3)

12/18/2023, 3:51 PM
Validation is an interesting use case. I think this is a great use case for the upcoming artifact service and auto-triggering. The way you would do it is: create an artifact from a task, and then create a workflow that consumes this artifact. This makes it possible to still have failure semantics on every workflow without impacting workflow progress, and you also get faster, parallel quality checks. Would you be open to connecting so we can share how to do this? Feedback would be appreciated. Otherwise, we would love to learn how you solve this. Btw, remember you can run a task, catch the failure, and continue, and also add notifications etc. for that. But it seems you want to conditionally decide success or failure.
Let me know @Lee Ning Jie Leon

Lee Ning Jie Leon

12/19/2023, 1:56 AM
Is this a similar concept to an AWS Lambda / GCP Cloud Function trigger on S3-compatible storage when an object gets uploaded? Must the listener/consumer be a separate workflow or sub-workflow? Can you share any sample repo using it? Yee demoed this previously, iirc. I don't think it solves the problem perfectly, as it doesn't have the conditional, and it might be cumbersome to implement in a bigger workflow (up to 50 tasks might require this) compared to the user just changing the state or configuring a flag to allow another flow for the task on failure.

Ketan (kumare3)

12/19/2023, 6:12 AM
It does. IMO you will not need conditionals. You can simply hang the validations off as separate workflows - which may fail - one for every artifact, and artifacts can be created per task. Conditionals are only needed if you have to continue in the same workflow, right? https://github.com/unionai-oss/artifact-demo/tree/main/artifact_ux
Here is what I am saying, cc @Yee
@Lee Ning Jie Leon, is it that you want the main workflow (in this case W1) to sometimes fail if the validations (which are triggered workflows) fail? If that is the case - this will sadly not work.
If you want to achieve this, the only idea I see is to do what you said, for example adding a conditional approval. You can make it easier by creating a meta-task function, like:
```python
@task
@conditionally_approve
def my_task():
    ...
```
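The `conditionally_approve` decorator in the snippet above is hypothetical (it does not exist in flytekit). A minimal plain-Python sketch of what such a wrapper could do, assuming the goal from the thread is to catch the failure and hand a status to a downstream conditional/approve step rather than failing the workflow:

```python
import functools

def conditionally_approve(fn):
    """Hypothetical wrapper: catch failures and return (status, payload)
    instead of raising, so a downstream conditional node can route on the
    status (proceed on SUCCESS, raise an approve node otherwise)."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return ("SUCCESS", fn(*args, **kwargs))
        except Exception as exc:
            # Surface the error message for the approval step
            # instead of failing the task hard.
            return ("NEEDS_APPROVAL", str(exc))
    return wrapper

@conditionally_approve
def validate_freshness(age_hours: float):
    # Illustrative validation: fail if the data is older than a day.
    if age_hours > 24:
        raise ValueError(f"data is {age_hours}h old")
    return age_hours
```

In a real Flyte workflow the returned status would feed a conditional node; the names here are illustrative, not a proposed API.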

Lee Ning Jie Leon

12/19/2023, 6:28 AM
While the artifact registry might be able to stitch some of this up, I feel it introduces too much complexity for the use case. In your graph, it's something like: t1 -> t2 (validate and fail) -> approve/mark as success (what we think can help) -> t3 continue. The challenge at hand is that if t2 fails, there is no way to proceed unless we re-create another workflow with t1 -> t3, knowing t2's failure is tolerable.
In both cases, the issue we are trying to avoid is being forced to re-run t1 (expensive - it could be parallel processing of 10-30 tasks taking up to 10+ hours) multiple times.

Ketan (kumare3)

12/19/2023, 7:39 AM
True, but caching should avoid re-running. But I don't follow - why do you need to put the dataset check in the same workflow? In the example, can you go from t1 to t2?
Is it ok to add a failable task type for t2, where you run t2 and t3 in parallel but don't care about t2's success or failure?
Please write more, or @Yee can chat with Lee.

Lee Ning Jie Leon

12/19/2023, 8:37 AM
> but don’t care about t2’s success or failure?

Sometimes there are real issues with the data, for example when the upstream ETL pipeline (owned by another team) has a bug; in that case the validation should fail, alert users, and not continue the pipeline. In any case, we want to be able to change the state / control the workflow. Even if we could just update the state of a task execution in the db to make the workflow proceed, it would be much better than running long hours of compute again.

> True but caching should avoid running.

This was advocated, but there is high resistance to using it on the DS side for various reasons: the additional complexity of designing/managing cache hits (can't use the default `kickoff_time` as it has seconds precision), and there would probably be hidden issues with managing huge amounts of data in the cache. If there is something wrong with the data in one task, the cache has to be invalidated across the entire workflow, as it is not granular to the task. To users, allowing failure / changing state would be way simpler than the above. The purpose of asking was to check whether there are features I wasn't aware of that could be leveraged. Just consolidating this knowledge and seeing how we can build a solution that prevents waste. Thanks!
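One possible workaround for the seconds-precision problem with `kickoff_time` (a sketch, not a flytekit feature; `truncate_to_hour` is an illustrative helper name): truncate the kickoff timestamp to the scheduling grain before it reaches cached tasks, so reruns within the same window produce identical inputs and therefore the same cache key.

```python
from datetime import datetime

def truncate_to_hour(kickoff_time: datetime) -> datetime:
    """Normalize a kickoff timestamp to its hour so that reruns within
    the same scheduling window share a cache key. Adjust the grain
    (hour/day) to match the schedule."""
    return kickoff_time.replace(minute=0, second=0, microsecond=0)
```

Two executions triggered minutes apart in the same hour would then pass the same timestamp to downstream cached tasks and hit the cache.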

Ketan (kumare3)

12/19/2023, 3:35 PM
There is always the possibility of a feature, but it has to be correct. It seems you want to clear db state and re-run, and sadly this is not possible. Or you want a partial clone and then to run again, skipping some nodes. As you know, recover is possible - but what you want is to skip a node. Honestly, think about how such a feature could even be designed: you would have to dynamically chop a workflow, create a new one with only the nodes you need, and wire in the data from the previous nodes. cc @Haytham Abuelfutuh

Lee Ning Jie Leon

12/20/2023, 2:29 AM
> It seems you want to clear db state and run and sadly this is not possible. Or you want partial clone and then run again skipping some nodes.

That was a desperate idea 😅. Ultimately we just want to 'allow failure' or be able to 'handle failure' at the task level, which didn't seem unreasonable as a feature. It could be close to the failure node, but at the task level. We'll see if we can stitch this behaviour together with conditional and approve nodes then. Thanks!

Haytham Abuelfutuh

12/20/2023, 5:05 PM
Would something like this work?
```python
@workflow
def my_wf(ds: Dataset):
    a = validate(ds=ds)
    # If the validation result is success, proceed without prompting the user
    b = _if(a == SUCCESS).then(always_proceed()).else_(ask_for_confirmation())
    # If the user asks to proceed, go ahead; otherwise fail now, before "corrupting the state"
    c = _if(b == PROCEED).then(always_proceed()).else_(fail_now())
    d = proceed()
    c >> d
```
Where `ask_for_confirmation` is a Human-In-The-Loop node... and `always_proceed` just returns a cached static value... The main change here is that `validate` should not fail, but instead return the result of the validation (YES/NO)...
@Lee Ning Jie Leon
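The control flow sketched above can be simulated in plain Python (this is not flytekit code; `ask_for_confirmation` stands in for the human-in-the-loop approve node, and all three steps are injected as callables so the routing logic is isolated):

```python
def run_pipeline(validate, ask_for_confirmation, proceed):
    """Route like the workflow above: validation success proceeds
    directly; a validation failure asks a human; a declined approval
    fails before downstream state is touched."""
    if validate() == "SUCCESS":
        return proceed()
    if ask_for_confirmation() == "PROCEED":
        return proceed()
    raise RuntimeError("validation failed and the user declined to proceed")
```

In the real workflow, `validate` would return its result as an output rather than raising, which is exactly the change Haytham describes.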

Ketan (kumare3)

12/20/2023, 5:47 PM
Haytham, the challenge is adding this code block everywhere.

Lee Ning Jie Leon

12/21/2023, 1:37 AM
Thanks, this is similar to what I had in mind; it's just that adding this to a big workflow can be daunting. Nevertheless, if it works out we might just stick with this for now. Will also test whether it works within a dynamic workflow.

Haytham Abuelfutuh

12/21/2023, 4:40 AM
@Lee Ning Jie Leon sounds great! I think if this pattern works we should look into providing a construct to do this for you...