#ask-the-community

Maciej Kopczyński

02/06/2024, 9:20 PM
Hi, I know this is a long shot, but I am a bit lost and looking for any pointers. I am trying to execute the same task multiple times (in parallel) on a list of different inputs, so I figured that I should be using `map_task`. The list of inputs for the map task is obtained dynamically (via a database query), so I wrap that map task in a dynamic workflow. Basically, I need to collect the outputs of the mapped tasks (which I assumed would be a list of outputs) and do further processing on them. Below is a generalized example of what I am trying to do:
@task
def get_dynamic_params(...) -> list[str]:
    # Task implementation
    return some_list


@task
def my_task(dynamic_param: str, param1: str, param2: str,) -> DataFrame:
    # Task implementation
    return some_dataframe


@dynamic
def my_dynamic_workflow(
    dynamic_params: list[str],
    param1: str,
    param2: str,
) -> list[DataFrame]:
    partial_my_task = functools.partial(
        my_task,
        param1,
        param2,
    )
    return map_task(partial_my_task)(dynamic_param=dynamic_params)

@workflow
def my_static_workflow(
    param1: str,
    param2: str,
) -> list[...]:
    dynamic_params = get_dynamic_params()
    my_data = my_dynamic_workflow(
        dynamic_params=dynamic_params,
        param1=param1,
        param2=param2,
    )
    # Workflow implementation
    ...
When trying to register the workflow, I am getting the following errors:
Error 0: Code: MismatchingTypes, Node Id: end-node, Description: Variable [o0] (type [collection_type:<union_type:<variants:<map_value_type:<structured_dataset_type:<> > structure:<tag:"Typed Dict" > > variants:<simple:NONE structure:<tag:"none" > > > > ]) doesn't match expected type [collection_type:<map_value_type:<structured_dataset_type:<> > > ].
Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].
Is there something obviously wrong in my reasoning? Maybe I have a bug in the actual code (I cannot paste it, but I have reviewed and rewritten it a few times to make sure), but I wanted to check first that my understanding of these concepts is correct. I will appreciate any suggestions. Basically, I cannot understand where the `[collection_type:<union_type:<variants:<map_value_type:<structured_dataset_type:<> > structure:<tag:"Typed Dict" > > variants:<simple:NONE structure:<tag:"none" > > > > ]` came from, since no such type is returned in this particular workflow.

Niels Bantilan

02/06/2024, 9:55 PM
looks like the partial has a syntax error:
functools.partial(
        my_task,
        param1: str,  # param1=param1
        param2: str,  # param2=param2
    )
the error message needs to be improved for sure… squinting a little bit, it seems like the `ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].` means param1/param2 are not being bound to any task invocation, not exactly sure what `Error 0` is about

Maciej Kopczyński

02/07/2024, 7:15 AM
Indeed, there is a syntax error, but I introduced it while preparing this example 🙂. Just fixed it, thanks! The actual code is clean, though. And I have just solved it; I probably should have waited until morning (on this side of the globe) before posting, but I will leave the answer here. The problem was actually in the part of the code that I initially left out of the example. I was overwhelmed by the error and did not notice that it was about the `end-node` of `my_static_workflow`. In reality, `my_static_workflow` looked like this:
@workflow
def my_static_workflow(
    param1: str,
    param2: str,
) -> list[...]:
    dynamic_params = get_dynamic_params()
    my_data = my_dynamic_workflow(
        dynamic_params=dynamic_params,
        param1=param1,
        param2=param2,
    )
    return map_task(some_other_task)(param=my_data)
So now that I look at it, it is clear that since `my_data` was dynamically obtained, I could not just feed it to `some_other_task`; I needed a separate dynamic workflow, and that solved the issue. The error was really hard to read though, and it was not obvious what the problem was (until I read the code again with a fresh mind). Thanks for your time anyway!
Seems like I celebrated too soon. The only thing that has changed by moving the second `map_task` to a dynamic workflow is that the error is thrown during workflow execution instead of during registration (makes sense):
Workflow[redacted:redacted:redacted] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 2
	Error 0: Code: MismatchingTypes, Node Id: end-node, Description: Variable [o0] (type [collection_type:<union_type:<variants:<structured_dataset_type:<> structure:<tag:"StructuredDataset Transformer" > > variants:<simple:NONE structure:<tag:"none" > > > > ]) doesn't match expected type [collection_type:<structured_dataset_type:<> > ].
	Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].


1: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 2
	Error 0: Code: MismatchingTypes, Node Id: end-node, Description: Variable [o0] (type [collection_type:<union_type:<variants:<structured_dataset_type:<> structure:<tag:"StructuredDataset Transformer" > > variants:<simple:NONE structure:<tag:"none" > > > > ]) doesn't match expected type [collection_type:<structured_dataset_type:<> > ].
	Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0].
No idea what `StructuredDataset Transformer` is or how `None` got there, since none of my tasks returns it.

Niels Bantilan

02/08/2024, 2:19 PM
(not ignoring the bad error message problem, but found another potential issue)
partial_my_task = functools.partial(
        my_task,
        param1,
        param2,
    )
can you try using keyword arguments (**kwargs) instead:
partial_my_task = functools.partial(
        my_task,
        param1=param1,
        param2=param2,
    )
With positional arguments (*args), `param1` is being bound to `dynamic_param` and `param2` to `param1`.
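The binding behavior described above can be reproduced with plain `functools.partial`, without Flyte. This is only a sketch: `my_task` here is an ordinary Python function standing in for the task in the thread, and the literal values are made up for illustration. How flytekit's `map_task` itself reacts to a positionally bound partial is not shown in this thread, so the example only demonstrates the standard-library semantics.

```python
import functools


def my_task(dynamic_param: str, param1: str, param2: str) -> str:
    # Plain-Python stand-in for the Flyte task; it just reports its bindings.
    return f"dynamic_param={dynamic_param}, param1={param1}, param2={param2}"


# Positional arguments fill parameters from the left, so "P1" lands in
# dynamic_param and "P2" lands in param1; only param2 is still free.
positional = functools.partial(my_task, "P1", "P2")
result_positional = positional(param2="item")

# Passing dynamic_param now collides with the already-bound first slot.
positional_error = None
try:
    positional(dynamic_param="item")
except TypeError as exc:
    positional_error = exc

# Keyword arguments bind exactly the named parameters and leave
# dynamic_param free for the mapped input.
keyword = functools.partial(my_task, param1="P1", param2="P2")
result_keyword = keyword(dynamic_param="item")

print(result_positional)
print(result_keyword)
```

With keyword binding, the only unbound parameter is the one the map is supposed to iterate over, which matches the working code posted later in the thread.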

Maciej Kopczyński

02/09/2024, 2:47 PM
Sorry for the late response! I needed a "fresh look", so I started over (wrote the same thing from scratch), and on the second attempt I did it the way you suggested (unknowingly), and it is working now 🙂. It is possible that I had other problems in my first attempt, but I can now register and execute map tasks. Thanks a lot!

Niels Bantilan

02/09/2024, 5:22 PM
Nice! Yeah we really need to figure out how to improve error messages. Would you mind sharing the working code so we can analyze the differences?

Maciej Kopczyński

02/09/2024, 6:40 PM
Sure, I have asked GPT4 to anonymize it, hope it still makes sense :). It is just a proof of concept (I did not anticipate having to use `map_task`, so I am trying to put something together quickly):
@dynamic
def function_a(
    param_a: list[str],
    param_b: str,
    param_c: str,
    param_d: datetime,
    param_e: str,
) -> list[DataFrame]:
    
    partial_function_b = functools.partial(
        function_b,
        param_b=param_b,
        param_c=param_c,
        param_d=param_d,
        param_e=param_e,
    )
    data_a = map_task(
        partial_function_b,
        metadata=TaskMetadata(retries=0, timeout=timedelta(minutes=240)),
        concurrency=10,
    )(dynamic_param=param_a).with_overrides(requests=AVAILABLE_RESOURCES["s"])
    return data_a

@dynamic
def function_d(data_b: list[DataFrame]) -> list[dict[str, dict[str, DataFrame]]]:
    
    data_c = map_task(
        function_e,
        metadata=TaskMetadata(retries=0, timeout=timedelta(minutes=240)),
        concurrency=10,
    )(data=data_b).with_overrides(requests=AVAILABLE_RESOURCES["m"])
    return data_c

@workflow
def function_f(
    param_b: str,
    param_g: str,
    param_c: str,
    param_d: datetime,
    param_e: str,
    param_h: int,
) -> list[dict[str, dict[str, DataFrame]]]:
    
    param_a = function_i(param_g=param_g, param_c=param_c, param_h=param_h).with_overrides(
        requests=AVAILABLE_RESOURCES["xs"]
    )
    data_a = function_a(
        param_a=param_a,
        param_b=param_b,
        param_c=param_c,
        param_d=param_d,
        param_e=param_e,
    )
    data_sets = function_d(data_b=data_a)
    return data_sets

@workflow
def function_j(
    param_b: str,
    param_g: str,
    param_c: str,
    param_d: datetime,
    param_e: str,
    param_h: int,
) -> FlyteDirectory:
    
    data_d = function_f(
        param_b=param_b,
        param_g=param_g,
        param_c=param_c,
        param_d=param_d,
        param_e=param_e,
        param_h=param_h,
    )
    trained_models = function_k(train_sets=data_d).with_overrides(requests=AVAILABLE_RESOURCES["l"])
    uri = function_l(blob=trained_models).with_overrides(requests=AVAILABLE_RESOURCES["xs"])
    function_m(
        param_g=param_g, param_c=param_c, param_e=param_e, uri=uri
    ).with_overrides(requests=AVAILABLE_RESOURCES["xs"])
    return trained_models
Basically, the first dynamic workflow fetches data per item (the item list is obtained dynamically), and the second one applies a data transformation. The first regular workflow then executes these two dynamic workflows to produce datasets, which are then used to train models.
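The shape of that pipeline (fan out over a list whose length is only known at run time, then fan out again over the results) can be sketched in plain Python. This is a rough analogue under stated assumptions, not Flyte code: `fetch`, `transform`, and `pipeline` are hypothetical names, the built-in `map` stands in for `map_task`, and dictionaries stand in for the DataFrames.

```python
import functools


def fetch(item: str, source: str) -> dict:
    # Hypothetical stand-in for the per-item fetch task.
    return {"item": item, "source": source}


def transform(data: dict) -> dict:
    # Hypothetical stand-in for the per-dataset transformation task.
    return {**data, "transformed": True}


def pipeline(items: list[str], source: str) -> list[dict]:
    # Fixed inputs are bound by keyword; the mapped input stays positional.
    fetch_one = functools.partial(fetch, source=source)
    fetched = list(map(fetch_one, items))  # stage 1: one fetch per item
    return list(map(transform, fetched))   # stage 2: one transform per result


datasets = pipeline(["a", "b"], source="db")
print(datasets)
```

In the thread's working code each stage lives in its own `@dynamic` workflow; the sketch collapses both into one function only to show the data flow.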