fancy-napkin-28087
06/08/2023, 4:27 PM
`get_tscv_train_test_splits` and `sarima_task` functions
Encountered error while executing workflow 'parallel-tscv.time_series_workflow':
Error encountered while executing 'time_series_workflow':
Failed to convert inputs of task 'parallel-tscv.map_sarima_task_99b7f0e6767e1ec825145f7d10320b35':
type object 'DataFrame' has no attribute '__args__'
I've added the script that I am trying to run in the comments.
fancy-napkin-28087
06/08/2023, 4:27 PM
from flytekit import task, workflow, FlyteContextManager, Resources, map_task, TaskMetadata
from typing import List, Any, Tuple
import pandas
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error
from dataclasses import dataclass
from dataclasses_json import dataclass_json
import statsmodels.api as sm
import numpy as np
@dataclass_json
@dataclass
class DatasetConfig(object):
    """Train/test pair for a single time-series cross-validation fold."""

    # NOTE(review): the reported map_task input-conversion failure
    # ("type object 'DataFrame' has no attribute '__args__'") may be caused by
    # these DataFrame-typed dataclass fields -- confirm against flytekit's
    # supported dataclass field types.

    # Rows the model is fitted on.
    train_data: pandas.DataFrame
    # Rows held out for scoring the fitted model.
    test_data: pandas.DataFrame
# Define a function to generate a dummy dataset
def generate_dummy_dataset(num_rows: int, num_columns: int) -> pandas.DataFrame:
    """Build a DataFrame filled with random floats.

    Args:
        num_rows: Number of rows to generate.
        num_columns: Number of columns to generate.

    Returns:
        A ``num_rows`` x ``num_columns`` DataFrame whose columns are named
        ``Column_1`` .. ``Column_<num_columns>`` and whose values come from
        ``np.random.rand`` (uniform on ``[0, 1)``).
    """
    # Column names are 1-based.
    columns = [f"Column_{i + 1}" for i in range(num_columns)]
    return pandas.DataFrame(np.random.rand(num_rows, num_columns), columns=columns)
@task(requests=Resources(cpu="1", mem="500Mi"), cache=True, cache_version="1.0")
def get_data() -> pandas.DataFrame:
    """Return a 100-row, 5-column dummy dataset of random values."""
    return generate_dummy_dataset(num_rows=100, num_columns=5)
# Define the SARIMA task
@task(requests=Resources(cpu="2", mem="500Mi"))
def sarima_task(dataset_config: DatasetConfig) -> float:
    """Fit one SARIMAX model per column and return the mean forecast error.

    Args:
        dataset_config: Fold holding ``train_data`` (used for fitting) and
            ``test_data`` (used for scoring).

    Returns:
        The mean absolute error of the forecasts, averaged over all columns
        of ``train_data``.
    """
    train_data, test_data = dataset_config.train_data, dataset_config.test_data

    # Forecast exactly len(test_data) steps, starting right after the last
    # training observation; the bounds are the same for every column.
    forecast_start = len(train_data)
    forecast_end = forecast_start + len(test_data) - 1

    scores = []
    for column in train_data.columns:
        model = sm.tsa.SARIMAX(train_data[column], order=(1, 1, 1), seasonal_order=(0, 1, 1, 12))
        model_fit = model.fit()
        y_pred = model_fit.predict(start=forecast_start, end=forecast_end)
        scores.append(mean_absolute_error(test_data[column], y_pred))
    return float(sum(scores) / len(scores))
@task(requests=Resources(cpu="1", mem="500Mi"), cache=True, cache_version="1.0")
def coalesce_scores(scores: List[float]) -> float:
    """Return the arithmetic mean of ``scores``.

    Raises:
        ZeroDivisionError: If ``scores`` is empty.
    """
    return float(sum(scores) / len(scores))
@task(requests=Resources(cpu="2", mem="500Mi"), cache=True, cache_version="1.0")
def get_tscv_train_test_splits(data: pandas.DataFrame) -> List[DatasetConfig]:
    """Split ``data`` into five time-ordered train/test folds.

    Args:
        data: Frame whose rows are split positionally by ``TimeSeriesSplit``.

    Returns:
        One ``DatasetConfig`` per fold. Each fold's training frame holds every
        row before the first test row; its test frame holds the rows selected
        by the splitter.
    """
    tscv = TimeSeriesSplit(n_splits=5)
    splits = []
    # Only the test indices are needed: the training window is everything
    # that precedes the first test row.
    for _, test_index in tscv.split(data):
        train_data = data.iloc[: test_index[0]]
        test_data = data.iloc[test_index]
        splits.append(DatasetConfig(train_data=train_data, test_data=test_data))
    return splits
# Define the workflow
@workflow()
def time_series_workflow() -> float:
    """Score SARIMA on each cross-validation fold and return the mean score.

    Generates the dummy dataset, splits it into folds, maps ``sarima_task``
    over the folds (one retry per mapped task), and averages the per-fold
    scores with ``coalesce_scores``.
    """
    # Generate the dummy dataset
    data = get_data()
    dataset_configs = get_tscv_train_test_splits(data=data)
    # Fan out: one sarima_task execution per fold.
    scores = map_task(
        sarima_task,
        metadata=TaskMetadata(retries=1),
    )(dataset_config=dataset_configs)
    return coalesce_scores(scores=scores)
high-accountant-32689
06/08/2023, 5:49 PMfancy-napkin-28087
06/08/2023, 7:26 PMfancy-napkin-28087
06/08/2023, 8:22 PM
details: Requested CPU limit [8] is greater than current limit set in the platform configuration [2]. Please contact Flyte Admins to change these limits or consult the configuration
Debug string UNKNOWN:Error received from peer {created_time:"2023-06-08T16:20:46.851944-04:00", grpc_status:3, grpc_message:"Requested CPU limit [8] is greater than current limit set in the platform configuration [2]. Please contact Flyte Admins to change these limits or consult the configuration"}
This is how I'd defined the task:
# NOTE(review): the error quoted with this snippet reports that the requested
# CPU limit of 8 exceeds the platform-configured limit of 2; either lower
# `limits` here or have the platform limit raised.
@task(requests=Resources(cpu="4", mem="1Gi"), limits=Resources(cpu="8", mem="2Gi"))
def sarima_task(train_data: pandas.DataFrame, test_data: pandas.DataFrame) -> float:
    """Fit one SARIMAX model per column and return the mean forecast error.

    Args:
        train_data: Frame the models are fitted on, one model per column.
        test_data: Frame the forecasts are scored against, column by column.

    Returns:
        The mean absolute error of the forecasts, averaged over all columns
        of ``train_data``.
    """
    # Forecast exactly len(test_data) steps, starting right after the last
    # training observation; the bounds are the same for every column.
    forecast_start = len(train_data)
    forecast_end = forecast_start + len(test_data) - 1

    scores = []
    for column in train_data.columns:
        model = sm.tsa.SARIMAX(train_data[column], order=(1, 1, 1), seasonal_order=(0, 1, 1, 12))
        model_fit = model.fit()
        y_pred = model_fit.predict(start=forecast_start, end=forecast_end)
        scores.append(mean_absolute_error(test_data[column], y_pred))
    return float(sum(scores) / len(scores))
fancy-napkin-28087
06/08/2023, 8:23 PM
~/.flyte/config.yaml?
high-accountant-32689
06/08/2023, 8:31 PMfancy-napkin-28087
06/08/2023, 8:33 PM