# ask-the-community
h
Hi, I have a wrapper workflow with several sub-workflows, each composed of tasks (ML and Spark) in a multi-image setup. Now I am trying to invoke this wrapper workflow from a dynamic workflow based on certain conditions, but it fails with, for example, `ModuleNotFoundError: No module named 'matplotlib'` while importing libraries in tasks that require the TensorFlow image (because the default image is the Spark one). And if I pass the TensorFlow image as `container_image` on the dynamic, then it fails with `ModuleNotFoundError: No module named 'flytekitplugins'` in tasks that require the Spark image, basically the reverse. Is there a way I could make this work in a multi-image setup?
y
could you post a skeleton example of the task decorators/signatures?
as long as you're not trying to use both at the same time, it should work.
unfortunately, the loading of the task might be what's causing the module error.
h
Yes, it fails at the dynamic level. Here is the example:
```python
@dynamic
def dynamic_wrapper_workflow(dates: List[str]) -> bool:
    for date in dates:
        main_wrapper_workflow(date=date)
    return True

@workflow
def main_wrapper_workflow(date: str):
    x = sub_wf_1(date=date)
    y = sub_wf_2(date=date, input=x)

@workflow
def sub_wf_1(date: str):
    a = spark_task(date=date)

@workflow
def sub_wf_2(date: str, input: str):
    a = ml_task(date=date, input=input)

@task(container_image=spark_image)
def spark_task(date: str):
    ...

@task(container_image=ml_image)
def ml_task(date: str, input: str):
    ...
```
Also, `sub_wf_2` is dependent on `sub_wf_1`'s execution.
@Samhita Alla Sorry to ping you here. I’d appreciate any thoughts you might have on this.
s
where are you importing the libraries? in the tasks?
h
Yes, libraries are imported at the task level… this is how the flow is:
```python
import main_wrapper_workflow

@dynamic
def dynamic_wrapper_workflow(dates: List[str]) -> bool:
    for date in dates:
        main_wrapper_workflow(date=date)
    return True
```
```python
import sub_wf_1, sub_wf_2

@workflow
def main_wrapper_workflow(date: str):
    x = sub_wf_1(date=date)
    y = sub_wf_2(date=date, input=x)
```
```python
import spark_task

@workflow
def sub_wf_1(date: str):
    a = spark_task(date=date)
```
```python
import ml_task

@workflow
def sub_wf_2(date: str, input: str):
    a = ml_task(date=date, input=input)
```
```python
from flytekitplugins.spark import Spark

@task(container_image=spark_image)
def spark_task(date: str):
    ...
```
```python
import matplotlib.pyplot as plt

@task(container_image=ml_image)
def ml_task(date: str, input: str):
    ...
```
This works when we directly invoke the wrapper workflow. I assume this is happening because I am wrapping the wrapper workflow with `@dynamic`.
s
understood. can you import the libraries within your task?
```python
@task(...)
def spark_task():
    from flytekitplugins.spark import Spark

    ...
```
make sure that, at the top (module) level, you only import libraries that are available in the default container image of the dynamic workflow.
if you're using image spec, you could also use `is_container()`:
```python
image = ImageSpec(...)

if image.is_container():
    from flytekitplugins.spark import Spark

@task(container_image=image)
def spark_task():
    ...
```
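The advice above boils down to deferring image-specific imports to call time. A minimal, flytekit-free sketch of why this works (the module name below is a hypothetical stand-in for a library that exists in only one image):

```python
# Loading this module succeeds in ANY container, because the
# image-specific library is not touched at module-import time.
def ml_task(date: str) -> str:
    # Lazy import: only resolved when the task body actually runs,
    # i.e. inside the container that has the library installed.
    # 'image_only_plotting_lib' is a hypothetical stand-in.
    import image_only_plotting_lib  # noqa: F401
    return date

# The dynamic workflow's default image only needs to *load* the module
# to compile the workflow, which now works; a ModuleNotFoundError can
# only surface if the task body runs in a container without the library.
```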
h
We use Dockerfiles, so I will try importing the libraries inside the tasks and see. Will let you know the results here 🙏 thank you
It's working 🎉 Thank you so much for the valuable input 🙂
I had two sets of wrapper workflows running under the dynamic, but when one of them failed, it brought down the other with `Dynamic workflow failed`. I see that the failure of one wrapper workflow caused the dynamic to fail completely, which in turn failed the other wrapper as well. Is there a way to avoid this?
I have this set at the workflow level:
```python
@workflow(
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
)
```
but I see that `@dynamic` doesn't have that option.
I think I figured it out: I need to set the same policy on the topmost workflow that invokes the dynamic 👍