Pavlina Mitsou
08/31/2023, 6:45 PM
@task
def generate_queries(configs: dict) -> List[str]:
    """Return the list of queries derived from ``configs``.

    Thin wrapper: forwards ``configs`` unchanged to
    ``generate_queries_from_configs`` and returns whatever it produces.

    Args:
        configs: Configuration mapping handed straight to the helper.

    Returns:
        The helper's result, annotated as a list of strings.
    """
    queries = generate_queries_from_configs(configs=configs)
    return queries
@task
def get_data_for_query(query: str, configs: dict) -> pd.DataFrame:
    """Run ``query`` through a BigQuery client and return a DataFrame.

    Args:
        query: Query text passed to ``Client.query``.
        configs: Settings mapping; its ``"gcpProject"`` entry (``None`` when
            the key is absent) is passed as the client's ``project``.

    Returns:
        The query result converted with ``to_dataframe()``.
    """
    project = configs.get("gcpProject")
    bq_client = bigquery.Client(project=project)
    query_job = bq_client.query(query)
    return query_job.to_dataframe()
@task
def to_output(df: pd.DataFrame) -> List[AnomalyData]:
    """Split ``df`` into one ``AnomalyData`` per distinct ``name`` value.

    Args:
        df: Frame with a ``name`` column.

    Returns:
        One ``AnomalyData`` per value of ``df['name'].unique()``, in that
        order. Each carries the value as ``name`` and an independent copy of
        the rows whose ``name`` equals it as ``data``.
    """
    return [
        # .copy() detaches each slice from the source frame.
        AnomalyData(name=label, data=df[df['name'] == label].copy())
        for label in df['name'].unique()
    ]
@workflow
def start_workflow() -> str:
    """Generate queries, fetch data per query, and convert each result.

    Steps visible in this snippet:
      1. ``generate_queries`` builds the list of queries from ``configs``.
      2. ``get_data_for_query`` is mapped over that list, with ``configs``
         bound once via ``partial``.
      3. ``to_output`` is mapped over the resulting frames; each run returns
         a list, so the mapped result is a list of lists.

    The leading and trailing parts of the workflow (including the final
    ``str`` return value) were elided in the original snippet.
    """
    ...  # elided in the original snippet; presumably defines `configs`
    queries_list = generate_queries(configs=configs)
    data_list = map_task(
        partial(get_data_for_query, configs=configs),
        metadata=TaskMetadata(retries=0),
        concurrency=20,
    )(query=queries_list)  # was `queries`, a name never defined here
    # TODO how to flatten?
    # NOTE(review): one option to confirm is a follow-up @task that takes
    # the nested list and returns a single flat list.
    list_of_list_data = map_task(
        to_output,
        metadata=TaskMetadata(retries=0),
        concurrency=20,
    )(df=data_list)
    ...  # remaining workflow steps elided in the original snippet
Jay Ganbat
08/31/2023, 6:58 PM
Yee
Pavlina Mitsou
08/31/2023, 7:28 PM