# ask-the-community
g
Hi all, I'm trying to build a workflow based on Papermill (notebooks). I've built a simple starter workflow that reads a Parquet file from S3, computes some stats on it in a task, and returns a Pandas dataframe. The dataframe is the input to the next task, which is a notebook. This isn't working: task 1 succeeds, but task 2 throws the error "Object of type DataFrame is not JSON serializable". This is the workflow:
import pathlib
import math
import os

from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.file import HTMLPage

from flytekitplugins.papermill import NotebookTask
import pandas as pd

nb = NotebookTask(
    name="s3_notebook_poc",
    notebook_path=os.path.join(pathlib.Path(__file__).parent.absolute(), "s3_notebook_poc.ipynb"),
    requests=Resources(cpu="500m", mem="1Gi"),
    limits=Resources(cpu="500m", mem="4Gi"),
    render_deck=True,
    disable_deck=False,
    inputs=kwtypes(df=pd.DataFrame),
    outputs=kwtypes(out_rendered_nb=HTMLPage)
)

@task(requests=Resources(cpu="500m", mem="1Gi"), limits=Resources(cpu="500m", mem="4Gi"))
def load_and_summarize(s3_path: str) -> pd.DataFrame:
    '''Read a file from s3, summarize it and convert to a Pandas dataframe.'''
    df = pd.read_parquet(s3_path)
    print(df.dtypes)

    # Get the number of rows
    num_rows = len(df)
    print(f'Number of rows: {num_rows}')
    min_timestamp = df['timestamp'].min()
    max_timestamp = df['timestamp'].max()
    total_ms = round(max_timestamp - min_timestamp)
    minutes = math.floor(total_ms/60000)
    seconds = math.floor((total_ms-minutes*60000)/1000)
    ms = (total_ms-minutes*60000-seconds*1000)
    print(f'Data for: {total_ms} total_ms ({minutes}:{seconds}.{ms} minutes)')
    return df

@workflow
def wf(s3_path: str) -> None:
    '''Run the workflow'''
    df = load_and_summarize(s3_path=s3_path)
    nb(df=df)
The screenshot shows the beginning of the notebook (s3_notebook_poc.ipynb). Any ideas?
s
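This is a limitation of papermill: as the error indicates, the notebook inputs have to be JSON-serializable, so a dataframe can't be passed in directly. You'll need to store the dataframe in a CSV file or in blob storage such as S3, and pass that as the input to the notebook task.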
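A minimal sketch of that workaround using only the standard library. The error message suggests the notebook parameters go through JSON serialization, so the idea is to write the data to a file and pass only its path (a plain string). The helper names `write_rows_to_csv` and `is_json_serializable`, the parameter name `csv_path`, and the CSV layout are hypothetical, for illustration only.

```python
import csv
import json
import os
import tempfile


def write_rows_to_csv(header, rows, directory):
    """Hypothetical helper: write tabular data to a CSV file and return its path."""
    path = os.path.join(directory, "results.csv")
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    return path


def is_json_serializable(value):
    """Return True if json.dumps accepts the value (the check that failed for the DataFrame)."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


# Upstream "task": write the data out and keep only the path.
workdir = tempfile.mkdtemp()
csv_path = write_rows_to_csv(
    ["name", "height"],
    [["Lionel Messi", 1.70], ["Manuel Neuer", 1.93]],
    workdir,
)

# A path string serializes fine; an arbitrary object (like a DataFrame) does not.
notebook_params = {"csv_path": csv_path}  # hypothetical parameter name
print(is_json_serializable(notebook_params))
print(is_json_serializable({"df": object()}))

# "Notebook" side: the parameter is just a path to read the data back from.
with open(notebook_params["csv_path"], newline="") as f:
    rows = list(csv.reader(f))
print(rows[0])
```

Note that a local path like this only works if the notebook runs on the same machine as the task that wrote the file; in a distributed run, uploading to blob storage such as S3 and passing the URI avoids that.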
g
Hi @Samhita Alla, yes, I realized this after digging further. So I tried having the previous task write the dataframe to Parquet and having the NotebookTask receive the path of the file. I wanted to use FlyteFile to manage the intermediate file, but this doesn't seem to work either (the notebook task throws an exception). Any thoughts?
dataframe_test.py:
'''Demo workflow for serializing dataframes'''
import os
import pathlib
from flytekit import task, workflow, kwtypes
from flytekit.types.file import FlyteFile
from flytekitplugins.papermill import NotebookTask
import pandas as pd

@task
def create_data_frame() -> pd.DataFrame:
    '''Create a simple dataframe'''
    names = ["Lionel Messi", "Cristiano Ronaldo", "Neymar Junior", "Kylian Mbappe",
          "Manuel Neuer"]
    height = [1.70, 1.87, 1.75, 1.78, 1.93]
    data_frame = pd.DataFrame(data = {"Height": height}, index = names)
    return data_frame

@task
def summarize_python(data_frame: pd.DataFrame) -> None:
    '''Short summary of the dataframe'''
    print(data_frame)

@task
def df_to_parquet(data_frame: pd.DataFrame) -> FlyteFile:
    '''Save the dataframe as a Parquet file'''
    local_path = "/tmp/results.parquet"
    data_frame.to_parquet(path=local_path)
    return FlyteFile(local_path)

@task
def parquet_to_df(file_path: FlyteFile) -> None:
    '''Read the Parquet file back into a dataframe and print it'''
    print(f'parquet_to_df: file_path: {file_path}')
    df = pd.read_parquet(file_path)
    print(df)

summarize_nb = NotebookTask(
    name="summarize_nb",
    notebook_path=os.path.join(pathlib.Path(__file__).parent.absolute(), "dataframe_test.ipynb"),
    render_deck=True,
    disable_deck=False,
    inputs=kwtypes(flyte_file=FlyteFile)
)

@workflow
def wf() -> None:
    '''Run the workflow'''
    data_frame = create_data_frame()
    summarize_python(data_frame=data_frame)
    flyte_file = df_to_parquet(data_frame=data_frame)
    parquet_to_df(file_path = flyte_file)
    summarize_nb(flyte_file = flyte_file)

if __name__ == "__main__":
    wf()
dataframe_test.ipynb:
flyte_file = None

import pandas as pd

print(f'notebook: file_path: {flyte_file}')
data_frame = pd.read_parquet(flyte_file)
print(data_frame)
Exception:
2023-09-06 15:00:12,386 flytekit ERROR Exception when executing    base_task.py:543
---------------------------------------------------------------------------
Exception encountered at "In [3]":
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[3], line 4
      1 import pandas as pd
      3 print(f'notebook: file_path: {flyte_file}')
----> 4 data_frame = pd.read_parquet(flyte_file)
      5 print(data_frame)

File ~/Workspaces/flyte-poc/.venv/lib/python3.8/site-packages/pandas/io/parquet.py:503, in read_parquet(path, engine, columns, storage_options, use_nullable_dtypes, **kwargs)
s
It looks like StructuredDataset should work in papermill tasks. Here's an example: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-papermill/tests/test_task.py#L151-L183. And here's the corresponding notebook: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-papermill/tests/testdata/nb-types.ipynb. Could you paste the full stack trace?
g
OK, I see that your notebook uses the load_flytefile function. I'll give that a go; that's probably it.
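A minimal sketch of what the notebook side might look like with load_flytefile, assuming (based on the linked nb-types.ipynb) that it is importable from flytekitplugins.papermill, takes the parameter value injected by papermill, and returns a FlyteFile. The helper name read_parquet_input is hypothetical. The imports sit inside the function only so the cell can be defined without flytekit installed; in a real notebook they would normally go at the top.

```python
def read_parquet_input(flyte_file_param):
    """Hypothetical helper: turn the papermill-injected parameter back into a dataframe.

    Assumes flytekitplugins.papermill.load_flytefile converts the injected value
    into a FlyteFile, as in the linked nb-types.ipynb test notebook.
    """
    # Imported lazily so this definition does not require pandas/flytekit up front.
    import pandas as pd
    from flytekitplugins.papermill import load_flytefile

    ff = load_flytefile(flyte_file_param)
    # FlyteFile.download() fetches the file (e.g. from S3) and returns a local path.
    local_path = ff.download()
    return pd.read_parquet(local_path)


# In the notebook, after the parameters cell that injects `flyte_file`:
# data_frame = read_parquet_input(flyte_file)
# print(data_frame)
```

Compared with the original cell, the difference is that pd.read_parquet receives a downloaded local file path rather than the raw injected parameter.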