limited-hairdresser-5419
01/16/2024, 5:52 PM
report_cols = kwtypes(env=str, instance=str, name=str, last=int, max=int)
@task
def get_counter(
    env_name: str, instance_name: str
) -> Annotated[pd.DataFrame, report_cols]:
    """Return an empty report frame carrying the columns declared in ``report_cols``.

    Placeholder task: both arguments are currently ignored and no rows are
    produced.

    Args:
        env_name: Not used by the current body.
        instance_name: Not used by the current body.

    Returns:
        A zero-row DataFrame whose columns are the names in ``report_cols``.
    """
    # The return annotation promises the ``report_cols`` columns. A bare
    # ``pd.DataFrame()`` has none of them, so a consumer that looks the
    # declared columns up by name fails with
    # "No match for FieldRef.Name(env)". Creating the columns up front keeps
    # the empty frame consistent with the declared schema.
    return pd.DataFrame(columns=list(report_cols))
@dynamic
def run_on_all_instances(
    envs: dict[str, list[str]],
) -> list[pd.DataFrame]:
    """Call ``get_actionlog_count`` for every instance and concatenate the results.

    Args:
        envs: Mapping of environment name to the instance names in it.

    Returns:
        Whatever ``pipeline.concat_frames`` returns for the collected results.
    """
    # NOTE(review): the annotation promises a list of frames, but the value
    # returned is the output of ``pipeline.concat_frames`` (presumably a
    # single frame) -- confirm which of the two is intended.
    collected = [
        get_actionlog_count(env_name=env_name, instance_name=instance_name)
        for env_name, instances in envs.items()
        for instance_name in instances
    ]
    LOG.debug("Done running all")
    return pipeline.concat_frames(frames=collected)
@task
def concat_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
    """Concatenate a list of frames into one frame, skipping empty ones.

    Args:
        frames: Frames to combine; empty frames are dropped first.

    Returns:
        The concatenation of the non-empty frames, or an empty DataFrame
        when there are none.
    """
    # ``DataFrame.empty`` is a boolean property, not a method; calling it
    # raises ``TypeError: 'bool' object is not callable``.
    non_empty = [df for df in frames if not df.empty]
    if not non_empty:
        # ``pd.concat`` raises ValueError when given nothing to concatenate.
        return pd.DataFrame()
    return pd.concat(non_empty)
FlyteScopedUserException: Failed to convert inputs of task 'xp_util.pipeline.concat_frames':
No match for FieldRef.Name(env) in __index_level_0__: null
__fragment_index: int32
__batch_index: int32
__last_in_fragment: bool
__filename: string
The above exception was the direct cause of the following exception:
tall-lock-23197
`run_on_all_instances` dynamic workflow?
limited-hairdresser-5419
01/20/2024, 11:29 AM