Hi all, seems the tutorials are all using outdated...
# ask-the-community
h
Hi all, it seems the tutorials are all using outdated flytekit versions. I'm trying to update the pima_diabetes example. Are there any examples of how to transform
FlyteSchema[FEATURE_COLUMNS]
to
StructuredDataset
FEATURE_COLUMNS = OrderedDict({k: v for k, v in DATASET_COLUMNS.items() if k != "class"})
def split_traintest_dataset(
    dataset: FlyteFile[typing.TypeVar("csv")], seed: int, test_split_ratio: float
) -> Tuple[
    FlyteSchema[FEATURE_COLUMNS],
    FlyteSchema[FEATURE_COLUMNS],
    FlyteSchema[CLASSES_COLUMNS],
    FlyteSchema[CLASSES_COLUMNS],
]:
    ...
k
FlyteSchema[FEATURE_COLUMNS]
update to
StructuredDataset[FEATURE_COLUMNS]
you’re right. we should remove flyte schema from the docs
h
@Kevin Su yeah I tried that but got this
TypeError: 'type' object is not subscriptable
Is this on flytekit 1.8.1?
k
let me test it on my side, I’ll get back to you
h
@Kevin Su I replaced the
FlyteSchema[FEATURE_COLUMNS]
->
StructuredDataset
But got the following error:
TypeError: Encountered error while executing workflow 'diabetes_xgboost_model':
  Error encountered while executing 'diabetes_xgboost_model':
  Failed to convert outputs of task 'split_traintest_dataset' at position 0:
  Failed to find a handler for <class 'flytekit.types.structured.structured_dataset.StructuredDataset'>, protocol [file], fmt ['']
Not sure if related, but getting the same error from this guide too:
Failed with Unknown Exception <class 'ValueError'> Reason: Encountered error while executing workflow 'structured_dataset.schema_compatibility_wf':
  Error encountered while executing 'schema_compatibility_wf':
  Failed to find a handler for <class 'numpy.ndarray'>, protocol [file], fmt ['parquet']
Encountered error while executing workflow 'structured_dataset.schema_compatibility_wf':
  Error encountered while executing 'schema_compatibility_wf':
  Failed to find a handler for <class 'numpy.ndarray'>, protocol [file], fmt ['parquet']
k
looking, testing now
just ran the example in this doc. it works for me. could you share the workflow code you ran?
Regarding your message ("I replaced FlyteSchema[FEATURE_COLUMNS] -> StructuredDataset but got the following error"): could you try
Annotated[StructuredDataset, FEATURE_COLUMNS]
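For context on the `'type' object is not subscriptable` error above: a class can only be subscripted if it defines `__class_getitem__`, which would explain why `StructuredDataset[FEATURE_COLUMNS]` fails, while `Annotated[...]` attaches the column spec as metadata without needing that. A minimal stdlib-only sketch of the difference; `DatasetStandIn` and the column names are hypothetical stand-ins, not flytekit classes:

```python
from collections import OrderedDict
from typing import Annotated, get_args


class DatasetStandIn:
    """Hypothetical stand-in for a class with no __class_getitem__."""


# Hypothetical column spec, shaped like FEATURE_COLUMNS in the thread.
COLUMNS = OrderedDict(age=int, bmi=float)


def can_subscript(cls: type) -> bool:
    """Return True if cls[...] is allowed, False if it raises TypeError."""
    try:
        cls[COLUMNS]
    except TypeError:
        return False
    return True


# Annotated carries the column spec as metadata next to the base type.
annotated_type = Annotated[DatasetStandIn, COLUMNS]
base_type, column_spec = get_args(annotated_type)

print(can_subscript(DatasetStandIn))
print(base_type.__name__, dict(column_spec))
```

The base type stays the plain class, so a library can read the column spec from the annotation instead of from a subscripted class.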
h
Hm that's odd, so this was the code for `structured_dataset.py`:
import os
import typing

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from flytekit import FlyteContext, StructuredDatasetType, kwtypes, task, workflow
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.types.schema import FlyteSchema
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetDecoder,
    StructuredDatasetEncoder,
    StructuredDatasetTransformerEngine,
)
from typing_extensions import Annotated

superset_cols = kwtypes(Name=str, Age=int, Height=int)
subset_cols = kwtypes(Age=int)


@task
def get_df(a: int) -> Annotated[pd.DataFrame, superset_cols]:
    """
    Generate a sample dataframe
    """
    return pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [a, 22], "Height": [160, 178]})

@task
def get_schema_df(a: int) -> FlyteSchema[superset_cols]:
    """
    Generate a sample dataframe
    """
    s = FlyteSchema()
    s.open().write(pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [a, 22], "Height": [160, 178]}))
    return s

@task
def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols]:
    df = df.open(pd.DataFrame).all()
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    # On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table
    return StructuredDataset(dataframe=df)

@task
def to_numpy(ds: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols, PARQUET]:
    numpy_array = ds.open(np.ndarray).all()
    return StructuredDataset(dataframe=numpy_array)

@workflow
def pandas_compatibility_wf(a: int) -> Annotated[StructuredDataset, subset_cols]:
    df = get_df(a=a)
    ds = get_subset_df(df=df)  # noqa: shown for demonstration; users should use the same types between tasks
    return to_numpy(ds=ds)

@workflow
def schema_compatibility_wf(a: int) -> Annotated[StructuredDataset, subset_cols]:
    df = get_schema_df(a=a)
    ds = get_subset_df(df=df)  # noqa: shown for demonstration; users should use the same types between tasks
    return to_numpy(ds=ds)
And ran it locally
pyflyte run structured_dataset.py schema_compatibility_wf --a 42
k
ran into the same error. looking
h
Ah! I think it runs! So I'd have to register the numpy encoder and decoder every time I guess - I'll run this on
pima_diabetes
and get back to you
so I made some changes to
pima_diabetes
Basically updating
FlyteSchema[FEATURE_COLUMNS]
->
Annotated[StructuredDataset, FEATURE_COLUMNS]
# following the numpy encoder/decoder above 
@task
def get_feature_struct_data(
    df: Annotated[StructuredDataset, FEATURE_COLUMNS]
) -> Annotated[StructuredDataset, FEATURE_COLUMNS]:
    df = df.open(pd.DataFrame).all()
    return StructuredDataset(dataframe=df)

@task
def features_to_numpy(
    ds: Annotated[StructuredDataset, FEATURE_COLUMNS]
) -> Annotated[StructuredDataset, FEATURE_COLUMNS, PARQUET]:
    numpy_array = ds.open(np.ndarray).all()
    return StructuredDataset(dataframe=numpy_array)

@task
def transform_feature_schema(
    df: Annotated[StructuredDataset, FEATURE_COLUMNS]
) -> Annotated[StructuredDataset, FEATURE_COLUMNS]:
    ds = get_feature_struct_data(df=df)
    return features_to_numpy(ds=ds)

@task(cache_version="1.0", cache=True, limits=Resources(mem="200Mi"))
def split_traintest_dataset(
    dataset: FlyteFile[typing.TypeVar("csv")], seed: int, test_split_ratio: float
) -> Tuple[
    Annotated[StructuredDataset, FEATURE_COLUMNS],
    Annotated[StructuredDataset, FEATURE_COLUMNS],
    Annotated[StructuredDataset, CLASSES_COLUMNS],
    Annotated[StructuredDataset, CLASSES_COLUMNS],
]:
    column_names = [k for k in DATASET_COLUMNS.keys()]
    df = pd.read_csv(dataset, names=column_names)

    # Select all features
    x = df[column_names[:8]]
    # Select only the classes
    y = df[[column_names[-1]]]

    # split data into train and test sets
    (x_train_df, x_test_df, y_train_df, y_test_df) = train_test_split(
        x, y, test_size=test_split_ratio, random_state=seed
    )

    return (
        transform_feature_schema(df=x_train_df),
        transform_feature_schema(df=x_test_df),
        transform_classes_schema(df=y_train_df), # CLASSES_COLUMNS instead of FEATURE_COLUMNS
        transform_classes_schema(df=y_test_df),
    )

@task(cache_version="1.0", cache=True, limits=Resources(mem="200Mi"))
def predict(
    x: Annotated[StructuredDataset, FEATURE_COLUMNS],
    model_ser: FlyteFile[MODELSER_JOBLIB],
) -> Annotated[StructuredDataset, CLASSES_COLUMNS]:
    model = joblib.load(model_ser)
    x_df = x.open(pd.DataFrame).all()

    y_pred = model.predict(x_df)

    col = [k for k in CLASSES_COLUMNS.keys()]
    y_pred_df = pd.DataFrame(y_pred, columns=col, dtype="int64")
    y_pred_df.round(0)
    return y_pred_df
got this error seemingly during `predictions = predict(x=x_test, model_ser=model.model)`:
TypeError: Encountered error while executing workflow 'diabetes_xgboost_model':
  Error encountered while executing 'diabetes_xgboost_model':
  Failed to convert outputs of task 'predict' at position 0:
  Failed to find a handler for <class 'flytekit.types.structured.structured_dataset.StructuredDataset'>, protocol [file], fmt ['']
s
Can you convert
y_pred_df
to
StructuredDataset(dataframe=y_pred_df)
?
Also, calling tasks from within tasks essentially means you're calling functions within tasks. If you want to call functions within your tasks, you need not decorate them with the
@task
decorator. Just keep them as Python functions. You need to call your tasks from within a Flyte workflow.
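A minimal sketch of that advice, assuming bare `@task` and `@workflow` decorators: the helper stays a plain function, the task calls it directly, and only the workflow calls the task. The names `scale`, `scale_task` and `scale_wf` are hypothetical, and the `ImportError` fallback is only there so the sketch also runs where flytekit is not installed:

```python
from typing import List

try:
    from flytekit import task, workflow
except ImportError:
    # Assumption: no-op stand-ins so the sketch also runs without flytekit.
    def task(fn):
        return fn

    def workflow(fn):
        return fn


def scale(values: List[float], factor: float) -> List[float]:
    """Plain helper: no @task, so a task may call it directly."""
    return [v * factor for v in values]


@task
def scale_task(values: List[float], factor: float) -> List[float]:
    # Inside a task, call ordinary functions, not other tasks.
    return scale(values, factor)


@workflow
def scale_wf(values: List[float], factor: float) -> List[float]:
    # Tasks are invoked from the workflow, with keyword arguments.
    return scale_task(values=values, factor=factor)


print(scale_wf(values=[1.0, 2.0], factor=2.0))
```

Applied to the code above, `get_feature_struct_data`, `features_to_numpy` and `transform_feature_schema` would either become plain functions or be called from the workflow instead of from `split_traintest_dataset`.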
h
Sorry for the late reply - so the
split_traintest_dataset
task works! albeit without the numpy encoding/decoding, but simply following @Samhita Alla's suggestion:
@task(container_image=image, cache_version="1.0", cache=True, limits=Resources(mem="200Mi"))
def split_traintest_dataset(
    dataset: FlyteFile[typing.TypeVar("csv")], seed: int, test_split_ratio: float
) -> Tuple[
    Annotated[StructuredDataset, FEATURE_COLUMNS],
    Annotated[StructuredDataset, FEATURE_COLUMNS],
    Annotated[StructuredDataset, CLASSES_COLUMNS],
    Annotated[StructuredDataset, CLASSES_COLUMNS],
]:
    column_names = [k for k in DATASET_COLUMNS.keys()]
    df = pd.read_csv(dataset, names=column_names)

    x = df[column_names[:8]]
    y = df[[column_names[-1]]]

    x_train_df, x_test_df, y_train_df, y_test_df = train_test_split(
        x, y, test_size=test_split_ratio, random_state=seed
    )

    return (
        StructuredDataset(dataframe=x_train_df),
        StructuredDataset(dataframe=x_test_df),
        StructuredDataset(dataframe=y_train_df),
        StructuredDataset(dataframe=y_test_df),
    )
But now the
joblib.dump
doesn't work in the sandbox:
Traceback (most recent call last):

      File "/usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/pima_diabetes/diabetes.py", line 182, in fit
        joblib.dump(m, fname)
      File "/usr/local/lib/python3.9/site-packages/joblib/numpy_pickle.py", line 552, in dump
        with open(filename, 'wb') as f:

Message:

    [Errno 13] Permission denied: 'model.joblib.dat'

User error.
how do we deal with model dumps between tasks?
s
What's
fname
?
It needs to be
os.path.join(flytekit.current_context().working_directory, "your-file-name")
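A rough sketch of the idea, assuming the permission error comes from writing a bare relative filename into a read-only directory. Here a temporary directory stands in for `flytekit.current_context().working_directory`, `pickle` stands in for `joblib`, and `dump_model` is a hypothetical helper:

```python
import os
import pickle
import tempfile


def dump_model(model: object, working_dir: str, filename: str) -> str:
    """Serialize model into working_dir and return the full path."""
    path = os.path.join(working_dir, filename)
    with open(path, "wb") as f:
        pickle.dump(model, f)
    return path


# Assumption: a temp dir stands in for
# flytekit.current_context().working_directory.
working_dir = tempfile.mkdtemp()
model_path = dump_model({"weights": [0.1, 0.2]}, working_dir, "model.joblib.dat")

# Reading the file back confirms it was written to the writable directory.
with open(model_path, "rb") as f:
    restored = pickle.load(f)

print(model_path)
```

In the `fit` task from the traceback, `fname` would be built the same way before being passed to `joblib.dump(m, fname)`.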
Thank you for your willingness to contribute to Flyte. I've actually opened an issue to include ImageSpec in all our examples: https://github.com/flyteorg/flyte/issues/3790. I'll be working on it very soon. But please feel free to create PRs to update the documentation as you deem appropriate. A PR for every example should be fine, or if you prefer, you can create a PR that encompasses multiple examples.
h
Ok it seems a standardized
ImageSpec
definition is yet to be decided - I'll link any working PRs to that issue then. Also, it seems defining
List[Annotated[StructuredDataset, <SomeOrderedDict>]]
gives the
TypeError: unhashable type: 'dict'
- how to handle a list of structured datasets?
s
I don't understand. What's
dict
in the type annotation you specified apart from ordered dict?
h
so in
house_price_prediction/multiregion_house_price_predictor.py
around L76:
dataset = typing.NamedTuple(
    "GenerateSplitDataOutputs",
    train_data=typing.List[Annotated[StructuredDataset, DATASET_COLUMNS]], # previously `typing.List[pd.DataFrame]`
    val_data=typing.List[Annotated[StructuredDataset, DATASET_COLUMNS]],
    test_data=typing.List[Annotated[StructuredDataset, DATASET_COLUMNS]],
)

DATASET_COLUMNS = {
    "PRICE": int,
    "YEAR_BUILT": int,
    "SQUARE_FEET": int,
    "NUM_BEDROOMS": int,
    "NUM_BATHROOMS": float,
    "LOT_ACRES": float,
    "GARAGE_SPACES": int,
}
It isn't clear from the full error:
/Users/hud/venvs/house_price_prediction/bin/python /Users/hud/projects/flytesnacks/examples/house_price_prediction/house_price_prediction/multiregion_house_price_predictor.py
╭─────────────────────────────────────────────────────────── Traceback (most recent call last) ────────────────────────────────────────────────────────────╮
│ /Users/hud/projects/flytesnacks/examples/house_price_prediction/house_price_prediction/multiregion_house_price_predictor.py:90 in <module>               │
│                                                                                                                                                          │
│ ❱  90 def generate_and_split_data_multiloc(                                                                                                              │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/task.py:260 in wrapper                                                 │
│                                                                                                                                                          │
│ ❱ 260 │   │   task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))(                                                                     │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/tracker.py:35 in __call__                                              │
│                                                                                                                                                          │
│ ❱  35 │   │   o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)                                                                             │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/python_function_task.py:121 in __init__                                │
│                                                                                                                                                          │
│ ❱ 121 │   │   super().__init__(                                                                                                                          │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/python_auto_container.py:85 in __init__                                │
│                                                                                                                                                          │
│ ❱  85 │   │   super().__init__(                                                                                                                          │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/base_task.py:420 in __init__                                           │
│                                                                                                                                                          │
│ ❱ 420 │   │   │   interface=transform_interface_to_typed_interface(interface),                                                                           │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/interface.py:248 in transform_interface_to_typed_interface             │
│                                                                                                                                                          │
│ ❱ 248 │   outputs_map = transform_variable_map(interface.outputs, output_descriptions)                                                                   │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/interface.py:345 in transform_variable_map                             │
│                                                                                                                                                          │
│ ❱ 345 │   │   │   res[k] = transform_type(v, descriptions.get(k, k))                                                                                     │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/interface.py:350 in transform_type                                     │
│                                                                                                                                                          │
│ ❱ 350 │   return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=de                                                           │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/type_engine.py:826 in to_literal_type                                  │
│                                                                                                                                                          │
│ ❱  826 │   │   transformer = cls.get_transformer(python_type)                                                                                            │
│                                                                                                                                                          │
│ /Users/hud/venvs/house_price_prediction/lib/python3.9/site-packages/flytekit/core/type_engine.py:747 in get_transformer                                  │
│                                                                                                                                                          │
│ ❱  747 │   │   if python_type in cls._REGISTRY:                                                                                                          │
│                                                                                                                                                          │
│ /Users/hud/.pyenv/versions/3.9.17/lib/python3.9/typing.py:757 in __hash__                                                                                │
│                                                                                                                                                          │
│ ❱  757 │   │   return hash((self.__origin__, self.__args__))                                                                                             │
│                                                                                                                                                          │
│ /Users/hud/.pyenv/versions/3.9.17/lib/python3.9/typing.py:1294 in __hash__                                                                               │
│                                                                                                                                                          │
│ ❱ 1294 │   │   return hash((self.__origin__, self.__metadata__))                                                                                         │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
TypeError: unhashable type: 'dict'
But that error occurs when I append the
Annotated[..]
in a list
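The traceback ends in `typing.py`'s `__hash__`, reached from the `python_type in cls._REGISTRY` lookup, so the failure appears to come from hashing the type itself: an `Annotated` type hashes its metadata, and a dict is unhashable. A stdlib-only sketch that reproduces this, with `int` standing in for the dataframe type and a hypothetical column dict:

```python
from typing import Annotated, List

# Hypothetical column spec; a plain dict like DATASET_COLUMNS in the thread.
COLUMNS = {"PRICE": int, "LOT_ACRES": float}


def is_hashable(tp: object) -> bool:
    """Return True if hash(tp) succeeds, False if it raises TypeError."""
    try:
        hash(tp)
    except TypeError:
        return False
    return True


# Building the type works; hashing it walks into the dict metadata and fails.
list_of_annotated = List[Annotated[int, COLUMNS]]

print(is_hashable(List[int]))
print(is_hashable(Annotated[int, COLUMNS]))
print(is_hashable(list_of_annotated))
```

This only shows where the `TypeError` originates in plain Python; how flytekit treats a top-level `Annotated` versus one nested in a list is not covered here.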
s
@Hud, would you mind letting me know why you are replacing dataframe with structured dataset?
h
@Samhita Alla Oh I guess just for the purpose of validating types defined in the
DATASET_COLUMNS
? Or which use cases are Structured Datasets generally used for?
s
In that case, you can use
Annotated[pd.DataFrame, <your-columns>]
. So are you saying
list[Annotated[pd.DataFrame, <your-columns>]]
isn't working for you?
h
yes, both
list[Annotated[StructuredDataset, <..>]]
or
list[Annotated[pd.DataFrame, <..>]]
do not work for me - whatever type I put in
<..>
gives me an unhashable error for that type.
s
Can you create an issue, please?
[flyte-bug]
h
Okay, so using a dataclass solves the issue - https://github.com/flyteorg/flyte/issues/3959