Ena Škopelja
04/20/2023, 12:31 PM
<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/> path:
data
| 0/
| | start-node/
| | | 0/
| | | | outputs.pb
|
| inputs.pb
i.e. there's no n0/ directory under data/0/. When I look into data/0/start-node/0/outputs.pb it looks to be the correct input to the entire workflow.
I'm running flytekit 1.5.0 and deployed from helm flyte-core:1.5.0. I tried reverting to 1.4.0 in the core helm chart but it didn't help.
Any idea why this is happening?
[
{
"json": {
"exec_id": "mye-exec-id",
"node": "n1/n0",
"ns": "myns",
"res_ver": "445663335",
"routine": "worker-7",
"wf": "myproject:myns:mywf.mytask"
},
"level": "error",
"msg": "Failed to read from the raw store [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>] Error: not found",
"ts": "2023-04-20T12:13:39Z"
},
{
"json": {
"exec_id": "myexecid",
"node": "n1/n0",
"ns": "myns",
"res_ver": "445663335",
"routine": "worker-7",
"wf": "myproject:myns:mywf.mytask"
},
"level": "error",
"msg": "Catalog Failure: release reservation failed. err: failed to read inputs when trying to query catalog: [READ_FAILED] failed to read data from dataDir [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>]., caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found",
"ts": "2023-04-20T12:13:39Z"
},
{
"json": {
"exec_id": "myexecid",
"node": "n1/n0",
"ns": "myns",
"res_ver": "445663335",
"routine": "worker-7",
"wf": "myproject:myns:mywf.mytask"
},
"level": "error",
"msg": "Failed to finalize Dynamic Nodes Parent.",
"ts": "2023-04-20T12:13:39Z"
},
{
"json": {
"exec_id": "myexecid",
"node": "n1",
"ns": "myns",
"res_ver": "445663335",
"routine": "worker-7",
"wf": "myproject:myns:mywf.mytask"
},
"level": "error",
"msg": "failed Execute for node. Error: 0: failed at Node[n0]. CatalogCallFailed: failed to release reservation, caused by: failed to read inputs when trying to query catalog: [READ_FAILED] failed to read data from dataDir [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>]., caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found",
"ts": "2023-04-20T12:13:39Z"
},
{
"json": {
"exec_id": "myexecid",
"ns": "myns",
"res_ver": "445663335",
"routine": "worker-7",
"wf": "myproject:myns:mywf.mytask"
},
"level": "warning",
"msg": "Error in handling running workflow [0: failed at Node[n0]. CatalogCallFailed: failed to release reservation, caused by: failed to read inputs when trying to query catalog: [READ_FAILED] failed to read data from dataDir [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>]., caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found]",
"ts": "2023-04-20T12:13:39Z"
},
{
"json": {
"exec_id": "myexecid",
"ns": "myns",
"res_ver": "445663335",
"routine": "worker-7",
"wf": "myproject:myns:mywf.mytask"
},
"level": "error",
"msg": "Error when trying to reconcile workflow. Error [0: failed at Node[n0]. CatalogCallFailed: failed to release reservation, caused by: failed to read inputs when trying to query catalog: [READ_FAILED] failed to read data from dataDir [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>]., caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found]. Error Type[errors.ErrorCollection]",
"ts": "2023-04-20T12:13:39Z"
}
]
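A minimal sketch for pulling the distinct missing paths out of records like the ones above, since the same path repeats across several messages. It assumes the records are available as a JSON array; `LOG_RECORDS`, `GS_PATH`, and `missing_paths` are illustrative names, and the sample messages are abbreviated (the `...` is an abbreviation, not real log text).

```python
import json
import re

# Shortened stand-in for the propeller log records above; only the fields
# used below are kept, and the longer messages are abbreviated with "...".
LOG_RECORDS = json.loads("""
[
  {"json": {"node": "n1/n0"}, "level": "error",
   "msg": "Failed to read from the raw store [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>] Error: not found"},
  {"json": {"node": "n1/n0"}, "level": "error",
   "msg": "Failed to finalize Dynamic Nodes Parent."},
  {"json": {"node": "n1"}, "level": "error",
   "msg": "failed Execute for node. ... caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found"}
]
""")

# The links in these messages are wrapped in angle brackets, so capture
# everything from "gs://" up to the closing '>'.
GS_PATH = re.compile(r"<(gs://[^>]+)>")


def missing_paths(records):
    """Return the sorted, distinct gs:// paths named in 'not found' messages."""
    paths = set()
    for record in records:
        msg = record.get("msg", "")
        if "not found" in msg:
            paths.update(GS_PATH.findall(msg))
    return sorted(paths)


print(missing_paths(LOG_RECORDS))
```

With the sample above, every "not found" message points at the same n0/inputs.pb object, which matches the missing n0/ directory in the listing.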
Eli Bixby
04/20/2023, 2:49 PM
flyte-finalizer before deletion), and then restoring Flyte to see if that fixes it.
Has anyone seen these migration issues before? My hunch is that some workflows were running when we did the 1.4.0 -> 1.5.0 upgrade and they left propeller in a corrupted state, but I'm not sure why that would cause new workflows to break in the way Ena described above.
Dan Rammer (hamersaw)
04/20/2023, 3:54 PM
n0/inputs.pb is not yet created, and Flyte attempts to abort the Node, then it will fail. Could you file an issue on this? I plan on submitting a fix quickly!
Ena Škopelja
04/20/2023, 4:48 PM
import time
import flytekit
from flytekit import workflow, task
from flytekit.types.file import FlyteFile
from pathlib import Path
from typing import Optional


@task(cache=True, cache_serialize=True, cache_version='0.0.0')
def t1() -> FlyteFile:
    out = Path(flytekit.current_context().working_directory) / 'out.txt'
    out.write_text('Hi from t1')
    return FlyteFile(path=str(out))


@task(cache=True, cache_serialize=True, cache_version='0.0.0')
def t2(t1_: FlyteFile, optional: Optional[int] = None) -> FlyteFile:
    out = Path(flytekit.current_context().working_directory) / 'out.txt'
    with open(t1_, 'r') as f:
        out.write_text(f.read() + f'\nHi from t2, optional={optional}')
    return FlyteFile(path=str(out))


@workflow
def inner_wf(t1_: FlyteFile, optional: Optional[int] = None) -> FlyteFile:
    return t2(t1_=t2(t1_=t1_, optional=optional), optional=optional)


@workflow
def wf() -> FlyteFile:
    return inner_wf(t1_=t1())
So the issue seems to be with a workflow calling a subworkflow that has an optional input.
failed at Node[n0]. BindingResolutionError: Error binding Var [wf].[optional], caused by: failed at Node[start-node]. OutputsNotFoundError: Failed to find [start-node].[optional]
../inputs.pb)
Dan Rammer (hamersaw)
04/21/2023, 12:19 PM
inner_wf cannot find the optional parameter. @Yee can you take a brief look at the workflow ^^^?
Ena Škopelja
04/21/2023, 12:38 PM
inputs.pb file is missing, that one I only see if I add caching @Dan Rammer (hamersaw)
Dan Rammer (hamersaw)
04/21/2023, 12:56 PM
None from the inner_wf definition fixes this in both cases (caching and non-caching). So in your repro example:
import time
import flytekit
from flytekit import workflow, task
from flytekit.types.file import FlyteFile
from pathlib import Path
from typing import Optional


@task(cache=True, cache_serialize=True, cache_version='0.0.0')
def t1() -> FlyteFile:
    out = Path(flytekit.current_context().working_directory) / 'out.txt'
    out.write_text('Hi from t1')
    return FlyteFile(path=str(out))


@task(cache=True, cache_serialize=True, cache_version='0.0.0')
def t2(t1_: FlyteFile, optional: Optional[int] = None) -> FlyteFile:
    out = Path(flytekit.current_context().working_directory) / 'out.txt'
    with open(t1_, 'r') as f:
        out.write_text(f.read() + f'\nHi from t2, optional={optional}')
    return FlyteFile(path=str(out))


@workflow
def inner_wf(t1_: FlyteFile, optional: Optional[int]) -> FlyteFile:
    return t2(t1_=t2(t1_=t1_, optional=optional), optional=optional)


@workflow
def wf() -> FlyteFile:
    return inner_wf(t1_=t1(), optional=None)
Optional[int] = None on workflows.
Ena Škopelja
04/21/2023, 1:29 PM
Dan Rammer (hamersaw)
04/21/2023, 2:06 PM