Hud
07/27/2023, 2:52 PMFlyteSchema[FEATURE_COLUMNS]
to StructuredDataset
# Feature column spec: every entry of DATASET_COLUMNS except the "class" column.
FEATURE_COLUMNS = OrderedDict(
    (name, dtype) for name, dtype in DATASET_COLUMNS.items() if name != "class"
)


def split_traintest_dataset(
    dataset: FlyteFile[typing.TypeVar("csv")],
    seed: int,
    test_split_ratio: float,
) -> Tuple[
    FlyteSchema[FEATURE_COLUMNS],
    FlyteSchema[FEATURE_COLUMNS],
    FlyteSchema[CLASSES_COLUMNS],
    FlyteSchema[CLASSES_COLUMNS],
]:
    """Signature of the original FlyteSchema-based split task; body elided in this snippet."""
    ...
Kevin Su
07/27/2023, 3:08 PMFlyteSchema[FEATURE_COLUMNS]
update to
StructuredDataset[FEATURE_COLUMNS]
Hud
07/28/2023, 1:00 PMTypeError: 'type' object is not subscriptable
Is this on flytekit 1.8.1 ?Kevin Su
07/28/2023, 4:32 PMHud
08/01/2023, 3:51 PMFlyteSchema[FEATURE_COLUMNS]
-> StructuredDataset
But got the following error:
TypeError: Encountered error while executing workflow 'diabetes_xgboost_model':
Error encountered while executing 'diabetes_xgboost_model':
Failed to convert outputs of task 'split_traintest_dataset' at position 0:
Failed to find a handler for <class 'flytekit.types.structured.structured_dataset.StructuredDataset'>, protocol
[file], fmt ['']
Failed with Unknown Exception <class 'ValueError'> Reason: Encountered error while executing workflow 'structured_dataset.schema_compatibility_wf':
Error encountered while executing 'schema_compatibility_wf':
Failed to find a handler for <class 'numpy.ndarray'>, protocol [file], fmt ['parquet']
Encountered error while executing workflow 'structured_dataset.schema_compatibility_wf':
Error encountered while executing 'schema_compatibility_wf':
Failed to find a handler for <class 'numpy.ndarray'>, protocol [file], fmt ['parquet']
Kevin Su
08/01/2023, 5:58 PMI replaced the->FlyteSchema[FEATURE_COLUMNS]
StructuredDataset
But got the following error:could you try
Annotated[StructuredDataset, FEATURE_COLUMNS]
Hud
08/01/2023, 6:32 PMimport os
import typing
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from flytekit import FlyteContext, StructuredDatasetType, kwtypes, task, workflow
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.types.schema import FlyteSchema
from flytekit.types.structured.structured_dataset import (
PARQUET,
StructuredDataset,
StructuredDatasetDecoder,
StructuredDatasetEncoder,
StructuredDatasetTransformerEngine,
)
from typing_extensions import Annotated
# Column spec naming all three columns: Name (str), Age (int), Height (int).
superset_cols = kwtypes(Name=str, Age=int, Height=int)
# Column spec restricted to the Age column only.
subset_cols = kwtypes(Age=int)
@task
def get_df(a: int) -> Annotated[pd.DataFrame, superset_cols]:
    """Build a two-row sample DataFrame whose first ``Age`` value is ``a``."""
    columns = {
        "Name": ["Tom", "Joseph"],
        "Age": [a, 22],
        "Height": [160, 178],
    }
    return pd.DataFrame(columns)
@task
def get_schema_df(a: int) -> FlyteSchema[superset_cols]:
    """Write the two-row sample DataFrame into a new ``FlyteSchema`` and return it."""
    sample = pd.DataFrame(
        {"Name": ["Tom", "Joseph"], "Age": [a, 22], "Height": [160, 178]}
    )
    schema = FlyteSchema()
    writer = schema.open()
    writer.write(sample)
    return schema
@task
def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols]:
    """Load the dataset as pandas, append one ``Age`` row with value 30, and re-wrap it."""
    loaded = df.open(pd.DataFrame).all()
    extra_row = pd.DataFrame([[30]], columns=["Age"])
    combined = pd.concat([loaded, extra_row])
    # On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table
    return StructuredDataset(dataframe=combined)
@task
def to_numpy(ds: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols, PARQUET]:
    """Open the dataset as ``np.ndarray`` and wrap the array in a new StructuredDataset."""
    return StructuredDataset(dataframe=ds.open(np.ndarray).all())
@workflow
def pandas_compatibility_wf(a: int) -> Annotated[StructuredDataset, subset_cols]:
    """Chain ``get_df`` -> ``get_subset_df`` -> ``to_numpy``."""
    full_frame = get_df(a=a)
    subset = get_subset_df(df=full_frame)  # noqa: shown for demonstration; users should use the same types between tasks
    return to_numpy(ds=subset)
@workflow
def schema_compatibility_wf(a: int) -> Annotated[StructuredDataset, subset_cols]:
    """Chain ``get_schema_df`` -> ``get_subset_df`` -> ``to_numpy``."""
    schema_frame = get_schema_df(a=a)
    subset = get_subset_df(df=schema_frame)  # noqa: shown for demonstration; users should use the same types between tasks
    return to_numpy(ds=subset)
And ran it locally pyflyte run structured_dataset.py schema_compatibility_wf --a 42
Kevin Su
08/01/2023, 6:36 PMHud
08/01/2023, 6:49 PMpima_diabetes
and get back to youso I made some changes toBasically updatingpima_diabetes
->FlyteSchema[FEATURE_COLUMNS]
Annotated[StructuredDataset, FEATURE_COLUMNS]
# following the numpy encoder/decoder above
@task
def get_feature_struct_data(
    df: Annotated[StructuredDataset, FEATURE_COLUMNS],
) -> Annotated[StructuredDataset, FEATURE_COLUMNS]:
    """Materialize the input as a pandas DataFrame and wrap it in a fresh StructuredDataset."""
    return StructuredDataset(dataframe=df.open(pd.DataFrame).all())
@task
def features_to_numpy(
    ds: Annotated[StructuredDataset, FEATURE_COLUMNS],
) -> Annotated[StructuredDataset, FEATURE_COLUMNS, PARQUET]:
    """Open the feature dataset as ``np.ndarray`` and wrap the array in a new StructuredDataset."""
    features_array = ds.open(np.ndarray).all()
    wrapped = StructuredDataset(dataframe=features_array)
    return wrapped
# Passes the input through get_feature_struct_data and then features_to_numpy,
# returning the latter's result.
# NOTE(review): both callees are themselves @task-decorated and are invoked from
# inside this @task rather than from a @workflow -- confirm this is supported.
# NOTE(review): features_to_numpy is annotated with PARQUET but this return
# annotation is not -- confirm the mismatch is intended.
@task
def transform_feature_schema(
df: Annotated[StructuredDataset, FEATURE_COLUMNS]
) -> Annotated[StructuredDataset, FEATURE_COLUMNS]:
ds = get_feature_struct_data(df=df)
return features_to_numpy(ds=ds)
# Reads the CSV using the DATASET_COLUMNS keys as column names, separates the
# first eight columns (features) from the last column (classes), splits both
# with train_test_split, and passes each piece through a transform_* helper.
# NOTE(review): transform_feature_schema is a @task called from inside this
# @task with pandas DataFrames; transform_classes_schema is not defined in the
# visible snippet -- confirm both against the full file.
@task(cache_version="1.0", cache=True, limits=Resources(mem="200Mi"))
def split_traintest_dataset(
dataset: FlyteFile[typing.TypeVar("csv")], seed: int, test_split_ratio: float
) -> Tuple[
Annotated[StructuredDataset, FEATURE_COLUMNS],
Annotated[StructuredDataset, FEATURE_COLUMNS],
Annotated[StructuredDataset, CLASSES_COLUMNS],
Annotated[StructuredDataset, CLASSES_COLUMNS],
]:
column_names = [k for k in DATASET_COLUMNS.keys()]
df = pd.read_csv(dataset, names=column_names)
# Select all features
x = df[column_names[:8]]
# Select only the classes
y = df[[column_names[-1]]]
# split data into train and test sets
(x_train_df, x_test_df, y_train_df, y_test_df) = train_test_split(
x, y, test_size=test_split_ratio, random_state=seed
)
# Return order matches the declared tuple: x_train, x_test, y_train, y_test.
return (
transform_feature_schema(df=x_train_df),
transform_feature_schema(df=x_test_df),
transform_classes_schema(df=y_train_df), # CLASSES_COLUMNS instead of FEATURE_COLUMNS
transform_classes_schema(df=y_test_df),
)
# Loads the serialized model with joblib, predicts on the feature dataset opened
# as a pandas DataFrame, and returns the predictions as an int64 DataFrame whose
# columns are the CLASSES_COLUMNS keys.
@task(cache_version="1.0", cache=True, limits=Resources(mem="200Mi"))
def predict(
x: Annotated[StructuredDataset, FEATURE_COLUMNS],
model_ser: FlyteFile[MODELSER_JOBLIB],
) -> Annotated[StructuredDataset, CLASSES_COLUMNS]:
model = joblib.load(model_ser)
x_df = x.open(pd.DataFrame).all()
y_pred = model.predict(x_df)
col = [k for k in CLASSES_COLUMNS.keys()]
y_pred_df = pd.DataFrame(y_pred, columns=col, dtype="int64")
# NOTE(review): DataFrame.round returns a new frame; the result is not assigned,
# so this line leaves y_pred_df unchanged.
y_pred_df.round(0)
# NOTE(review): a bare DataFrame is returned although the annotation is
# StructuredDataset -- possibly related to the "Failed to find a handler" error
# reported for this task; confirm.
return y_pred_df
got this error seemingly during `predictions = predict(x=x_test, model_ser=model.model)`:
TypeError: Encountered error while executing workflow 'diabetes_xgboost_model':
Error encountered while executing 'diabetes_xgboost_model':
Failed to convert outputs of task 'predict' at position 0:
Failed to find a handler for <class 'flytekit.types.structured.structured_dataset.StructuredDataset'>, protocol [file], fmt ['']
Samhita Alla
y_pred_df
to StructuredDataset(dataframe=y_pred_df)
?@task
decorator. Just keep them as Python functions. You need to call your tasks from within a Flyte workflow.Hud
08/07/2023, 10:23 PMsplit_traintest_dataset
task works! albeit without the numpy encoding/decoding, but simply following @Samhita Alla's suggestion:
@task(container_image=image, cache_version="1.0", cache=True, limits=Resources(mem="200Mi"))
def split_traintest_dataset(
    dataset: FlyteFile[typing.TypeVar("csv")], seed: int, test_split_ratio: float
) -> Tuple[
    Annotated[StructuredDataset, FEATURE_COLUMNS],
    Annotated[StructuredDataset, FEATURE_COLUMNS],
    Annotated[StructuredDataset, CLASSES_COLUMNS],
    Annotated[StructuredDataset, CLASSES_COLUMNS],
]:
    """Split the CSV dataset into train/test feature and class datasets.

    Args:
        dataset: CSV file; its columns are named from the keys of ``DATASET_COLUMNS``.
        seed: passed to ``train_test_split`` as ``random_state``.
        test_split_ratio: passed to ``train_test_split`` as ``test_size``.

    Returns:
        ``(x_train, x_test, y_train, y_test)``, each DataFrame wrapped in a
        ``StructuredDataset``.
    """
    column_names = list(DATASET_COLUMNS)
    df = pd.read_csv(dataset, names=column_names)
    # First eight columns are the features; the last column holds the classes.
    x = df[column_names[:8]]
    y = df[[column_names[-1]]]
    x_train_df, x_test_df, y_train_df, y_test_df = train_test_split(
        x, y, test_size=test_split_ratio, random_state=seed
    )
    # Wrap each split explicitly so the outputs match the StructuredDataset annotations.
    return (
        StructuredDataset(dataframe=x_train_df),
        StructuredDataset(dataframe=x_test_df),
        StructuredDataset(dataframe=y_train_df),
        StructuredDataset(dataframe=y_test_df),
    )
joblib.dump
doesn't work in the sandbox:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
return wrapped(*args, **kwargs)
File "/root/pima_diabetes/diabetes.py", line 182, in fit
joblib.dump(m, fname)
File "/usr/local/lib/python3.9/site-packages/joblib/numpy_pickle.py", line 552, in dump
with open(filename, 'wb') as f:
Message:
[Errno 13] Permission denied: 'model.joblib.dat'
User error.
how do we deal with model dumps between tasks?Samhita Alla
fname
?os.path.join(flytekit.current_context().working_directory, "your-file-name")
Hud
08/09/2023, 8:03 AMImageSpec
definition is yet to be decided - I'll link any working PRs to that issue then
Also, it seems defining List[Annotated[StructuredDataset, <SomeOrderedDict>]]
gives the TypeError: unhashable type: 'dict'
- how to handle a list of structured datasets?Samhita Alla
dict
in the type annotation you specified apart from ordered dict?Hud
08/09/2023, 11:40 PMhouse_price_prediction/multiregion_house_price_predictor.py
around L76:
# Output type of the split task: three lists of column-annotated StructuredDatasets.
dataset = typing.NamedTuple(
    "GenerateSplitDataOutputs",
    train_data=typing.List[Annotated[StructuredDataset, DATASET_COLUMNS]],  # previously `typing.List[pd.DataFrame]`
    val_data=typing.List[Annotated[StructuredDataset, DATASET_COLUMNS]],
    test_data=typing.List[Annotated[StructuredDataset, DATASET_COLUMNS]],
)
# Column name -> Python type for the house-price dataset.
# NOTE(review): this is a plain dict; the traceback reported for this snippet
# ends in Annotated.__hash__ raising "unhashable type: 'dict'", so using this
# object as Annotated metadata appears to be the trigger -- confirm.
DATASET_COLUMNS = {
"PRICE": int,
"YEAR_BUILT": int,
"SQUARE_FEET": int,
"NUM_BEDROOMS": int,
"NUM_BATHROOMS": float,
"LOT_ACRES": float,
"GARAGE_SPACES": int,
}
It isn't clear from the full error:
/Users/hud/venvs/house_price_prediction/bin/python /Users/hud/projects/flytesnacks/examples/house_price_prediction/house_price_prediction/multiregion_house_price_predictor.py
╭─────────────────────────────────────────────────────────── Traceback (most recent call last) ────────────────────────────────────────────────────────────╮
│ /Users/hud/projects/flytesnacks/examples/house_price_prediction/house_price_prediction/multiregion_house_price_predictor.py:90 in <module> │
│ │
│ ❱ 90 def generate_and_split_data_multiloc( │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/task.py:260 in wrapper │
│ │
│ ❱ 260 │ │ task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))( │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/tracker.py:35 in __call__ │
│ │
│ ❱ 35 │ │ o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs) │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/python_function_task.py:121 in __init__ │
│ │
│ ❱ 121 │ │ super().__init__( │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/python_auto_container.py:85 in __init__ │
│ │
│ ❱ 85 │ │ super().__init__( │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/base_task.py:420 in __init__ │
│ │
│ ❱ 420 │ │ │ interface=transform_interface_to_typed_interface(interface), │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/interface.py:248 in transform_interface_to_typed_interface │
│ │
│ ❱ 248 │ outputs_map = transform_variable_map(interface.outputs, output_descriptions) │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/interface.py:345 in transform_variable_map │
│ │
│ ❱ 345 │ │ │ res[k] = transform_type(v, descriptions.get(k, k)) │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/interface.py:350 in transform_type │
│ │
│ ❱ 350 │ return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=de │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/type_engine.py:826 in to_literal_type │
│ │
│ ❱ 826 │ │ transformer = cls.get_transformer(python_type) │
│ │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/type_engine.py:747 in get_transformer │
│ │
│ ❱ 747 │ │ if python_type in cls._REGISTRY: │
│ │
│ /Users/hud/.pyenv/versions/3.9.17/lib/python3.9/typing.py:757 in __hash__ │
│ │
│ ❱ 757 │ │ return hash((self.__origin__, self.__args__)) │
│ │
│ /Users/hud/.pyenv/versions/3.9.17/lib/python3.9/typing.py:1294 in __hash__ │
│ │
│ ❱ 1294 │ │ return hash((self.__origin__, self.__metadata__)) │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
TypeError: unhashable type: 'dict'
But that error occurs when I append the Annotated[..]
in a listSamhita Alla
Hud
08/10/2023, 9:03 PMDATASET_COLUMNS
? Or for which use cases are Structured Datasets generally used?Samhita Alla
Annotated[pd.DataFrame, <your-columns>]
. So are you telling list[Annotated[pd.DataFrame, <your-columns>]]
isn't working for you?Hud
08/11/2023, 11:04 PMlist[Annotated[StructuredDataset, <..>]]
or list[Annotated[pd.DataFrame, <..>]]
do not work for me - whatever type I put in <..>
gives me unhashable error for that type.Samhita Alla
Slackbot
08/13/2023, 11:41 AMHud
08/15/2023, 2:28 AM