Govind Raghu
09/08/2022, 10:39 PM
@task
def struct_task(forecast_config: FXConfig) -> OutputSchema:
    config_df = forecast_config.open(pd.DataFrame).all()
    fcst_start = config_df["fcst_start"][0]
    data = [['John', 10], ['Alice', 15], ['Bob', 14], [fcst_start, 3]]
    df = pd.DataFrame(data, columns=['name', 'age'])
    return OutputSchema(dataframe=df, uri=utils.gen_bq_path(model_name))

def struct_model_standalone(forecast_config: FXConfig) -> OutputSchema:
    return struct_task(forecast_config=forecast_config)

@task
def get_assumption(version_id: str) -> FXConfig:
    d = {'fcst_start': ['2022-09-10'], 'fcst_end': ['2022-09-10'], 'version': [version_id]}
    df = pd.DataFrame(data=d)
    return StructuredDataset(dataframe=df)

@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(
        version_id=version_id
    )
    return struct_model_standalone(forecast_config=forecast_config)

OutputSchema = Annotated[StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

FXConfig = Annotated[StructuredDataset, kwtypes(fcst_start=str, fcst_end=str, version=str)]
Failed to check Catalog for previous results: unexpected artifactData: [o0] type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > > ] does not match any task output type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > format:"parquet" > ]
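The two type strings in the error above are long enough that the mismatch is hard to spot by eye. A minimal sketch, using only the standard library and strings rebuilt from that message (the variable names are illustrative), compares them token by token:

```python
import difflib

# Column descriptions copied from the error message; both types share them.
columns = " ".join(
    f'columns:<name:"{name}" literal_type:<simple:{kind} > >'
    for name, kind in [
        ("forecast_version", "STRING"),
        ("fromCurrency", "STRING"),
        ("toCurrency", "STRING"),
        ("rate", "FLOAT"),
        ("year_start_rate", "FLOAT"),
    ]
)

# Type reported for the cached artifact vs. type of the task output.
cached_type = f"structured_dataset_type:<{columns} > "
task_output_type = f'structured_dataset_type:<{columns} format:"parquet" > '

cached_tokens = cached_type.split()
output_tokens = task_output_type.split()

# Collect every non-matching region between the two token sequences.
matcher = difflib.SequenceMatcher(a=cached_tokens, b=output_tokens, autojunk=False)
differences = [
    (tag, cached_tokens[i1:i2], output_tokens[j1:j2])
    for tag, i1, i2, j1, j2 in matcher.get_opcodes()
    if tag != "equal"
]
print(differences)
```

With these strings the only difference reported is the `format:"parquet"` token on the task output type. The sketch only locates the mismatch; it does not explain why the cached artifact lacks that field.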
Ketan (kumare3)
Kevin Su
09/09/2022, 10:47 AM
Govind Raghu
09/09/2022, 2:39 PM
Alireza
09/09/2022, 4:14 PM
@task above def struct_model_standalone functions?
Your error says: type: [structured_dataset_type ... does not match any task output type.
My initial guess is that this is expecting some output from a task, so adding @task might help.
Also, can you please run df.dtypes on your dataset and share the output? I would like to see whether every column has a consistent datatype across all of its rows.
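A minimal sketch of that check, assuming a pandas DataFrame shaped like the one built in struct_task (the values, including the date string standing in for fcst_start, are illustrative rather than the poster's real data):

```python
import pandas as pd

# Illustrative rows mirroring the 'name'/'age' frame from struct_task;
# "2022-09-10" stands in for the fcst_start value.
data = [["John", 10], ["Alice", 15], ["Bob", 14], ["2022-09-10", 3]]
df = pd.DataFrame(data, columns=["name", "age"])

# Column-level dtypes, as requested in the message.
print(df.dtypes)

# Python types actually present in each column; more than one type in a
# set means that column mixes types across rows.
row_types = {col: set(df[col].map(type)) for col in df.columns}
mixed_columns = [col for col, types in row_types.items() if len(types) > 1]
print(mixed_columns)
```

An empty mixed_columns list would suggest the rows are type-consistent, which would point away from the data itself as the cause.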
I am also wondering about the `format:"parquet"` in the error message, but we can dig into that after your initial tests.
Dylan Wilder
09/09/2022, 5:46 PM
struct_model_standalone returns an actual Flyte task; we use this pattern to share common workflow DAGs without the runtime overhead of actually using a subworkflow.
Alireza
09/09/2022, 5:50 PM
struct_model_standalone? i.e., to use the
@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(
        version_id=version_id
    )
    return struct_task(forecast_config=forecast_config)
If yes, did you see the same error?
Govind Raghu
09/09/2022, 6:15 PM
Alireza
09/09/2022, 6:32 PM
@task
def get_subset_df(
    df: Annotated[StructuredDataset, subset_cols]
) -> Annotated[StructuredDataset, subset_cols]:
    df = df.open(pd.DataFrame).all()
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    # On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table
    return StructuredDataset(dataframe=df)
I am still trying to see the possible differences. Can you please reorganize your first task to look like the above and see whether you get the same error?
Kevin Su
09/09/2022, 7:15 PM
[structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > > ] does not match any task output type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > format:"parquet" > ]
from typing import Annotated

import pandas as pd
from flytekit import task, workflow, StructuredDataset, kwtypes

OutputSchema = Annotated[StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

FXConfig = Annotated[StructuredDataset, kwtypes(fcst_start=str, fcst_end=str, version=str)]

def struct_task(forecast_config: FXConfig) -> OutputSchema:
    config_df = forecast_config.open(pd.DataFrame).all()
    fcst_start = config_df["fcst_start"][0]
    data = [['John', 10], ['Alice', 15], ['Bob', 14], [fcst_start, 3]]
    df = pd.DataFrame(data, columns=['name', 'age'])
    return OutputSchema(dataframe=df, uri="bq://flyte-test-340607.dataset.test1")

@task
def struct_model_standalone(forecast_config: FXConfig) -> OutputSchema:
    return struct_task(forecast_config=forecast_config)

@task
def get_assumption(version_id: str) -> FXConfig:
    d = {'fcst_start': ['2022-09-10'], 'fcst_end': ['2022-09-10'], 'version': [version_id]}
    df = pd.DataFrame(data=d)
    return StructuredDataset(dataframe=df)

@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(version_id=version_id)
    return struct_model_standalone(forecast_config=forecast_config)

if __name__ == "__main__":
    struct_model_onemodel(version_id="id")
Govind Raghu
09/09/2022, 7:17 PM
Schema1 = Annotated[
    StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

Schema2 = Annotated[
    StructuredDataset,
    kwtypes(
        len=int,
    )
]

@task
def t1() -> Schema1:
    df = pd.DataFrame({
        "name": ["dylan", "steve"],
        "age": [33, 32]
    })
    return StructuredDataset(df)

@task
def t2(sd: Schema1) -> Schema2:
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    table_id = str(uuid.uuid4())
    bq_uri = f"bq://sp-one-model.one_model_dev.{table_id}"
    return StructuredDataset(df, uri=bq_uri)

@workflow
def wf() -> Schema2:
    return t2(sd=t1())
Kevin Su
09/09/2022, 7:25 PM
Govind Raghu
09/09/2022, 7:25 PM
Dylan Wilder
09/09/2022, 8:37 PM
Kevin Su
09/10/2022, 2:28 AM
Govind Raghu
09/10/2022, 6:13 AM
Kevin Su
09/12/2022, 5:55 AM
Govind Raghu
09/12/2022, 3:13 PM
Dylan Wilder
09/12/2022, 3:30 PM
Govind Raghu
09/12/2022, 3:32 PM
Dylan Wilder
09/12/2022, 3:32 PM