Yuan Wang (Mike)
08/03/2023, 5:24 PMCallable
is not supported by Flyte so far. Is there any workaround to provide a Callable argument to task and workflow in flyte?
• The source code of my poc is pasted in the comments of the thread
Thanks(flyte) ➜ workflows git:(main) ✗ cat robustness_generic.py
import typing
from flytekit import task, workflow
import numpy as np
import pandas as pd
from typing import Callable
from numpy.typing import NDArray
TDataType = NDArray
SlicerType = Callable[[TDataType], TDataType[bool]]
@task
def read_dataset(file_path: str) -> pd.DataFrame:
df = pd.read_csv(file_path)
print("read_dataset..........................")
print(df.shape)
return df
def slice_top_400(df: TDataType) -> TDataType:
return np.arange(df.shape[0]) < 400
@task
def slice_dataset_generic(df: TDataType, slicer: SlicerType) -> TDataType:
print("slice_dataset_generic.....................")
train_df = df[slicer]
print(train_df.shape)
print(train_df)
return train_df
@workflow
def generic_wf(file_path: str) -> TDataType:
df = read_dataset(file_path=file_path)
train_df = slice_dataset_generic(df, slice_top_400)
return train_df
(flyte) ➜ workflows git:(main) ✗ pyflyte run robustness_generic.py wf_generic --file_path data/titanic.csv
Failed with Unknown Exception <class 'ValueError'> Reason: Generic Type <class 'collections.abc.Callable'> not supported currently in Flytekit.
Generic Type <class 'collections.abc.Callable'> not supported currently in Flytekit.
Yicheng Lu
08/03/2023, 5:31 PMYuan Wang (Mike)
08/03/2023, 8:13 PM