https://flyte.org logo
#ask-the-community
Title
# ask-the-community
e

Endre Karlson

01/16/2024, 5:52 PM
I got the following
Copy code
report_cols = kwtypes(env=str, instance=str, name=str, last=int, max=int)

@task
def get_counter(
    env_name: str, instance_name: str
) -> Annotated[pd.DataFrame, report_cols]:
    return pd.DataFrame()

@dynamic
def run_on_all_instances(
    envs: dict[str, list[str]],
) -> list[pd.DataFrame]:
    results = []
    for env_name, instances in envs.items():
        for instance_name in instances:
            result = get_actionlog_count(env_name=env_name, instance_name=instance_name)
            results.append(result)

    LOG.debug("Done running all")
    # return results
    return pipeline.concat_frames(frames=results)
Copy code
@task
def concat_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
    """
    Simple task that will concat a list of frames into 1 frame.
    """
    frames = [df for df in frames if not df.empty()]
    df = pd.concat(frames)
    return df
FlyteScopedUserException: Failed to convert inputs of task 'xp_util.pipeline.concat_frames': No match for FieldRef.Name(env) in __index_level_0__: null __fragment_index: int32 __batch_index: int32 __last_in_fragment: bool __filename: string The above exception was the direct cause of the following exception:
s

Samhita Alla

01/19/2024, 12:47 PM
shouldn't you be returning a dataframe, but not a list from
run_on_all_instances
dynamic workflow?
e

Endre Karlson

01/20/2024, 11:29 AM
Yes you are right!