broad-train-34581
12/18/2023, 5:44 AMShellTask
that catch the error and pass the result (succeed or fail) to a conditional node that proceed as per usual on success and raising an approve node that user can manually approve to continue the downstream. There are limitations to this approach but might just work for our use-cases. Are there any suggestion on alternatives to achieve this or if any of these are on the roadmap?freezing-airport-6809
freezing-airport-6809
broad-train-34581
12/19/2023, 1:56 AMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
@task
@conditionally_approve
def my_task():
...
broad-train-34581
12/19/2023, 6:28 AMbroad-train-34581
12/19/2023, 6:30 AMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
broad-train-34581
12/19/2023, 8:37 AMbut don’t care about t2s success or fail?sometimes there are real issue with the data for example where the upstream ETL pipeline (own by another team) has a bug, in this case the validation should fail, alert users and not proceed the pipeline. In any case, being able to change the state / control the workflow. Even if we can update the state of a task execution in db to proceed the workflow, it much better than running long hours of compute again.
True but caching should avoid running.This is something that was advocated but there are high resistance to using this for DS for various reasons. Due to additional complexity of designing/managing cache hit (cant use default
kickoff_time
as it has seconds precision). There would probably be hidden issue with managing huge amount of data with cache.
If there is something wrong with data in one task, the cache has to be invalidated across the entire workflow as it is not granular to task.
To users, allowing failure/changing state, it would be way simpler than the above.
The purpose of asking was checking if there are features that could be leveraged which I wasn't aware of. Just consolidating these knowledge and seeing how we can solution and work with to prevent wastage. Thanks!freezing-airport-6809
broad-train-34581
12/20/2023, 2:29 AMIt seems you want to clear db state and run and sadly this is not possible. Or you want partial clone and then run again skipping some nodes.that was desperate idea 😅, ultimately we just want to 'allow failure' or be able to 'handle failure' at a task level which didn't seem unreasonable as a feature. it could be close to failure node, but at task level. We'll try if we can stitch this behaviour with conditional and approval node then. Thanks!
high-park-82026
@workflow
def my_wf(ds: Dataset):
a = validate(ds = ds)
# If the validation result is success, proceed without prompting the user
b = _if(a == SUCCESS).then(always_proceed()).else(ask_for_confirmation())
# If the user ask to proceed, go ahead, otherwise, fail now before "corrupting the state"
c = _if(b == PROCEED).then(always_proceed()).else(fail_now())
d = proceed()
c >> d
Where ask_for_confirmation
is a Human In The Loop node... and always_proceed
just returns a cached static value...
The main change here is that validate
should not fail but instead return the result of the validation (YES/NO)...high-park-82026
freezing-airport-6809
broad-train-34581
12/21/2023, 1:37 AMhigh-park-82026