acoustic-carpenter-78188
02/21/2023, 11:58 PM
```
{"json":{"exec_id":"atfkcwwv5cfr7wzhqq94","node":"n1","ns":"flytesnacks-development","res_ver":"5501760","routine":"worker-3","tasktype":"python-task","wf":"flytesnacks:development:example_test.wf"},"level":"error","msg":"DataCatalog failed to get outputs from artifact 45bd1d68-a013-43b1-a56b-b7597b559125, err: unexpected artifactData: [o0] type: [structured_dataset_type:\u003c\u003e ] does not match any task output type: [structured_dataset_type:\u003cformat:\"parquet\" \u003e ]","ts":"2022-09-12T06:56:41Z"}
```
When the cache is enabled, we retrieve artifacts from DataCatalog and check whether the structured dataset's schema and format match the expected type. However, the structured dataset format in the expected type always defaults to Parquet, whereas the format of the output structured dataset is "" (empty).
```python
@task(cache=True, cache_version="1.0")
def t2(sd: StructuredDataset) -> StructuredDataset:  # Expected type: format defaults to Parquet here
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    # bq_uri points at a BigQuery table, as in the full reproduction below
    return StructuredDataset(df, uri=bq_uri)  # Output literal: format is ""
```
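The mismatch can be modelled in a few lines of Python. This is a minimal sketch, not flytepropeller code. It assumes the existing check compares the two format strings case-insensitively, as strings.EqualFold does, with no special case for an empty format. The name formats_match is hypothetical, and str.casefold() only approximates Go's strings.EqualFold, which is sufficient for ASCII format names.

```python
def formats_match(actual_format: str, expected_format: str) -> bool:
    """Hypothetical model of the current check (assumption): a case-insensitive
    equality test on the two format strings, with no special case for ""."""
    # str.casefold() stands in for Go's strings.EqualFold on ASCII names
    return actual_format.casefold() == expected_format.casefold()


# The cached artifact written by t2 carries format "", while the task's
# declared output type defaults to "parquet", so the cache lookup is rejected.
cached_output_format = ""
declared_output_format = "parquet"
print(formats_match(cached_output_format, declared_output_format))  # False
print(formats_match("PARQUET", declared_output_format))  # True
```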
There are two ways to fix this:
1. Change these lines to:
```go
if len(structuredDatasetType.Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
    return false
}
```
2. Change the default format of the expected type to "" in flytekit, and change these lines to the code below. However, this would break existing users. Anyone who upgrades flytekit would also have to upgrade propeller.
```go
if len(t.literalType.GetStructuredDatasetType().Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
    return false
}
```
Here, structuredDatasetType is the input type and t.literalType.GetStructuredDatasetType() is the expected type.
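The two proposals differ in which side treats an empty format as a wildcard. The Python sketch below models the checks side by side. It rests on several assumptions. The function names are hypothetical, and casefold() stands in for strings.EqualFold. current_check assumes that the existing code is the same comparison without any len(...) != 0 guard; the original lines are linked, not reproduced, in this issue.

```python
def current_check(input_format: str, expected_format: str) -> bool:
    # Assumed current behaviour: formats must match (case-insensitively).
    return input_format.casefold() == expected_format.casefold()


def option1_check(input_format: str, expected_format: str) -> bool:
    # Option 1: an empty *input* format is accepted for any expected format.
    if len(input_format) != 0 and input_format.casefold() != expected_format.casefold():
        return False
    return True


def option2_check(input_format: str, expected_format: str) -> bool:
    # Option 2: an empty *expected* format accepts any input format.
    if len(expected_format) != 0 and input_format.casefold() != expected_format.casefold():
        return False
    return True


cases = [
    ("", "parquet"),         # this issue, with flytekit's current "parquet" default
    ("", ""),                # this issue, if flytekit's default became ""
    ("parquet", ""),         # Parquet output checked against a "" default
    ("csv", "parquet"),      # a genuine mismatch: rejected by all three
]
for input_format, expected_format in cases:
    print(
        repr(input_format),
        repr(expected_format),
        current_check(input_format, expected_format),
        option1_check(input_format, expected_format),
        option2_check(input_format, expected_format),
    )
```

The ("parquet", "") case suggests one way the coupling described in option 2 could surface. If flytekit declared a "" default while an older propeller still ran the current check, outputs whose format is "parquet" would be rejected.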
Expected behavior
The BigQuery task should run successfully even when the cache is enabled.
Additional context to reproduce
```python
import uuid

import pandas as pd
from typing_extensions import Annotated
from flytekit import task, workflow, StructuredDataset, kwtypes


@task(cache=True, cache_version="2.0")
def t1() -> StructuredDataset:
    df = pd.DataFrame({
        "name": ["dylan", "steve"],
        "age": [33, 32]
    })
    return StructuredDataset(df)


@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset) -> StructuredDataset:
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    table_id = str(uuid.uuid4())
    bq_uri = f"bq://flyte-test-340607.dataset.{table_id}"
    # Writing to a BigQuery URI yields an output whose format is ""
    return StructuredDataset(df, uri=bq_uri)


@workflow
def wf() -> StructuredDataset:
    return t2(sd=t1())


if __name__ == "__main__":
    wf()
```
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte