Hi team ! I'm trying to run a MapTask and am getti...
# ask-the-community
f
Hi team ! I'm trying to run a MapTask and am getting this strange error - Can someone help me debug this ? Something's up with the
get_tscv_train_test_splits
and
sarima_task
functions
Copy code
Encountered error while executing workflow 'parallel-tscv.time_series_workflow':
  Error encountered while executing 'time_series_workflow':
  Failed to convert inputs of task 'parallel-tscv.map_sarima_task_99b7f0e6767e1ec825145f7d10320b35':
  type object 'DataFrame' has no attribute '__args__'
I've added the script that i am trying to run in the comments
Copy code
from flytekit import task, workflow, FlyteContextManager, Resources, map_task, TaskMetadata
from typing import List, Any, Tuple
import pandas
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error

from dataclasses import dataclass
from dataclasses_json import dataclass_json

import statsmodels.api as sm
import numpy as np

@dataclass_json
@dataclass
class DatasetConfig(object):
    train_data: pandas.DataFrame
    test_data: pandas.DataFrame

# Define a function to generate a dummy dataset
def generate_dummy_dataset(num_rows: int, num_columns: int) -> pandas.DataFrame:
    # Generate random data
    data = np.random.rand(num_rows, num_columns)
    
    # Create column names
    columns = [f"Column_{i+1}" for i in range(num_columns)]
    
    # Create a pandas DataFrame
    df = pandas.DataFrame(data, columns=columns)
    
    return df

@task(requests=Resources(cpu="1", mem="500Mi"), cache=True, cache_version="1.0")
def get_data() -> pandas.DataFrame:
    num_rows = 100
    num_columns = 5
    return generate_dummy_dataset(num_rows=num_rows, num_columns=num_columns)

# Define the SARIMA task
@task(requests=Resources(cpu="2", mem="500Mi"))
def sarima_task(dataset_config: DatasetConfig) -> float:
    train_data, test_data = dataset_config.train_data, dataset_config.test_data
    scores = []
    for column in train_data.columns:
        model = sm.tsa.SARIMAX(train_data[column], order=(1, 1, 1), seasonal_order=(0, 1, 1, 12))
        model_fit = model.fit()
        y_pred = model_fit.predict(start=len(train_data), end=len(train_data) + len(test_data) - 1)
        score = mean_absolute_error(test_data[column], y_pred)
        scores.append(score)

    return float(sum(scores) / len(scores))

@task(requests=Resources(cpu="1", mem="500Mi"), cache=True, cache_version="1.0")
def coalesce_scores(scores: List[float]) -> float:
    return float(sum(scores) / len(scores))

@task(requests=Resources(cpu="2", mem="500Mi"), cache=True, cache_version="1.0")
def get_tscv_train_test_splits(data: pandas.DataFrame) -> List[DatasetConfig]:
    tscv = TimeSeriesSplit(n_splits=5)
    splits = []

    # Iterate over the splits
    for train_index, test_index in tscv.split(data):
        train_end_index = test_index[0] - 1

        train_data = data.iloc[:train_end_index + 1]
        test_data = data.iloc[test_index]

        splits.append(DatasetConfig(train_data=train_data, test_data=test_data))

    return splits

# Define the workflow
@workflow()
def time_series_workflow() -> float:
    # Generate the dummy dataset
    data = get_data()
    dataset_configs = get_tscv_train_test_splits(data=data)
    scores = map_task(
        sarima_task,
        metadata=TaskMetadata(retries=1),
        )(dataset_config=dataset_configs)
    return coalesce_scores(scores=scores)
e
@Faisal Anees, a couple things: 1. it looks like you hit a bug in the dataclass transformer. We're in the process of revamping it (see https://github.com/flyteorg/flytekit/pull/1679). In the meantime, can you open a gh issue to track this issue you're seeing? 2. Reading your code it looks like you based your example on https://docs.flyte.org/projects/cookbook/en/latest/auto/core/control_flow/map_task.html#map-a-task-with-multiple-inputs, but recently we updated map-tasks to take multiple inputs natively (i.e. without needing to group them in a dataclass to hold the intermediary values), bringing the behavior of map_task closer to python's map. I modified your example slightly in https://gist.github.com/eapolinario/42a2393166be6db57c884b6574c55943 to showcase how to do this.
f
Thanks @Eduardo Apolinario (eapolinario) that worked !
@Eduardo Apolinario (eapolinario) just a quick follow up question - why is it that when i assign CPU resources as limit 8 to one of the tasks, i get this error
Copy code
details: Requested CPU limit [8] is greater than current limit set in the platform configuration [2]. Please contact Flyte Admins to change these limits or consult the configuration
        Debug string UNKNOWN:Error received from peer  {created_time:"2023-06-08T16:20:46.851944-04:00", grpc_status:3, grpc_message:"Requested CPU limit [8] is greater than current limit set in the platform configuration [2]. Please contact Flyte Admins to change these limits or consult the configuration"}
this is how I'd defined the task :
Copy code
@task(requests=Resources(cpu="4", mem="1Gi"), limits=Resources(cpu="8", mem="2Gi"))
def sarima_task(train_data: pandas.DataFrame, test_data: pandas.DataFrame) -> float:
    scores = []
    for column in train_data.columns:
        model = sm.tsa.SARIMAX(train_data[column], order=(1, 1, 1), seasonal_order=(0, 1, 1, 12))
        model_fit = model.fit()
        y_pred = model_fit.predict(start=len(train_data), end=len(train_data) + len(test_data) - 1)
        score = mean_absolute_error(test_data[column], y_pred)
        scores.append(score)

    return float(sum(scores) / len(scores))
Not sure where the CPU default of 2 is coming from .. is this something that's configured in
~/.flyte/config.yaml
?
e
f
Thanks ! 👍