# announcements
Nicholas:
Hi, I’m trying to call a `@dynamic` task from a `map_task` and am seeing a strange error. Is there a limitation I should be aware of?
```
[1/1] currentAttempt done. Last Error: UNKNOWN::failed to read data from dataDir [<gs://mybucket/metadata/propeller/nickflyte-dev-fc4174c9bef1a4c2caa5/n0/data/0/0/outputs.pb>]. Error: path:<gs://mybucket/metadata/propeller/nickflyte-dev-fc4174c9bef1a4c2caa5/n0/data/0/0/outputs.pb>: not found
```
The attached code fails on our GCS cluster. Other map_tasks that don’t call a dynamic task work fine.
If our workflow design is poor, we welcome recommendations for alternate approaches. The reason we are using map_task + dynamic is so we can call a set of tasks, not just one, from the map_task. However, since map_tasks can only take a single argument, all of my parameters are bundled into one object. When the dynamic task receives the bundled object, it calls the sub-tasks with just the arguments each one cares about.
```python
from dataclasses import dataclass
from typing import List

from dataclasses_json import dataclass_json
from flytekit import dynamic, map_task, task, workflow


@dataclass_json
@dataclass
class BundledData:
    arg1: str
    arg2: int

@task
def t1(input: str) -> str:
    print(input)
    return "hello"

@task
def t2(input: int, msg: str):
    print(input)

@dynamic
def dyn(input: BundledData):
    o = t1(input=input.arg1)
    t2(input=input.arg2, msg=o)

@workflow
def wf(inputs: List[BundledData]):
    # This is the failing call: mapping over a @dynamic task
    map_task(dyn)(input=inputs)
```
Ketan:
Map only works with a pod task or a regular task.
We should fail in flytekit.
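(For reference, a minimal sketch of the supported pattern, mapping over a plain `@task`; the `process_one` task and `mapped_wf` workflow are hypothetical names.)
```python
from typing import List

from flytekit import map_task, task, workflow


@task
def process_one(input: str) -> str:
    # A regular task: map_task can fan this out over a list of inputs.
    return input.upper()

@workflow
def mapped_wf(inputs: List[str]) -> List[str]:
    # map_task wraps the plain task and is called with the list.
    return map_task(process_one)(input=inputs)
```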
Nicholas:
Oh, well that explains it. Flytekit did not stop me from doing it, so I assumed I could. If I want to process a list of inputs in parallel, and the processing is made up of multiple tasks, is that possible?
One solution might be to have a separate application (outside of Flyte) that takes the list of inputs and calls Flyte using FlyteRemote to execute a LaunchPlan for each input. However, I’d prefer to have it all happen within Flyte so I could use the FlyteConsole to start processing.
Ketan:
@Nicholas LoFaso sorry for the delayed response. You can use `@dynamic` to launch multiple other `@dynamic` workflows.
Why not use that?
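(A minimal sketch of that suggestion, reusing the `BundledData` and `dyn` definitions from the snippet above; the `fan_out` name is hypothetical.)
```python
from typing import List

from flytekit import dynamic


@dynamic
def fan_out(inputs: List[BundledData]):
    # Each iteration adds an independent dyn sub-workflow node; since
    # the nodes have no data dependencies on each other, they run in
    # parallel on a cluster.
    for item in inputs:
        dyn(input=item)
```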
Nicholas:
Hi Ketan, thanks for the very early morning response! The dynamic workflow statements execute serially, yes? So if I had:
```python
from typing import List

from flytekit import dynamic


@dynamic
def dyn(inputs: List[str]):
    # task1 is a regular @task defined elsewhere
    for input in inputs:
        task1(input=input)
```
If my inputs were `["apple", "banana", "cherry"]`, `task1(input="banana")` wouldn’t start until `task1(input="apple")` finished? Can I use standard Python multithreading to make it happen in parallel?
Ketan:
They do not execute serially; that only happens locally 😞. The local executor is only for testing.
We should make that clear in the docs, cc @Yee.
Nicholas:
So there is no way currently to do what I’m trying to do in parallel?
Ketan:
No, it will run in parallel when you run remotely.
I meant that in testing it looks serial, but when you run on the cluster it will be parallel.
Nicholas:
Oh great! So in my silly example above, three instances of `task1` will be created at the same time for apple, banana, and cherry?
Ketan:
Yes
Nicholas:
Fantastic! I will set that up now. Thanks so much!
Ketan:
Sorry for not being clear.
Everything in Flyte will parallelize automatically.
Map is just a special way of running large arrays
Nicholas:
Ahh, I think that’s what I wasn’t understanding.
Ketan:
Sorry.
Please suggest edits to the docs.
Cc @Samhita Alla, we should probably explain the model for a workflow with independent data.
@Nicholas LoFaso, so it’s a dataflow system: Flyte will make parallel progress as long as data is available and resources are available.
E.g., you can write a static for loop in a regular workflow.
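(For example, a minimal sketch of a static loop in a plain `@workflow`; `square` and `static_loop_wf` are hypothetical names, and the loop bound is fixed at registration time.)
```python
from flytekit import task, workflow


@task
def square(x: int) -> int:
    return x * x

@workflow
def static_loop_wf():
    # The range is known when the workflow is compiled, so this
    # produces three independent square() nodes; with no data
    # dependencies between them, they execute in parallel on a cluster.
    for i in range(3):
        square(x=i)
```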
Nicholas:
That makes sense. And because I need to loop over an input variable, that’s why I need `@dynamic`.
Ketan:
Exactly.
Can you suggest an edit?
Nicholas:
It might be clear and I just misunderstood. Let me reread and see if I can suggest an edit.