
Govind Raghu

09/08/2022, 10:39 PM
Hello! We would like some help with an issue we are facing when we use a StructuredDataset as the output of a task: intermittently, the task gets stuck in the Queued status. Sample code in 🧵
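import pandas as pd
from flytekit import StructuredDataset, task, workflow

# FXConfig and OutputSchema are defined under "Schemas" below; utils.gen_bq_path
# and model_name come from the poster's own codebase and are not shown here.
@task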
def struct_task(forecast_config: FXConfig) -> OutputSchema:
    config_df = forecast_config.open(pd.DataFrame).all()
    fcst_start = config_df["fcst_start"][0]
    data = [['John', 10], ['Alice', 15], ['Bob', 14], [fcst_start, 3]]
    df = pd.DataFrame(data, columns=['name', 'age'])
    return OutputSchema(dataframe=df, uri=utils.gen_bq_path(model_name))


def struct_model_standalone(forecast_config: FXConfig) -> OutputSchema:
    return struct_task(forecast_config=forecast_config)

@task
def get_assumption(version_id: str) -> FXConfig:
    d = {'fcst_start': ['2022-09-10'], 'fcst_end': ['2022-09-10'], 'version' : [version_id]}
    df = pd.DataFrame(data=d)
    return StructuredDataset(dataframe=df)

@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(version_id=version_id)
    return struct_model_standalone(forecast_config=forecast_config)
Schemas:
from typing import Annotated

from flytekit import StructuredDataset, kwtypes

OutputSchema = Annotated[StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

FXConfig = Annotated[StructuredDataset, kwtypes(fcst_start=str, fcst_end=str, version=str)]
After the timeout, we see an error similar to the one below in the console:
Failed to check Catalog for previous results: unexpected artifactData: [o0] type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > > ] does not match any task output type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > format:"parquet" > ]

Ketan (kumare3)

09/09/2022, 12:56 AM
Cc @Yee
@Kevin Su

Kevin Su

09/09/2022, 10:47 AM
@Govind Raghu Which flytekit version are you using?

Govind Raghu

09/09/2022, 2:39 PM
flytekit version is 1.1.1

Alireza

09/09/2022, 4:14 PM
@Govind Raghu Can you please add `@task` above the `def struct_model_standalone` function? Your error says:
type: [structured_dataset_type ...  does not match any task output type.
My initial guess is that this is expecting some output from a task, so adding `@task` might help. Also, can you please run `df.dtypes` on your dataset and share the output? I would like to see whether every column has a consistent data type across all rows. I am also thinking about the `> format:"parquet" >` in the error message, but we can dig into that after your initial tests.
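A minimal sketch of that per-row check, assuming pandas is available. The helper names `value_types_per_column` and `inconsistent_columns` and the sample frame are hypothetical, not from the thread. `df.dtypes` alone reports a mixed column only as `object`, so looking at the Python type of each value shows which columns actually mix types:

```python
import pandas as pd


def value_types_per_column(df: pd.DataFrame) -> dict:
    """Map each column to the sorted Python type names found in its values."""
    return {col: sorted({type(v).__name__ for v in df[col]}) for col in df.columns}


def inconsistent_columns(df: pd.DataFrame) -> list:
    """Return the columns whose rows hold more than one Python type."""
    return [col for col, names in value_types_per_column(df).items() if len(names) > 1]


# Hypothetical frame: "age" mixes ints with a string, so pandas stores it as object.
df = pd.DataFrame({"name": ["John", "Alice", "Bob"], "age": [10, 15, "14"]})
print(df.dtypes)
print(value_types_per_column(df))
print(inconsistent_columns(df))
```

On the sample frame, this reports `age` as the only column with mixed value types.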

Dylan Wilder

09/09/2022, 5:46 PM
we've tried this @Alireza, but this failure is at execution time (not registration). We use this pattern elsewhere, and it appears to be directly related to structured datasets, since FlyteSchemas work fine.
`struct_model_standalone` returns an actual Flyte task; we use this pattern to share common workflow DAGs without the runtime overhead of actually using a subworkflow.

Alireza

09/09/2022, 5:50 PM
@Dylan Wilder have you tried deleting `struct_model_standalone`, i.e., using the following?
@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(
            version_id=version_id
    )
    return struct_task(forecast_config=forecast_config)
If yes, did you see the same error?

Govind Raghu

09/09/2022, 6:15 PM
yes... I tried removing the _standalone method too, but it still resulted in the same issue.

Alireza

09/09/2022, 6:32 PM
I see. Thank you for the quick check. In our docs, the template looks like this: https://docs.flyte.org/projects/cookbook/en/latest/auto/core/type_system/structured_dataset.html#structured-dataset
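from typing import Annotated

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task

# Assumed definition; the docs example defines subset_cols earlier on the page.
subset_cols = kwtypes(Age=int)

@task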
def get_subset_df(
    df: Annotated[StructuredDataset, subset_cols]
) -> Annotated[StructuredDataset, subset_cols]:
    df = df.open(pd.DataFrame).all()
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    # On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table
    return StructuredDataset(dataframe=df)
I am still trying to spot the possible differences. Can you please reorganize your first task to look like the above and check whether you get the same error?

Kevin Su

09/09/2022, 7:15 PM
@Dylan Wilder I ran your example in flytekit v1.2.0b1, and it works for me. Do you have any other workflows that produce the same error? The root cause is that the format differs between the input type and the expected type here: the first type has no format, while the expected type ends with format:"parquet".
[structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > > ] does not match any task output type: [structured_dataset_type:<columns:<name:"forecast_version" literal_type:<simple:STRING > > columns:<name:"fromCurrency" literal_type:<simple:STRING > > columns:<name:"toCurrency" literal_type:<simple:STRING > > columns:<name:"rate" literal_type:<simple:FLOAT > > columns:<name:"year_start_rate" literal_type:<simple:FLOAT > > format:"parquet" > ]
Here is the workflow I executed:
from typing import Annotated

import pandas as pd
from flytekit import task, workflow, StructuredDataset, kwtypes


OutputSchema = Annotated[StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

FXConfig = Annotated[StructuredDataset, kwtypes(fcst_start=str, fcst_end=str, version=str)]


def struct_task(forecast_config: FXConfig) -> OutputSchema:
    config_df = forecast_config.open(pd.DataFrame).all()
    fcst_start = config_df["fcst_start"][0]
    data = [['John', 10], ['Alice', 15], ['Bob', 14], [fcst_start, 3]]
    df = pd.DataFrame(data, columns=['name', 'age'])
    return OutputSchema(dataframe=df, uri="bq://flyte-test-340607.dataset.test1")


@task
def struct_model_standalone(forecast_config: FXConfig) -> OutputSchema:
    return struct_task(forecast_config=forecast_config)


@task
def get_assumption(version_id: str) -> FXConfig:
    d = {'fcst_start': ['2022-09-10'], 'fcst_end': ['2022-09-10'], 'version': [version_id]}
    df = pd.DataFrame(data=d)
    return StructuredDataset(dataframe=df)


@workflow
def struct_model_onemodel(version_id: str) -> OutputSchema:
    forecast_config = get_assumption(version_id=version_id)
    return struct_model_standalone(forecast_config=forecast_config)


if __name__ == "__main__":
    struct_model_onemodel(version_id="id")
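Kevin's reading of the log can be double-checked mechanically. A stdlib-only sketch, assuming nothing beyond the two type strings quoted in the error: rebuild them from the column list in the log and diff them token by token with `difflib`. `type_diff` is a hypothetical helper, and this compares text only; it says nothing about how Flyte itself compares literal types.

```python
import difflib

# Column names and simple types copied from the error log.
COLUMNS = [
    ("forecast_version", "STRING"),
    ("fromCurrency", "STRING"),
    ("toCurrency", "STRING"),
    ("rate", "FLOAT"),
    ("year_start_rate", "FLOAT"),
]
body = " ".join(f'columns:<name:"{n}" literal_type:<simple:{t} > >' for n, t in COLUMNS)
artifact_type = f"structured_dataset_type:<{body} >"  # type of the cached artifact
output_type = f'structured_dataset_type:<{body} format:"parquet" >'  # declared output type


def type_diff(a: str, b: str) -> list:
    """Return the non-equal token runs between two whitespace-separated type strings."""
    ta, tb = a.split(), b.split()
    matcher = difflib.SequenceMatcher(None, ta, tb, autojunk=False)
    return [
        (tag, ta[i1:i2], tb[j1:j2])
        for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        if tag != "equal"
    ]


print(type_diff(artifact_type, output_type))  # [('insert', [], ['format:"parquet"'])]
```

The only difference reported is the inserted `format:"parquet"` token, which matches the diagnosis that the columns agree and only the format differs.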

Govind Raghu

09/09/2022, 7:17 PM
yes... it works most of the time. This issue is intermittent and usually happens when multiple workflows are running together.
here's another example for which I was able to reproduce this issue:
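import uuid
from typing import Annotated

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow

Schema1 = Annotated[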
    StructuredDataset,
    kwtypes(
        name=str,
        age=int
    )
]

Schema2 = Annotated[
    StructuredDataset,
    kwtypes(
        len=int,
    )
]

@task
def t1() -> Schema1:
    df = pd.DataFrame({
        "name": ["dylan", "steve"],
        "age": [33, 32]
    })
    return StructuredDataset(df)

@task
def t2(sd: Schema1) -> Schema2:
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    table_id = str(uuid.uuid4())
    bq_uri = f"bq://sp-one-model.one_model_dev.{table_id}"
    return StructuredDataset(df, uri=bq_uri)

@workflow
def wf() -> Schema2:
    return t2(sd=t1())

Kevin Su

09/09/2022, 7:25 PM
nice, taking a look

Govind Raghu

09/09/2022, 7:25 PM
this works fine on a single execution... but if I run 2 or 3 executions simultaneously, I run into the issue

Dylan Wilder

09/09/2022, 8:37 PM
looks like it's specific to writing to BQ
or reading from it

Kevin Su

09/10/2022, 2:28 AM
@Dylan Wilder Did you use caching? I can reproduce it when caching is enabled.

Govind Raghu

09/10/2022, 6:13 AM
Yes, caching is enabled on the task.

Kevin Su

09/12/2022, 5:55 AM
yes, there is a bug in flytekit; I will create a PR shortly. To work around it, disable caching in flytekit. BQ can cache results as well.
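The reported error starts with "Failed to check Catalog for previous results", and Kevin could only reproduce it with caching on. A toy model, assuming a cache that validates a stored artifact's type against the task's declared output type, shows why turning caching off sidesteps this kind of failure: the lookup never runs. `ToyCatalog`, `run_task`, and the step that drops the format are all hypothetical; this does not describe how flytekit or the Flyte catalog store artifacts, what the actual bug was, or why it appeared only with concurrent executions.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple

# A type is modelled as (columns, format); "" means no format was recorded.
SDType = Tuple[Tuple[Tuple[str, str], ...], str]


@dataclass
class Artifact:
    value: object
    sd_type: SDType


class ToyCatalog:
    """Hypothetical stand-in for a task-output cache; not Flyte's implementation."""

    def __init__(self) -> None:
        self._store: Dict[str, Artifact] = {}

    def put(self, key: str, artifact: Artifact) -> None:
        self._store[key] = artifact

    def get(self, key: str, expected: SDType) -> Optional[Artifact]:
        artifact = self._store.get(key)
        if artifact is None:
            return None  # cache miss: the caller just runs the task
        if artifact.sd_type != expected:
            raise TypeError(
                f"artifact type {artifact.sd_type} does not match task output type {expected}"
            )
        return artifact


def run_task(catalog: Optional[ToyCatalog], key: str, expected: SDType,
             compute: Callable[[], object]) -> object:
    if catalog is not None:  # the catalog is only consulted when caching is on
        hit = catalog.get(key, expected)
        if hit is not None:
            return hit.value
    value = compute()
    if catalog is not None:
        # Simulated defect: the stored type drops the format field.
        catalog.put(key, Artifact(value, (expected[0], "")))
    return value


expected = ((("len", "INTEGER"),), "parquet")

# Caching off: repeated runs never touch the catalog, so nothing can mismatch.
print(run_task(None, "wf/t2", expected, lambda: 2))
print(run_task(None, "wf/t2", expected, lambda: 2))

# Caching on: the first run stores a format-less type, and the second lookup fails.
catalog = ToyCatalog()
run_task(catalog, "wf/t2", expected, lambda: 2)
try:
    run_task(catalog, "wf/t2", expected, lambda: 2)
except TypeError as err:
    print("catalog check failed:", err)
```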

Govind Raghu

09/12/2022, 3:13 PM
Thanks Kevin!

Dylan Wilder

09/12/2022, 3:30 PM
wait, the above examples do not have caching though?

Govind Raghu

09/12/2022, 3:32 PM
@Dylan Wilder the examples I tested were using the custom decorator that enabled caching by default.

Dylan Wilder

09/12/2022, 3:32 PM
oh good point 😄