# flytekit
g
Hello! We would like to get some help with an issue we are facing when we use Structured Dataset as an output of a task. Intermittently the task gets stuck in Queued status. Sample Code in 🧵
@task
def struct_task(forecast_config: FXConfig) -> OutputSchema:
    config_df = forecast_config.open(pd.DataFrame).all()
    fcst_start = config_df["fcst_start"][0]
    data = [['John', 10], ['Alice', 15], ['Bob', 14], [fcst_start, 3]]
    df = pd.DataFrame(data, columns=['name', 'age'])
    return OutputSchema(dataframe=df, uri=utils.gen_bq_path(model_name))


def struct_model_standalone(forecast_config: FXConfig) -> OutputSchema:
    return struct_task(forecast_config=forecast_config)

@task
def get_assumption(version_id: str) -> FXConfig:
    d = {'fcst_start': ['2022-09-10'], 'fcst_end': ['2022-09-10'], 'version' : [version_id]}
    df = pd.DataFrame(data=d)
    return StructuredDataset(dataframe=df)

@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(
            version_id=version_id
    )
    return struct_model_standalone(forecast_config=forecast_config)
Schemas:
OutputSchema = Annotated[StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

FXConfig = Annotated[StructuredDataset, kwtypes(fcst_start=str, fcst_end=str, version=str)]
After the timeout we see an error similar to below show up in the console:
Failed to check Catalog for previous results: unexpected artifactData: [o0] type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > > ] does not match any task output type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > format:"parquet" > ]
k
Cc @Yee
@Kevin Su
k
@Govind Raghu Which flytekit version are you using?
g
flytekit version is 1.1.1
a
@Govind Raghu Can you please add `@task` above the `def struct_model_standalone` function? Your error says:
type: [structured_dataset_type ...  does not match any task output type.
My initial guess is that this is expecting some output from a task, so adding `@task` might help. Also, can you please run `df.dtypes` on your dataset and share that output? I would like to see whether the dataset has a consistent datatype across all rows of every column. I am also thinking about the `format:"parquet"` in the error message, but we can dig into that after your initial tests.
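For reference, a minimal sketch of the `df.dtypes` check being asked for here (plain pandas, reusing the toy data from the example above; the date string in the last row is the value that could make a column's dtype inconsistent):

```python
import pandas as pd

# Toy frame mirroring the example: a date string lands in the "name" column
data = [["John", 10], ["Alice", 15], ["Bob", 14], ["2022-09-10", 3]]
df = pd.DataFrame(data, columns=["name", "age"])

# dtypes shows the inferred type of each column; string columns (and any
# column with mixed value types) show up as "object"
print(df.dtypes)
```

Here `name` comes out as `object` and `age` as `int64`, since every `age` value is an integer.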
d
we've tried this @Alireza, but this failure is at execution time (not registration). We have this pattern elsewhere, and it appears to be directly related to structured datasets, since flyteschemas work fine
`struct_model_standalone` returns an actual flyte task; we use this pattern to share common workflow DAGs without the runtime overhead of actually using a subworkflow
a
@Dylan Wilder have you tried deleting `struct_model_standalone`? i.e., using the following:
@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(
            version_id=version_id
    )
    return struct_task(forecast_config=forecast_config)
If yes, did you see the same error?
g
yes...I tried after removing the _standalone method too...but it still resulted in the same issue.
a
I see. Thank you for the quick check. In our doc the template looks like this: https://docs.flyte.org/projects/cookbook/en/latest/auto/core/type_system/structured_dataset.html#structured-dataset
@task
def get_subset_df(
    df: Annotated[StructuredDataset, subset_cols]
) -> Annotated[StructuredDataset, subset_cols]:
    df = df.open(pd.DataFrame).all()
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    # On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table
    return StructuredDataset(dataframe=df)
I am still trying to spot the possible differences. Can you please reorganize your first task to look like the above and see if you get the same error?
k
@Dylan Wilder I ran your example in flytekit v1.2.0b1, but it's working for me. Do you have any other workflows that can produce the same error? The root cause is that the format differs between the input type and the expected type here.
[structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > > ] does not match any task output type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > format:"parquet" > ]
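To make the mismatch concrete: the two type descriptors in the error are identical column-for-column and differ only in that the expected output type carries `format:"parquet"` while the cached artifact type has no format set. A simplified illustration of that comparison (plain Python dicts standing in for the real protobuf types):

```python
# Simplified stand-ins for the two structured_dataset_type values in the error;
# the real types are protobuf messages, this just mimics their fields
columns = ["forecast_version", "fromCurrency", "toCurrency", "rate", "year_start_rate"]
cached_type = {"columns": columns, "format": ""}          # what the catalog stored
expected_type = {"columns": columns, "format": "parquet"}  # what the task declares

# The only differing field is "format", which is what makes the catalog lookup fail
diff = [k for k in expected_type if cached_type[k] != expected_type[k]]
print(diff)  # ['format']
```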
Here is the workflow I executed
from typing import Annotated

import pandas as pd
from flytekit import task, workflow, StructuredDataset, kwtypes


OutputSchema = Annotated[StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

FXConfig = Annotated[StructuredDataset, kwtypes(fcst_start=str, fcst_end=str, version=str)]


def struct_task(forecast_config: FXConfig) -> OutputSchema:
    config_df = forecast_config.open(pd.DataFrame).all()
    fcst_start = config_df["fcst_start"][0]
    data = [['John', 10], ['Alice', 15], ['Bob', 14], [fcst_start, 3]]
    df = pd.DataFrame(data, columns=['name', 'age'])
    return OutputSchema(dataframe=df, uri="bq://flyte-test-340607.dataset.test1")


@task
def struct_model_standalone(forecast_config: FXConfig) -> OutputSchema:
    return struct_task(forecast_config=forecast_config)


@task
def get_assumption(version_id: str) -> FXConfig:
    d = {'fcst_start': ['2022-09-10'], 'fcst_end': ['2022-09-10'], 'version': [version_id]}
    df = pd.DataFrame(data=d)
    return StructuredDataset(dataframe=df)


@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(version_id=version_id)
    return struct_model_standalone(forecast_config=forecast_config)


if __name__ == "__main__":
    struct_model_onemodel(version_id="id")
g
yes...it works most of the time...this issue is intermittent...usually happens when multiple workflows are running together
here's another example for which I was able to reproduce this issue:
import uuid
from typing import Annotated

import pandas as pd
from flytekit import task, workflow, StructuredDataset, kwtypes


Schema1 = Annotated[
    StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

Schema2 = Annotated[
    StructuredDataset,
    kwtypes(
        len=int,
    )
]

@task
def t1() -> Schema1:
    df = pd.DataFrame({
        "name": ["dylan", "steve"],
        "age": [33, 32]
    })
    return StructuredDataset(df)

@task
def t2(sd: Schema1) -> Schema2:
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    table_id = str(uuid.uuid4())
    bq_uri = f"bq://sp-one-model.one_model_dev.{table_id}"
    return StructuredDataset(df, uri=bq_uri)

@workflow
def wf() -> Schema2:
    return t2(sd=t1())
k
nice, taking a look
g
this works fine on single execution...but if I try to run 2 or 3 executions simultaneously I run into the issue
d
looks like it's specific to writing to bq
or reading from
k
@Dylan Wilder Did you use caching? I can reproduce it if caching is enabled
g
Yes..caching is enabled on the task.
k
yes, there is a bug in flytekit; I will create a PR shortly. To work around it, disable the cache in flytekit. BQ can cache results as well.
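For anyone landing here later, the suggested workaround would look something like this (a hedged sketch applied to the second repro above; `cache` is the standard flytekit task decorator option, and the body is elided):

```python
# Sketch: turn caching off on the affected task until the fix lands
@task(cache=False)
def t2(sd: Schema1) -> Schema2:
    ...
```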
g
Thanks Kevin!
d
wait, the above examples do not have caching though?
g
@Dylan Wilder the examples I tested were using the custom decorator that enabled caching by default.
d
oh good point 😄