# ask-the-community
Hi all, I'm trying to build a workflow based on Papermill (notebooks). I've built a simple starter workflow that reads a Parquet file from S3, computes some stats on it in a task, and returns a Pandas dataframe. The dataframe is the input to the next task, which is a notebook. This isn't working: task 1 succeeds, but task 2 throws the error "Object of type DataFrame is not JSON serializable". This is the workflow:
```python
import pathlib
import math
import os

from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.file import HTMLPage

from flytekitplugins.papermill import NotebookTask
import pandas as pd

nb = NotebookTask(
    name="s3_notebook_poc",  # placeholder task name
    notebook_path=os.path.join(pathlib.Path(__file__).parent.absolute(), "s3_notebook_poc.ipynb"),
    requests=Resources(cpu="500m", mem="1Gi"),
    limits=Resources(cpu="500m", mem="4Gi"),
    # The notebook takes the dataframe produced by load_and_summarize as its input
    inputs=kwtypes(df=pd.DataFrame),
)

@task(requests=Resources(cpu="500m", mem="1Gi"), limits=Resources(cpu="500m", mem="4Gi"))
def load_and_summarize(s3_path: str) -> pd.DataFrame:
    '''Read a file from s3, summarize it and convert to a Pandas dataframe.'''
    df = pd.read_parquet(s3_path)

    # Get the number of rows
    num_rows = len(df)
    print(f'Number of rows: {num_rows}')
    # Timestamps are assumed to be numeric milliseconds
    min_timestamp = df['timestamp'].min()
    max_timestamp = df['timestamp'].max()
    total_ms = round(max_timestamp - min_timestamp)
    minutes = math.floor(total_ms/60000)
    seconds = math.floor((total_ms-minutes*60000)/1000)
    ms = (total_ms-minutes*60000-seconds*1000)
    print(f'Data for: {total_ms} total_ms ({minutes}:{seconds}.{ms} minutes)')
    return df

@workflow
def wf(s3_path: str) -> None:
    '''Run the workflow'''
    df = load_and_summarize(s3_path=s3_path)
    # Pass the dataframe to the notebook task; this is the step that fails
    nb(df=df)
```
The screenshot shows the beginning of the notebook (s3_notebook_poc.ipynb). Any idea?
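This is a limitation of papermill: as the error shows, the notebook inputs are serialized to JSON, and a DataFrame can't be. You'll need to store the dataframe as a file (e.g. a CSV) or in blob storage like S3, and pass that file as the input to the notebook task.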
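A minimal, Flyte-free sketch of that workaround, assuming (as the error message suggests) that notebook inputs go through JSON encoding. Here `json.dumps` only stands in for that step, and the helpers `encode_param` and `df_to_csv` are hypothetical. It shows that the DataFrame itself can't be encoded, while a file path can, and that the notebook side can rebuild the frame from that path.

```python
import json
import os
import tempfile

import pandas as pd


def encode_param(value) -> str:
    """Hypothetical stand-in for the JSON encoding applied to notebook inputs."""
    return json.dumps(value)


def df_to_csv(df: pd.DataFrame, directory: str) -> str:
    """Hypothetical helper: write the frame to CSV and return its path as a plain str."""
    path = os.path.join(directory, "results.csv")
    df.to_csv(path)
    return path


df = pd.DataFrame({"Height": [1.70, 1.87]}, index=["Lionel Messi", "Cristiano Ronaldo"])

# Passing the DataFrame itself fails with the same message the workflow reports.
try:
    encode_param({"df": df})
    error_message = None
except TypeError as err:
    error_message = str(err)

with tempfile.TemporaryDirectory() as tmp:
    # Upstream task: persist the frame and hand over only its path (a JSON-safe str).
    encoded = encode_param({"csv_path": df_to_csv(df, tmp)})
    # Notebook side: decode the parameter and reload the frame from the file.
    csv_path = json.loads(encoded)["csv_path"]
    reloaded = pd.read_csv(csv_path, index_col=0)

print(error_message)  # Object of type DataFrame is not JSON serializable
print(reloaded)
```

In a real workflow the file would live in S3 or another blob store rather than a local temporary directory, since the notebook task may run on a different machine than the task that wrote it.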
Hi @Samhita Alla, yes, I realized this when digging further. What I tried next was having the previous task write a Parquet file and passing that file's path to the NotebookTask. I wanted to use FlyteFile to manage the intermediate file, but this doesn't work either: the notebook task throws an exception. Any thoughts? Here's dataframe_test.py:
```python
'''Demo workflow for serializing dataframes'''
import os
import pathlib
from flytekit import task, workflow, kwtypes
from flytekit.types.file import FlyteFile
from flytekitplugins.papermill import NotebookTask
import pandas as pd

@task
def create_data_frame() -> pd.DataFrame:
    '''Create a simple dataframe'''
    names = ["Lionel Messi", "Cristiano Ronaldo", "Neymar Junior", "Kylian Mbappe",
             "Manuel Neuer"]
    height = [1.70, 1.87, 1.75, 1.78, 1.93]
    data_frame = pd.DataFrame(data={"Height": height}, index=names)
    return data_frame

@task
def summarize_python(data_frame: pd.DataFrame) -> None:
    '''Short summary of the dataframe'''

@task
def df_to_parquet(data_frame: pd.DataFrame) -> FlyteFile:
    '''Save the dataframe as a Parquet file'''
    local_path = "/tmp/results.parquet"
    # Write the file first; flytekit uploads it when the task returns the FlyteFile
    data_frame.to_parquet(local_path)
    return FlyteFile(local_path)

@task
def parquet_to_df(file_path: FlyteFile) -> None:
    print(f'parquet_to_df: file_path: {file_path}')
    df = pd.read_parquet(file_path)

summarize_nb = NotebookTask(
    name="dataframe_test_nb",  # placeholder task name
    notebook_path=os.path.join(pathlib.Path(__file__).parent.absolute(), "dataframe_test.ipynb"),
    # The notebook receives the Parquet file as its `flyte_file` parameter
    inputs=kwtypes(flyte_file=FlyteFile),
)

@workflow
def wf() -> None:
    '''Run the workflow'''
    data_frame = create_data_frame()
    flyte_file = df_to_parquet(data_frame=data_frame)
    parquet_to_df(file_path=flyte_file)
    summarize_nb(flyte_file=flyte_file)

if __name__ == "__main__":
    wf()
```
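dataframe_test.ipynb:
```python
# Parameters cell (tagged "parameters"); papermill injects the actual value after it
flyte_file = None

import pandas as pd

print(f'notebook: file_path: {flyte_file}')
data_frame = pd.read_parquet(flyte_file)
print(data_frame)
```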
```
2023-09-06 15:00:12,386 flytekit ERROR Exception when executing    base_task.py:543
Exception encountered at "In [3]":
ArrowInvalid                              Traceback (most recent call last)
Cell In[3], line 4
      1 import pandas as pd
      3 print(f'notebook: file_path: {flyte_file}')
----> 4 data_frame = pd.read_parquet(flyte_file)
      5 print(data_frame)
File ~/Workspaces/flyte-poc/.venv/lib/python3.8/site-packages/pandas/io/parquet.py:503, in read_parquet(path, engine, columns, storage_options, use_nullable_dtypes, **kwargs)
```
StructuredDataset should work in papermill tasks. Here's an example: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-papermill/tests/test_task.py#L151-L183, and here's the notebook it runs: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-papermill/tests/testdata/nb-types.ipynb. Could you paste the full stacktrace?
OK, I see that your notebook uses the load_flytefile function. I'll give that a go; that's probably it.