# ask-the-community
e
Hi, I am currently working on a Flyte POC. I am migrating one of our KF pipelines, and I am trying to understand whether I can run a workflow from step N while providing another workflow run id whose inputs should be reused. I've been reading the docs and can barely find any info about it. ChatGPT suggested using the recover function, which doesn't seem to exist in flytekit. Could anyone give me a hint on how that could be achieved in Flyte, please? Our use case is not necessarily recovering a failed run, but rather rapid prototyping: DS people could run a heavy workflow end to end, and then rerun only certain parts of it later (model training with different hyperparams, etc.), reusing artifacts from the previous complete run (heavy SQL + feature engineering). So, ideally, they would say: "I want to run from step ModelTraining, reusing the features generated in run id 123." I want to do this programmatically from Python/CLI. Thank you!
s
I think this should be possible with artifacts + caching (artifacts are available only in the managed version). Your upstream workflow can produce an artifact, and you can set up reactivity to trigger your downstream workflow with caching enabled, sending the outputs of the upstream workflow to specific tasks in the downstream one. Without artifacts, you'd need to resort to some hacks: you can use FlyteRemote to programmatically fetch the outputs of your upstream execution and trigger your downstream workflow with caching enabled. When the outputs are updated, downstream tasks that depend on the upstream workflow outputs should get triggered.
h
I suggest looking at and leveraging workflow composition too. Instead of having one big monolithic workflow, you can break it down into multiple smaller workflows and have a top-level workflow that orchestrates them. Generally speaking, users can iterate on individual tasks or sub-workflows... Another thing you can look at is marking tasks as cacheable:
```python
@task(cache=True, cache_version="1.0")
def my_task(a: int) -> int:
    ...
```
This will automatically create new cache entries for unique inputs & signature... so if all the tasks in a workflow are cacheable, you can just keep rerunning the entire workflow, and it'll only rerun the parts that changed.
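To make the cache-key idea above concrete, here is a toy, pure-Python illustration (not Flyte's actual implementation, which caches server-side) of how a key derived from the task identity, `cache_version`, and inputs means identical inputs hit the cache while new inputs or a bumped version force a re-run:

```python
import hashlib
import json

# Simulated cache store: key -> result. In Flyte this lives in the backend.
_cache: dict[str, int] = {}
calls = 0  # counts how many times the "expensive" body actually runs


def cache_key(task_name: str, cache_version: str, inputs: dict) -> str:
    """Derive a stable key from task identity, version, and inputs."""
    payload = json.dumps(
        {"task": task_name, "version": cache_version, "inputs": inputs},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()


def my_task(a: int) -> int:
    global calls
    key = cache_key("my_task", "1.0", {"a": a})
    if key not in _cache:
        calls += 1            # simulate doing the expensive work
        _cache[key] = a * 2
    return _cache[key]


my_task(3)  # computed (cache miss)
my_task(3)  # cache hit, body not re-run
my_task(4)  # new inputs -> computed again
```

After the three calls, the task body has only executed twice; changing `cache_version` to "2.0" would invalidate all existing entries, which is how you force recomputation after a logic change.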