Victor Gustavo da Silva Oliveira
05/04/2023, 5:13 PM
Yee
Victor Gustavo da Silva Oliveira
05/04/2023, 5:40 PM
@workflow
def main_workflow() -> pd.DataFrame:
    dataset = get_dataframe_from_bq(...)
    # How can I do this?
    sliced_dataset = dataset[['col_a', 'col_b']].copy()
    # Or this?
    result = process_task(
        dataset=dataset[['col_a', 'col_b']]
    )

# Where
@task
def process_task(dataset: pd.DataFrame) -> pd.DataFrame:
    ...
    return result
When I try this, I get the promise error. get_dataframe_from_bq is a task.
Kevin Su
05/04/2023, 6:03 PM
[Create a task] get_sliced_dataset that returns sliced data.
Victor Gustavo da Silva Oliveira
05/04/2023, 6:05 PM
Kevin Su
05/04/2023, 6:13 PM
def process_task(dataset: Annotated[pd.DataFrame, kwtypes(col_a=int, col_b=int)])
You can pass the entire dataset to process_task, but process_task will read only a subset of the dataframe from S3.
Victor Gustavo da Silva Oliveira
05/10/2023, 1:33 PM
Error: Connection Info: [Endpoint: dns:///flyte.hurb-data.com, InsecureConnection?: false, AuthMode: Pkce]: rpc error: code = Unavailable desc = unexpected HTTP status code received from server: 502 (Bad Gateway); transport: received unexpected content-type "text/html"
I know it is only this workflow, because I have other tasks/workflows that are registering just fine.
Yee
Victor Gustavo da Silva Oliveira
05/10/2023, 6:12 PM
@workflow
def wf() -> pd.DataFrame:
    df = pd.read_csv(...)
    output = annotated_task(df=df)
    return output

COLS = kwtypes(a=str, b=str)

@task
def annotated_task(df: Annotated[pd.DataFrame, COLS]) -> pd.DataFrame:
    ...
    return df

It's something like this. I've followed the doc @Kevin Su sent previously.