# flytekit
#2864 [BUG] Failed to run BQ task when cache is enabled
Issue created by pingsutw

Describe the bug

Slack Thread

Running a BQ task fails when the cache is enabled because type validation fails:
```json
{
  "json": {
    "exec_id": "atfkcwwv5cfr7wzhqq94",
    "node": "n1",
    "ns": "flytesnacks-development",
    "res_ver": "5501760",
    "routine": "worker-3",
    "tasktype": "python-task",
    "wf": "flytesnacks:development:example_test.wf"
  },
  "level": "error",
  "msg": "DataCatalog failed to get outputs from artifact 45bd1d68-a013-43b1-a56b-b7597b559125, err: unexpected artifactData: [o0] type: [structured_dataset_type:\u003c\u003e ] does not match any task output type: [structured_dataset_type:\u003cformat:\"parquet\" \u003e ]",
  "ts": "2022-09-12T06:56:41Z"
}
```
When the cache is enabled, we retrieve the cached artifact from DataCatalog and check whether the structured dataset's schema and format match the expected type. However, the default format of the structured dataset in the expected type is always `Parquet`, while the format of the output structured dataset is `""`.
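```python
import uuid
import pandas as pd
from flytekit import task, StructuredDataset

@task(cache=True, cache_version="1.0")
def t2(sd: StructuredDataset) -> StructuredDataset:  # The default format of structured dataset is Parquet here
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    bq_uri = f"bq://flyte-test-340607.dataset.{uuid.uuid4()}"  # BigQuery table URI, as in the full reproduction
    return StructuredDataset(df, uri=bq_uri)  # The format of structured dataset is ""
```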
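The mismatch can be modeled in a few lines of Python. This is a minimal sketch, not flytepropeller code: it assumes the current validator requires the two formats to be equal up to case (the error log above is consistent with that), and `SDType` and `formats_match_strict` are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SDType:
    """Hypothetical stand-in for a structured dataset literal type (format only)."""
    format: str


def formats_match_strict(actual: SDType, expected: SDType) -> bool:
    # Assumed current behavior: formats must be equal, ignoring case.
    return actual.format.casefold() == expected.format.casefold()


# The cached BQ output carries an empty format...
cached_output = SDType(format="")
# ...while the task's declared output type defaults to Parquet.
declared_output = SDType(format="parquet")

print(formats_match_strict(cached_output, declared_output))  # False: the cache hit is rejected
```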
There are two ways to fix it.

1. Change these lines to:
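```go
// Reject only when the incoming value declares a format that differs from the expected one;
// an empty incoming format is accepted.
if len(structuredDatasetType.Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
	return false
}
```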
2. Change the default format of the expected type to `""` in flytekit, and change these lines to the following. However, this will break existing users: anyone who upgrades flytekit will also have to upgrade flytepropeller.
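```go
// Reject only when the expected type declares a format that differs from the incoming one;
// an empty expected format accepts any format.
if len(t.literalType.GetStructuredDatasetType().Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
	return false
}
```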
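The two proposals differ only in which side's empty format acts as a wildcard. The Python sketch below models both conditions, with `actual` standing for the incoming `structuredDatasetType.Format` and `expected` for `t.literalType.GetStructuredDatasetType().Format`. The function names are hypothetical, `casefold()` only approximates Go's `strings.EqualFold`, and `strict_matches` assumes that an un-upgraded propeller compares formats strictly.

```python
def option1_matches(actual: str, expected: str) -> bool:
    # Option 1: reject only if the incoming value declares a different format.
    if len(actual) != 0 and actual.casefold() != expected.casefold():
        return False
    return True


def option2_matches(actual: str, expected: str) -> bool:
    # Option 2: reject only if the expected type declares a different format.
    if len(expected) != 0 and actual.casefold() != expected.casefold():
        return False
    return True


def strict_matches(actual: str, expected: str) -> bool:
    # Assumed behavior of an un-upgraded propeller: formats must be equal.
    return actual.casefold() == expected.casefold()


# The failing case from this issue: cached BQ output "" vs. declared "parquet".
print(option1_matches("", "parquet"))  # True: option 1 fixes it on its own
print(option2_matches("", "parquet"))  # False: option 2 also needs flytekit's default changed to ""
print(option2_matches("", ""))         # True: new flytekit with new propeller
# Version skew under option 2: new flytekit declares "", a Parquet output arrives,
# but an old propeller still compares strictly.
print(strict_matches("parquet", ""))   # False: hence the coordinated upgrade
```

Option 1 keeps flytekit's existing default, so only the propeller changes; option 1 still rejects a genuine mismatch such as `"csv"` against `"parquet"`.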
Here, `structuredDatasetType` is the input type and `t.literalType.GetStructuredDatasetType()` is the expected type.

Expected behavior

The BQ task should run successfully even when the cache is enabled.

Additional context to reproduce
```python
import uuid
import pandas as pd
from typing_extensions import Annotated
from flytekit import task, workflow, StructuredDataset, kwtypes


@task(cache=True, cache_version="2.0")
def t1() -> StructuredDataset:
    df = pd.DataFrame({
        "name": ["dylan", "steve"],
        "age": [33, 32]
    })
    return StructuredDataset(df)


@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset) -> StructuredDataset:
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    table_id = str(uuid.uuid4())
    bq_uri = f"bq://flyte-test-340607.dataset.{table_id}"
    return StructuredDataset(df, uri=bq_uri)


@workflow
def wf() -> StructuredDataset:
    return t2(sd=t1())


if __name__ == "__main__":
    wf()
```
Screenshots

No response

Are you sure this issue hasn't been raised already?
☑︎ Yes

Have you read the Code of Conduct?
☑︎ Yes

flyteorg/flyte