Hi all! Is there a way to pass a slice of a DataFr...
# flyte-support
d
Hi all! Is there a way to pass a slice of a DataFrame returned from one task to another task, instead of sending all of it, given that it's a promise?
That is, some way other than a dynamic workflow
t
not sure what you mean - what you’re describing is the default case actually
can you copy/paste your code?
d
Sure, it's something like this:
@workflow
def main_workflow()-> pd.DataFrame:

    dataset = get_dataframe_from_bq(...)
    # How can I do this?
    sliced_dataset = dataset[['col_a', 'col_b']].copy()
    # Or this?
    result = process_task(
        dataset=dataset[['col_a', 'col_b']]
    )

# Where
@task
def process_task(dataset: pd.DataFrame) -> pd.DataFrame:
    ...
    return result
When I try this, I get the promise error.
Ah, and get_dataframe_from_bq is a task
g
you can’t do that for now because dataset is a promise. you could create a new task, get_sliced_dataset, that returns the sliced data.
d
Cool, I was thinking of this approach, but wanted to be sure whether there is any other way. Thanks guys!
g
another approach is to use an annotation, like:
def process_task(dataset: Annotated[pd.DataFrame, kwtypes(col_a=int, col_b=int)]) -> pd.DataFrame:
    ...
you can pass the entire dataset to process_task, but process_task will read only a subset of the dataframe from S3
d
I'm trying the annotation solution, but for some reason, when I upload the workflow to my cluster, it gives me this error:
Error: Connection Info: [Endpoint: dns:///flyte.hurb-data.com, InsecureConnection?: false, AuthMode: Pkce]: rpc error: code = Unavailable desc = unexpected HTTP status code received from server: 502 (Bad Gateway); transport: received unexpected content-type "text/html"
I know it is only this workflow because I have other tasks/workflows that register just fine
t
can you copy paste the code
d
@workflow
def wf() -> pd.DataFrame:
    df = pd.read_csv(...)
    output = annotated_task(df=df)
    return output

COLS = kwtypes(a=str, b=str)

@task
def annotated_task(df: Annotated[pd.DataFrame, COLS]) -> pd.DataFrame:
    ...
    return df
It's something like this. I've followed the doc @glamorous-carpet-83516 sent previously.
The read_csv line is just for this explanation. In my case, I'm querying from BQ in a separate task.