faint-machine-61752
04/10/2024, 8:52 PM
I'm getting ModuleNotFoundError: No module named 'matplotlib' while importing libraries in tasks that require a tensorflow image (because the default image is spark). And if I pass the tensorflow image as container_image on the dynamic workflow, it fails with ModuleNotFoundError: No module named 'flytekitplugins' in tasks that require the spark image, basically the reverse. Is there a way I could make this work in a multi-image setup?
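A minimal sketch of the layout that produces both errors (module and image names here are hypothetical):

# single module defining both tasks: whichever image loads it must
# satisfy BOTH top-level imports, so one ModuleNotFoundError is inevitable
import matplotlib.pyplot as plt          # installed only in the tensorflow image
from flytekitplugins.spark import Spark  # installed only in the spark image

from flytekit import task

@task(container_image="spark-image")  # hypothetical image reference
def spark_task(date: str):
    ...

@task(container_image="tensorflow-image")  # hypothetical image reference
def ml_task(date: str, input: str):
    ...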
thankful-minister-83577
faint-machine-61752
04/10/2024, 9:01 PM

faint-machine-61752
04/10/2024, 9:06 PM
from typing import List
from flytekit import dynamic, task, workflow

@dynamic
def dynamic_wrapper_workflow(dates: List[str]) -> bool:
    for date in dates:
        main_wrapper_workflow(date=date)
    return True

@workflow
def main_wrapper_workflow(date: str):
    x = sub_wf_1(date=date)
    y = sub_wf_2(date=date, input=x)

@workflow
def sub_wf_1(date: str) -> str:
    return spark_task(date=date)

@workflow
def sub_wf_2(date: str, input: str):
    a = ml_task(date=date, input=input)

@task(container_image=spark_image)
def spark_task(date: str) -> str:
    ...

@task(container_image=ml_image)
def ml_task(date: str, input: str):
    ...
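For reference, the @dynamic task body itself executes in a container, so it accepts its own container_image just like a task. A sketch (default_image is a placeholder; the dynamic body is compiled at runtime inside that container, so the image must be able to import the modules of every referenced task and sub-workflow):

@dynamic(container_image=default_image)  # placeholder image reference
def dynamic_wrapper_workflow(dates: List[str]) -> bool:
    for date in dates:
        main_wrapper_workflow(date=date)  # reuses the workflow defined above
    return True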
faint-machine-61752
04/10/2024, 9:15 PM
sub_wf_2 is dependent on sub_wf_1's execution.

faint-machine-61752
04/11/2024, 7:22 AM

tall-lock-23197
faint-machine-61752
04/11/2024, 7:40 AM
import main_wrapper_workflow

@dynamic
def dynamic_wrapper_workflow(dates: List[str]) -> bool:
    for date in dates:
        main_wrapper_workflow(date=date)
    return True

import sub_wf_1, sub_wf_2

@workflow
def main_wrapper_workflow(date: str):
    x = sub_wf_1(date=date)
    y = sub_wf_2(date=date, input=x)

import spark_task

@workflow
def sub_wf_1(date: str) -> str:
    return spark_task(date=date)

import ml_task

@workflow
def sub_wf_2(date: str, input: str):
    a = ml_task(date=date, input=input)

from flytekitplugins.spark import Spark

@task(container_image=spark_image)
def spark_task(date: str) -> str:
    ...

import matplotlib.pyplot as plt

@task(container_image=ml_image)
def ml_task(date: str, input: str):
    ...

This works when we directly invoke the wrapper workflow, so I assume the failure happens because I am wrapping the wrapper workflow with dynamic.
tall-lock-23197
@task(...)
def spark_task():
    from flytekitplugins.spark import Spark
    ...

Make sure that, at the top level, you only import libraries that are available in the default container image of the dynamic workflow.

tall-lock-23197
image = ImageSpec(...)
if image.is_container():
    from flytekitplugins.spark import Spark

@task(container_image=image)
def spark_task():
    ...
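Applied to the setup above, a sketch might look like this (spark_image and ml_image stand in for the existing image references):

from flytekit import task

@task(container_image=spark_image)
def spark_task(date: str):
    # deferred import: the dynamic workflow's default image can load this
    # module without having flytekitplugins-spark installed
    from flytekitplugins.spark import Spark
    ...

@task(container_image=ml_image)
def ml_task(date: str, input: str):
    # likewise, matplotlib is only imported inside the image that has it
    import matplotlib.pyplot as plt
    ...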
faint-machine-61752
04/11/2024, 8:59 AM

faint-machine-61752
04/11/2024, 12:09 PM

faint-machine-61752
04/11/2024, 12:29 PM
I'm now getting Dynamic workflow failed. I see that the failure of one wrapper workflow caused the dynamic to fail completely, which in turn caused the other wrapper to fail as well. Is there a way to avoid this?

faint-machine-61752
04/11/2024, 12:43 PM
There is
@workflow(
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
)
but I see that dynamic doesn't have that option.
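For context, this is how that policy is applied to a static workflow (a sketch reusing the wrapper from above; note that sub_wf_2 consumes sub_wf_1's output, so the policy mainly helps nodes that don't depend on the failed one):

from flytekit import workflow, WorkflowFailurePolicy

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def main_wrapper_workflow(date: str):
    x = sub_wf_1(date=date)
    y = sub_wf_2(date=date, input=x)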
faint-machine-61752
04/11/2024, 12:58 PM