dry-ability-69144
05/04/2023, 5:13 PM
dry-ability-69144
05/04/2023, 5:13 PM
thankful-minister-83577
thankful-minister-83577
dry-ability-69144
05/04/2023, 5:40 PM
@workflow
def main_workflow() -> pd.DataFrame:
    dataset = get_dataframe_from_bq(...)
    # How can I do this?
    sliced_dataset = dataset[['col_a', 'col_b']].copy()
    # Or this?
    result = process_task(
        dataset=dataset[['col_a', 'col_b']]
    )

# Where
@task
def process_task(dataset: pd.DataFrame) -> pd.DataFrame:
    ...
    return result
When I try this, I get the promise error.
dry-ability-69144
05/04/2023, 5:44 PM
get_dataframe_from_bq is a task
glamorous-carpet-83516
05/04/2023, 6:03 PM
get_sliced_dataset that returns sliced data.
dry-ability-69144
05/04/2023, 6:05 PM
glamorous-carpet-83516
05/04/2023, 6:13 PM
def process_task(dataset: Annotated[pd.DataFrame, kwtypes(col_a=int, col_b=int)]) -> pd.DataFrame:
    ...
You can pass the entire dataset to process_task, but process_task will read only a subset of the dataframe from S3.
glamorous-carpet-83516
05/04/2023, 6:13 PM
dry-ability-69144
05/10/2023, 1:33 PM
Error: Connection Info: [Endpoint: dns:///flyte.hurb-data.com, InsecureConnection?: false, AuthMode: Pkce]: rpc error: code = Unavailable desc = unexpected HTTP status code received from server: 502 (Bad Gateway); transport: received unexpected content-type "text/html"
I know it is only this workflow because I have other tasks/workflows that are registering just fine.
thankful-minister-83577
dry-ability-69144
05/10/2023, 6:12 PM
@workflow
def wf() -> pd.DataFrame:
    df = pd.read_csv(...)
    output = annotated_task(df=df)
    return output

COLS = kwtypes(a=str, b=str)

@task
def annotated_task(df: Annotated[pd.DataFrame, COLS]) -> pd.DataFrame:
    ...
    return df
It is something like this. I've followed the doc @glamorous-carpet-83516 sent previously.
dry-ability-69144
05/10/2023, 6:13 PM
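For anyone reading this later, here is a rough toy sketch of why the slice in main_workflow fails while a slicing task such as get_sliced_dataset works. It assumes the workflow body only ever sees placeholders (promises) for task outputs, and that real data exists only inside a task. The Promise class, the task decorator, get_rows, and the list-of-dicts "dataset" below are all made-up stand-ins for illustration, not flytekit's actual classes or behaviour.

```python
# Toy model only: these names imitate the idea of deferred task outputs.
# They are hypothetical stand-ins, not flytekit's real Promise or @task.

class Promise:
    """Placeholder for a task output that has not been computed yet."""

    def __init__(self, producer, kwargs):
        self.producer = producer
        self.kwargs = kwargs

    def resolve(self):
        # Resolve upstream placeholders first, then run the producing function.
        inputs = {
            name: value.resolve() if isinstance(value, Promise) else value
            for name, value in self.kwargs.items()
        }
        return self.producer(**inputs)


def task(fn):
    """Calling a toy task records the call instead of running it."""
    def wrapper(**kwargs):
        return Promise(fn, kwargs)
    return wrapper


@task
def get_rows():
    # Stand-in for get_dataframe_from_bq: a tiny list-of-dicts "dataset".
    return [{"col_a": 1, "col_b": 2, "col_c": 3}]


@task
def get_sliced_dataset(dataset, columns):
    # Inside a task the real data is available, so slicing works here.
    return [{col: row[col] for col in columns} for row in dataset]


def bad_workflow():
    dataset = get_rows()
    # dataset is a placeholder, not data, so subscripting it fails.
    return dataset["col_a"]


def good_workflow():
    dataset = get_rows()
    # Hand the placeholder to a task and let the task do the slicing.
    return get_sliced_dataset(dataset=dataset, columns=["col_a", "col_b"])
```

In this toy, calling bad_workflow() raises a TypeError because the placeholder is not subscriptable, while good_workflow().resolve() returns only the col_a and col_b values. The exact error in a real workflow may differ, but the shape of the fix is the same: keep the slicing inside a task.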