tall-ram-83532
09/05/2023, 2:31 PM
import pathlib
import math
import os
from flytekit import Resources, kwtypes, task, workflow
from flytekit.types.file import HTMLPage
from flytekitplugins.papermill import NotebookTask
import pandas as pd
# Notebook task for s3_notebook_poc.ipynb, resolved relative to this module's
# directory. It takes a DataFrame input named `df` and declares an
# `out_rendered_nb` HTMLPage output.
nb = NotebookTask(
    name="s3_notebook_poc",
    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "s3_notebook_poc.ipynb"),
    requests=Resources(cpu="500m", mem="1Gi"),
    limits=Resources(cpu="500m", mem="4Gi"),
    render_deck=True,
    disable_deck=False,
    inputs=kwtypes(df=pd.DataFrame),
    outputs=kwtypes(out_rendered_nb=HTMLPage),
)
@task(requests=Resources(cpu="500m", mem="1Gi"), limits=Resources(cpu="500m", mem="4Gi"))
def load_and_summarize(s3_path: str) -> pd.DataFrame:
    """Read a Parquet file, print a short summary and return it as a DataFrame.

    Prints the column dtypes, the number of rows and the time span covered by
    the ``timestamp`` column.

    Args:
        s3_path: Location of the Parquet file; passed unchanged to
            ``pd.read_parquet``.

    Returns:
        The loaded DataFrame, unmodified.
    """
    df = pd.read_parquet(s3_path)
    print(df.dtypes)

    # Get the number of rows
    num_rows = len(df)
    print(f'Number of rows: {num_rows}')
    if num_rows == 0:
        # min()/max() of an empty column cannot be rounded to an integer,
        # so there is no time span to report.
        return df

    # NOTE(review): assumes 'timestamp' holds numeric milliseconds - TODO confirm.
    total_ms = int(round(df['timestamp'].max() - df['timestamp'].min()))
    minutes, remainder_ms = divmod(total_ms, 60_000)
    seconds, ms = divmod(remainder_ms, 1_000)
    # Zero-pad seconds and milliseconds so 1 min 5 s 7 ms reads 1:05.007
    # instead of the ambiguous 1:5.7.
    print(f'Data for: {total_ms} total_ms ({minutes}:{seconds:02d}.{ms:03d} minutes)')
    return df
@workflow
def wf(s3_path: str) -> None:
    """Summarize the Parquet file at ``s3_path`` and pass the frame to the notebook task."""
    nb(df=load_and_summarize(s3_path=s3_path))
And the screenshot is the beginning of the notebook (s3_notebook_poc.ipynb).
Any idea?
tall-lock-23197
tall-ram-83532
09/06/2023, 12:04 PM
'''Demo workflow for serializing dataframes'''
import os
import pathlib
from flytekit import task, workflow, kwtypes
from flytekit.types.file import FlyteFile
from flytekitplugins.papermill import NotebookTask
import pandas as pd
@task
def create_data_frame() -> pd.DataFrame:
    """Build a small demo DataFrame with one ``Height`` column indexed by player name."""
    # A single mapping keeps each name next to its height; dict order is
    # insertion order, so rows come out in the order listed here.
    height_by_player = {
        "Lionel Messi": 1.70,
        "Cristiano Ronaldo": 1.87,
        "Neymar Junior": 1.75,
        "Kylian Mbappe": 1.78,
        "Manuel Neuer": 1.93,
    }
    return pd.DataFrame(
        data={"Height": list(height_by_player.values())},
        index=list(height_by_player),
    )
@task
def summarize_python(data_frame: pd.DataFrame) -> None:
    """Print the given DataFrame to standard output."""
    print(data_frame)
@task
def df_to_parquet(data_frame: pd.DataFrame) -> FlyteFile:
    """Save the DataFrame as a Parquet file and return it as a FlyteFile.

    Args:
        data_frame: The frame to serialize.

    Returns:
        A FlyteFile wrapping the local ``results.parquet`` file that was written.
    """
    # Function-scope import so the block does not rely on a changed import header.
    from flytekit import current_context

    # Write into the execution's working directory rather than a fixed
    # /tmp/results.parquet, so parallel or repeated runs on the same machine
    # cannot overwrite each other's file.
    local_path = os.path.join(current_context().working_directory, "results.parquet")
    data_frame.to_parquet(path=local_path)
    return FlyteFile(local_path)
@task
def parquet_to_df(file_path: FlyteFile) -> None:
    """Print ``file_path``, then read it as Parquet and print the resulting DataFrame."""
    print(f'parquet_to_df: file_path: {file_path}')
    print(pd.read_parquet(file_path))
# Notebook task for dataframe_test.ipynb, resolved relative to this module's
# directory. It takes a single FlyteFile input named `flyte_file`.
summarize_nb = NotebookTask(
    name="summarize_nb",
    notebook_path=str(pathlib.Path(__file__).parent.absolute() / "dataframe_test.ipynb"),
    render_deck=True,
    disable_deck=False,
    inputs=kwtypes(flyte_file=FlyteFile),
)
@workflow
def wf() -> None:
    """Create the demo frame, print it, save it as Parquet, then read the file in a task and in the notebook."""
    frame = create_data_frame()
    summarize_python(data_frame=frame)
    parquet_file = df_to_parquet(data_frame=frame)
    parquet_to_df(file_path=parquet_file)
    summarize_nb(flyte_file=parquet_file)


if __name__ == "__main__":
    # Run the workflow directly when this module is executed as a script.
    wf()
dataframe_test.ipynb:
# Default for the `flyte_file` input declared on the NotebookTask; presumably
# this is the papermill "parameters" cell whose value is replaced at run time.
flyte_file = None
import pandas as pd
print(f'notebook: file_path: {flyte_file}')
# NOTE(review): the traceback quoted below reports ArrowInvalid on this call,
# so the injected value is presumably not a path to the Parquet data itself.
# Check the output of the print above. Newer flytekitplugins-papermill
# releases appear to offer a load_flytefile() helper for FlyteFile inputs;
# TODO confirm against the installed plugin version.
data_frame = pd.read_parquet(flyte_file)
print(data_frame)
Exception:
2023-09-06 15:00:12,386 flytekit ERROR Exception when executing base_task.py:543
---------------------------------------------------------------------------
Exception encountered at "In [3]":
---------------------------------------------------------------------------
ArrowInvalid Traceback (most recent call last)
Cell In[3], line 4
1 import pandas as pd
3 print(f'notebook: file_path: {flyte_file}')
----> 4 data_frame = pd.read_parquet(flyte_file)
5 print(data_frame)
File ~/Workspaces/flyte-poc/.venv/lib/python3.8/site-packages/pandas/io/parquet.py:503, in read_parquet(path, engine,
columns, storage_options, use_nullable_dtypes, **kwargs)
tall-lock-23197
tall-ram-83532
09/06/2023, 2:36 PM