# ask-the-community
v
Hi all! Is there a way to pass a slice of a DataFrame returned from one task to another task, instead of passing the whole thing, given that it's a promise?
I mean without using a dynamic workflow.
y
not sure what you mean; what you're describing is actually the default case
can you copy/paste your code?
v
Sure, it's something like this:
```python
import pandas as pd
from flytekit import task, workflow

@task
def process_task(dataset: pd.DataFrame) -> pd.DataFrame:
    ...
    return result

@workflow
def main_workflow() -> pd.DataFrame:
    dataset = get_dataframe_from_bq(...)
    # How can I do this?
    sliced_dataset = dataset[['col_a', 'col_b']].copy()
    # Or this?
    result = process_task(
        dataset=dataset[['col_a', 'col_b']]
    )
    return result
```
When I try this, I get the promise error.
Ah, and `get_dataframe_from_bq` is a task.
k
you can't do that for now because `dataset` is a promise. You could create a new task, `get_sliced_dataset`, that returns the sliced data.
v
Cool, I was thinking of this approach, but wanted to be sure there wasn't another way. Thanks, guys!
k
another approach is to use an annotation, like:
```python
@task
def process_task(dataset: Annotated[pd.DataFrame, kwtypes(col_a=int, col_b=int)]) -> pd.DataFrame: ...
```
you can pass the entire dataset to `process_task`, but `process_task` will read only that subset of the DataFrame (the listed columns) from S3.
v
I'm trying the annotation solution, but for some reason, when I upload the workflow to my cluster, it gives me this error:
```
Error: Connection Info: [Endpoint: dns:///flyte.hurb-data.com, InsecureConnection?: false, AuthMode: Pkce]: rpc error: code = Unavailable desc = unexpected HTTP status code received from server: 502 (Bad Gateway); transport: received unexpected content-type "text/html"
```
I know it's only this workflow, because I have other tasks/workflows that register just fine.
y
can you copy/paste the code?
v
```python
from typing import Annotated

import pandas as pd
from flytekit import kwtypes, task, workflow

COLS = kwtypes(a=str, b=str)

@task
def annotated_task(df: Annotated[pd.DataFrame, COLS]) -> pd.DataFrame:
    ...
    return df

@workflow
def wf() -> pd.DataFrame:
    df = pd.read_csv(...)
    output = annotated_task(df=df)
    return output
```
It's something like this. I followed the doc @Kevin Su sent earlier.
The `read_csv` line is just for this explanation. In my case, I'm querying BQ in a dedicated task.