some-solstice-93243
02/06/2024, 9:20 PM
I have a question about map_task. The list of inputs for the map task is dynamically obtained (via a database query), therefore I encapsulate that map task in a dynamic workflow. Basically, I need to collect the outputs of the mapped tasks (which I assumed would be a list of outputs) and do further processing on them. Below is a generalized example of what I am trying to do:
@task
def get_dynamic_params(...) -> list[str]:
    # Task implementation
    return some_list

@task
def my_task(dynamic_param: str, param1: str, param2: str) -> DataFrame:
    # Task implementation
    return some_dataframe

@dynamic
def my_dynamic_workflow(
    dynamic_params: list[str],
    param1: str,
    param2: str,
) -> list[DataFrame]:
    partial_my_task = functools.partial(
        my_task,
        param1,
        param2,
    )
    return map_task(partial_my_task)(dynamic_param=dynamic_params)
@workflow
def my_static_workflow(
    param1: str,
    param2: str,
) -> list[...]:
    dynamic_params = get_dynamic_params()
    my_data = my_dynamic_workflow(
        dynamic_params=dynamic_params,
        param1=param1,
        param2=param2,
    )
    # Workflow implementation
    ...
When trying to register the workflow, I am getting following errors:
Error 0: Code: MismatchingTypes, Node Id: end-node, Description: Variable [o0] (type [collection_type:<union_type:<variants:<map_value_type:<structured_dataset_type:<> > structure:<tag:"Typed Dict" > > variants:<simple:NONE structure:<tag:"none" > > > > ]) doesn't match expected type [collection_type:<map_value_type:<structured_dataset_type:<> > > ].
Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].
Is there something obviously wrong in my reasoning? Maybe I have a bug in the actual code (I cannot paste it, though I have reviewed and rewritten it a few times to make sure), but I wanted to check first that my understanding of these concepts is correct. I will appreciate any suggestions. Basically, I cannot understand where the [collection_type:<union_type:<variants:<map_value_type:<structured_dataset_type:<> > structure:<tag:"Typed Dict" > > variants:<simple:NONE structure:<tag:"none" > > > > ]
came from, since in this particular workflow no such type is returned.

broad-monitor-993
02/06/2024, 9:55 PM
functools.partial(
    my_task,
    param1,  # param1=param1
    param2,  # param2=param2
)
broad-monitor-993
02/06/2024, 9:58 PM
ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].
means param1/param2 are not being bound to any task invocation; not exactly sure what Error 0 is about.

some-solstice-93243
02/07/2024, 7:15 AM
I see now that the errors point to the end-node of my_static_workflow. Now, my_static_workflow really looked like this:
@workflow
def my_static_workflow(
    param1: str,
    param2: str,
) -> list[...]:
    dynamic_params = get_dynamic_params()
    my_data = my_dynamic_workflow(
        dynamic_params=dynamic_params,
        param1=param1,
        param2=param2,
    )
    return map_task(some_other_task)(param=my_data)
So now that I look at it, it is clear that since my_data was dynamically obtained, I could not just feed it to some_other_task; I needed a separate dynamic workflow, and that solved the issue. The error was really hard to read, though, and it was not obvious what the problem was (until I read the code again with a fresh mind). Thanks for your time anyway!

some-solstice-93243
02/07/2024, 9:52 AM
One side effect of moving map_task into a dynamic workflow is that the error is thrown during workflow execution instead of during registration (makes sense):
Workflow[redacted:redacted:redacted] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 2
Error 0: Code: MismatchingTypes, Node Id: end-node, Description: Variable [o0] (type [collection_type:<union_type:<variants:<structured_dataset_type:<> structure:<tag:"StructuredDataset Transformer" > > variants:<simple:NONE structure:<tag:"none" > > > > ]) doesn't match expected type [collection_type:<structured_dataset_type:<> > ].
Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].
1: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 2
Error 0: Code: MismatchingTypes, Node Id: end-node, Description: Variable [o0] (type [collection_type:<union_type:<variants:<structured_dataset_type:<> structure:<tag:"StructuredDataset Transformer" > > variants:<simple:NONE structure:<tag:"none" > > > > ]) doesn't match expected type [collection_type:<structured_dataset_type:<> > ].
Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].
No idea what StructuredDataset Transformer is or how None got there, since none of my tasks returns it.

broad-monitor-993
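One way to read the union in that error message: the produced value is a collection whose elements are either a StructuredDataset or None, i.e. roughly list[Optional[DataFrame]], while the node expects a plain list[DataFrame]. A minimal plain-Python sketch of that type mismatch (str stands in for the DataFrame type; this is an analogy, not Flyte's actual type machinery):

```python
from typing import Optional, get_args, get_origin

# Produced type per the error: a collection of (StructuredDataset | None)
produced = list[Optional[str]]   # str stands in for DataFrame
# Expected type: a plain collection of StructuredDataset
expected = list[str]

print(get_origin(produced) == get_origin(expected))  # -> True (both are list)
print(get_args(produced) == get_args(expected))      # -> False (element types differ)
```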
02/08/2024, 2:19 PM
partial_my_task = functools.partial(
    my_task,
    param1,
    param2,
)
Can you try using **kwargs?
partial_my_task = functools.partial(
    my_task,
    param1=param1,
    param2=param2,
)
With *args, it looks like param1 is being bound to dynamic_param and param2 to param1.
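To illustrate that binding behavior in plain Python (with a hypothetical stand-in for the task): positional arguments given to functools.partial fill the function's parameters left to right, so they land in the wrong slots, while keyword arguments bind by name and leave dynamic_param free for the mapped input.

```python
import functools

def my_task(dynamic_param, param1, param2):
    # Stand-in for the Flyte task; just reports what was bound where.
    return {"dynamic_param": dynamic_param, "param1": param1, "param2": param2}

# Positional partial: "A" and "B" fill the FIRST two parameters,
# i.e. dynamic_param and param1 -- not param1 and param2.
positional = functools.partial(my_task, "A", "B")
print(positional("C"))
# -> {'dynamic_param': 'A', 'param1': 'B', 'param2': 'C'}

# Keyword partial: each value is bound to the named parameter.
keyword = functools.partial(my_task, param1="A", param2="B")
print(keyword(dynamic_param="C"))
# -> {'dynamic_param': 'C', 'param1': 'A', 'param2': 'B'}
```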
some-solstice-93243
02/09/2024, 2:47 PM

broad-monitor-993
02/09/2024, 5:22 PM

some-solstice-93243
02/09/2024, 6:40 PM
Here is a generalized version of the code with map_task (I cannot share the actual code, therefore I am trying to put something together quickly):
@dynamic
def function_a(
    param_a: list[str],
    param_b: str,
    param_c: str,
    param_d: datetime,
    param_e: str,
) -> list[DataFrame]:
    partial_function_b = functools.partial(
        function_b,
        param_b=param_b,
        param_c=param_c,
        param_d=param_d,
        param_e=param_e,
    )
    data_a = map_task(
        partial_function_b,
        metadata=TaskMetadata(retries=0, timeout=timedelta(minutes=240)),
        concurrency=10,
    )(dynamic_param=param_a).with_overrides(requests=AVAILABLE_RESOURCES["s"])
    return data_a

@dynamic
def function_d(data_b: list[DataFrame]) -> list[dict[str, dict[str, DataFrame]]]:
    data_c = map_task(
        function_e,
        metadata=TaskMetadata(retries=0, timeout=timedelta(minutes=240)),
        concurrency=10,
    )(data=data_b).with_overrides(requests=AVAILABLE_RESOURCES["m"])
    return data_c
@workflow
def function_f(
    param_b: str,
    param_g: str,
    param_c: str,
    param_d: datetime,
    param_e: str,
    param_h: int,
) -> list[dict[str, dict[str, DataFrame]]]:
    param_a = function_i(param_g=param_g, param_c=param_c, param_h=param_h).with_overrides(
        requests=AVAILABLE_RESOURCES["xs"]
    )
    data_a = function_a(
        param_a=param_a,
        param_b=param_b,
        param_c=param_c,
        param_d=param_d,
        param_e=param_e,
    )
    data_sets = function_d(data_b=data_a)
    return data_sets

@workflow
def function_j(
    param_b: str,
    param_g: str,
    param_c: str,
    param_d: datetime,
    param_e: str,
    param_h: int,
) -> FlyteDirectory:
    data_d = function_f(
        param_b=param_b,
        param_g=param_g,
        param_c=param_c,
        param_d=param_d,
        param_e=param_e,
        param_h=param_h,
    )
    trained_models = function_k(train_sets=data_d).with_overrides(requests=AVAILABLE_RESOURCES["l"])
    uri = function_l(blob=trained_models).with_overrides(requests=AVAILABLE_RESOURCES["xs"])
    function_m(
        param_g=param_g, param_c=param_c, param_e=param_e, uri=uri
    ).with_overrides(requests=AVAILABLE_RESOURCES["xs"])
    return trained_models
Basically, the first dynamic workflow fetches data per item (the item list is obtained dynamically), and the second one applies a data transformation. Then the first regular workflow executes these two dynamic workflows to produce datasets, which are then used to train models.
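As a plain-Python sketch of that pipeline shape (hypothetical stand-in functions, no flytekit; each list comprehension stands in for one of the mapped fan-outs):

```python
def get_items() -> list[str]:
    # Stand-in for the task that dynamically obtains the item list
    return ["a", "b", "c"]

def fetch(item: str) -> str:
    # Stand-in for the mapped per-item fetch task (function_b)
    return f"data:{item}"

def transform(data: str) -> dict:
    # Stand-in for the mapped transformation task (function_e)
    return {"train": data.upper()}

def pipeline() -> list[dict]:
    items = get_items()                        # dynamic item list
    fetched = [fetch(i) for i in items]        # first fan-out (map_task in function_a)
    return [transform(d) for d in fetched]     # second fan-out (map_task in function_d)

print(pipeline())
# -> [{'train': 'DATA:A'}, {'train': 'DATA:B'}, {'train': 'DATA:C'}]
```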