# ask-the-community
e
Hi, I'm seeing a very weird error in my workflow (see thread). Propeller is complaining that there is no input to a node in the workflow. When I look into the gcs bucket I see this under the gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/ path:
```
data/
├── 0/
│   └── start-node/
│       └── 0/
│           └── outputs.pb
└── inputs.pb
```
i.e. there's no n0/ directory under data/0/. When I look into data/0/start-node/0/outputs.pb it looks to be the correct input to the entire workflow. I'm running flytekit 1.5.0 and deployed from helm flyte-core:1.5.0. I tried reverting to 1.4.0 in the core helm chart but it didn't help. Any idea why this is happening?
```json
[
  {
    "json": {
      "exec_id": "mye-exec-id",
      "node": "n1/n0",
      "ns": "myns",
      "res_ver": "445663335",
      "routine": "worker-7",
      "wf": "myproject:myns:mywf.mytask"
    },
    "level": "error",
    "msg": "Failed to read from the raw store [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>] Error: not found",
    "ts": "2023-04-20T12:13:39Z"
  },
  {
    "json": {
      "exec_id": "myexecid",
      "node": "n1/n0",
      "ns": "myns",
      "res_ver": "445663335",
      "routine": "worker-7",
      "wf": "myproject:myns:mywf.mytask"
    },
    "level": "error",
    "msg": "Catalog Failure: release reservation failed. err: failed to read inputs when trying to query catalog: [READ_FAILED] failed to read data from dataDir [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>]., caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found",
    "ts": "2023-04-20T12:13:39Z"
  },
  {
    "json": {
      "exec_id": "myexecid",
      "node": "n1/n0",
      "ns": "myns",
      "res_ver": "445663335",
      "routine": "worker-7",
      "wf": "myproject:myns:mywf.mytask"
    },
    "level": "error",
    "msg": "Failed to finalize Dynamic Nodes Parent.",
    "ts": "2023-04-20T12:13:39Z"
  },
  {
    "json": {
      "exec_id": "myexecid",
      "node": "n1",
      "ns": "myns",
      "res_ver": "445663335",
      "routine": "worker-7",
      "wf": "myproject:myns:mywf.mytask"
    },
    "level": "error",
    "msg": "failed Execute for node. Error: 0: failed at Node[n0]. CatalogCallFailed: failed to release reservation, caused by: failed to read inputs when trying to query catalog: [READ_FAILED] failed to read data from dataDir [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>]., caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found",
    "ts": "2023-04-20T12:13:39Z"
  },
  {
    "json": {
      "exec_id": "myexecid",
      "ns": "myns",
      "res_ver": "445663335",
      "routine": "worker-7",
      "wf": "myproject:myns:mywf.mytask"
    },
    "level": "warning",
    "msg": "Error in handling running workflow [0: failed at Node[n0]. CatalogCallFailed: failed to release reservation, caused by: failed to read inputs when trying to query catalog: [READ_FAILED] failed to read data from dataDir [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>]., caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found]",
    "ts": "2023-04-20T12:13:39Z"
  },
  {
    "json": {
      "exec_id": "myexecid",
      "ns": "myns",
      "res_ver": "445663335",
      "routine": "worker-7",
      "wf": "myproject:myns:mywf.mytask"
    },
    "level": "error",
    "msg": "Error when trying to reconcile workflow. Error [0: failed at Node[n0]. CatalogCallFailed: failed to release reservation, caused by: failed to read inputs when trying to query catalog: [READ_FAILED] failed to read data from dataDir [<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>]., caused by: path:<gs://mybucket/metadata/propeller/myproject-myns-myexecid/n1/data/0/n0/inputs.pb>: not found]. Error Type[errors.ErrorCollection]",
    "ts": "2023-04-20T12:13:39Z"
  }
]
```
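(A quick way to double-check which objects actually exist under that node's data prefix is to list it with the GCS client. This is only an illustrative sketch, using the placeholder bucket and path from the logs above:)
```python
# Illustrative sketch (placeholder bucket/paths from above): list what actually
# exists under the node's metadata prefix to confirm which .pb files are missing.
from google.cloud import storage

client = storage.Client()
prefix = "metadata/propeller/myproject-myns-myexecid/n1/data/"
for blob in client.list_blobs("mybucket", prefix=prefix):
    print(blob.name)  # expect .../0/start-node/0/outputs.pb but no .../0/n0/inputs.pb
```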
e
We've done some more debugging, including reverting the sql database to a previous state and reverting flyte to 1.4.x. We were then seeing other errors in the propeller and datacatalog which suggested that the offending workflows were still present. To my knowledge the only other place there is state is in k8s etcd, for the workflow CRD resources. We're now in the process of taking down all the workflows left after removing the flyte deployment (which is difficult because we need to edit the resource definition to remove the flyte-finalizer before deletion), and then restoring flyte to see if that fixes it. Has anyone seen these migration issues before? Our hunch is that maybe some workflows were running when we did the 1.4.0 -> 1.5.0 upgrade and they got propeller into a corrupted state, but I'm not sure why that should cause new workflows to break in the way Ena described above.
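(For reference, a minimal sketch of how one might strip the finalizer and delete the stuck workflow CRs, assuming the CRD plural is flyteworkflows and shelling out to kubectl; the namespace and name here are placeholders, not taken from the thread:)
```python
# Minimal sketch, not a vetted procedure: patch away the finalizers on a stuck
# FlyteWorkflow CR so it can be deleted. Assumes the CRD plural is
# "flyteworkflows" and that kubectl is configured for the right cluster.
import json
import subprocess

def force_delete_flyteworkflow(name: str, namespace: str = "myns") -> None:
    patch = json.dumps({"metadata": {"finalizers": []}})
    subprocess.run(
        ["kubectl", "patch", "flyteworkflows", name, "-n", namespace,
         "--type", "merge", "-p", patch],
        check=True,
    )
    subprocess.run(
        ["kubectl", "delete", "flyteworkflows", name, "-n", namespace],
        check=True,
    )
```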
d
@Ena Škopelja is the failure ^^ reproducible? I think there is a bug here. It looks like we call ReleaseCatalogReservation on Finalize, which is called when aborting a Node, and it looks like it's possible to abort a Node in the NotYetStarted phase. In that case the n0/inputs.pb has not yet been created, so when Flyte attempts to abort the Node the release fails. Could you file an issue on this? I will plan on submitting a fix quickly!
However, this doesn't explain some of the larger issues it seems you're seeing. @Eli Bixby can you elaborate on this? During the upgrade from 1.4 -> 1.5 did you have two flytepropeller instances running simultaneously? This might explain the state issues you mentioned.
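(A rough restatement of that failure path in Python pseudocode, with hypothetical names — FlytePropeller itself is Go, so this is only to make the sequence concrete:)
```python
# Hypothetical illustration only (propeller is Go; these names are made up):
# aborting a node that never started triggers Finalize, which releases the
# catalog reservation, which needs an inputs.pb that was never written.
from enum import Enum, auto

class NodePhase(Enum):
    NOT_YET_STARTED = auto()
    RUNNING = auto()

def release_catalog_reservation(inputs_pb_exists: bool) -> None:
    # Releasing the reservation re-reads the node's inputs to build the cache key.
    if not inputs_pb_exists:
        raise FileNotFoundError(".../n1/data/0/n0/inputs.pb: not found")

def abort_node(phase: NodePhase) -> None:
    # Finalize runs on abort; for cached tasks it releases the reservation.
    inputs_written = phase is not NodePhase.NOT_YET_STARTED
    release_catalog_reservation(inputs_pb_exists=inputs_written)

# abort_node(NodePhase.NOT_YET_STARTED)  # -> FileNotFoundError, as in the logs above
```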
e
Yeah, I'm not entirely sure what happened either. Let me unroll the sequence of events properly now that we've looked more into it:
1. I deployed 1.5.0 from flyte-core on Monday.
2. I reverted to 1.4.0 yesterday.
3. Workflows started failing with the errors I sent in the original message.
4. We saw errors in the propeller saying that a dataset can't be created because it already exists when writing outputs of a workflow. If I ran a workflow with cache override and different inputs I still saw the error.
5. We brought down flyte and rolled back the db to a backup.
6. There were dangling executions which we had to delete in a hacky way (see Eli's message above).
7. We redeployed flyte 1.5.0.
Looks like the same thing is happening again. This is the workflow that created the issue. I'm seeing the same logs as the ones I saw yesterday. I'm really lost as to why that would happen. I'll try to reproduce it with a mock workflow within a workflow. Not sure what else to do.
I managed to reproduce with this:
```python
import flytekit

from flytekit import workflow, task
from flytekit.types.file import FlyteFile
from pathlib import Path
from typing import Optional


@task(cache=True, cache_serialize=True, cache_version='0.0.0')
def t1() -> FlyteFile:
    out = Path(flytekit.current_context().working_directory) / 'out.txt'
    out.write_text('Hi from t1')
    return FlyteFile(path=str(out))


@task(cache=True, cache_serialize=True, cache_version='0.0.0')
def t2(t1_: FlyteFile, optional: Optional[int] = None) -> FlyteFile:
    out = Path(flytekit.current_context().working_directory) / 'out.txt'
    with open(t1_, 'r') as f:
        out.write_text(f.read() + f'\nHi from t2, optional={optional}')
    return FlyteFile(path=str(out))


@workflow
def inner_wf(t1_: FlyteFile, optional: Optional[int] = None) -> FlyteFile:
    return t2(t1_=t2(t1_=t1_, optional=optional), optional=optional)


@workflow
def wf() -> FlyteFile:
    return inner_wf(t1_=t1())
```
So the issue seems to be with a workflow calling a subworkflow that has an optional input.
Here's the graph. Without cache I get this error:
failed at Node[n0]. BindingResolutionError: Error binding Var [wf].[optional], caused by: failed at Node[start-node]. OutputsNotFoundError: Failed to find [start-node].[optional]
When I add caching the error is the same as the original one (complaining about a missing ../inputs.pb).
d
Very sorry about these issues. It looks like there may be multiple things happening here, almost certainly bugs. The issue with caching (that you mentioned yesterday) is a high priority bug - basically Flyte is attempting to release the cache reservation when it shouldn't be. However, this new issue is probably the root cause - we will need to do some internal investigation as to why the input / output binding values are not being set correctly.
Now that I'm looking a little closer, the error is saying that the call to inner_wf cannot find the optional parameter. @Yee can you take a brief look at the workflow ^^^?
e
Yes, that's the error if I don't request caching, but my original error is that an inputs.pb file is missing; that one I only see if I add caching @Dan Rammer (hamersaw)
Here's the full error log and the workflow definition that caused it.
d
Yeah, that cache issue is caused because Flyte is attempting to abort the workflow and that process is failing. The abort is attempted because of the issue in resolving inputs.
I just tested locally and removing the None default from the inner_wf definition fixes this in both cases (caching and non-caching). So in your repro example:
```python
import flytekit

from flytekit import workflow, task
from flytekit.types.file import FlyteFile
from pathlib import Path
from typing import Optional


@task(cache=True, cache_serialize=True, cache_version='0.0.0')
def t1() -> FlyteFile:
    out = Path(flytekit.current_context().working_directory) / 'out.txt'
    out.write_text('Hi from t1')
    return FlyteFile(path=str(out))


@task(cache=True, cache_serialize=True, cache_version='0.0.0')
def t2(t1_: FlyteFile, optional: Optional[int] = None) -> FlyteFile:
    out = Path(flytekit.current_context().working_directory) / 'out.txt'
    with open(t1_, 'r') as f:
        out.write_text(f.read() + f'\nHi from t2, optional={optional}')
    return FlyteFile(path=str(out))


@workflow
def inner_wf(t1_: FlyteFile, optional: Optional[int]) -> FlyteFile:  # no '= None' default here
    return t2(t1_=t2(t1_=t1_, optional=optional), optional=optional)


@workflow
def wf() -> FlyteFile:
    return inner_wf(t1_=t1(), optional=None)  # pass optional explicitly instead of relying on a default
```
I'm not an expert on the flytekit side (we'll get others to look into that), but I'm wondering if there is a bug in supporting Optional[int] = None on workflows.
e
yeah, seems like a reasonable conclusion. thanks
d
I submitted an issue for the catalog reservation release failure. Like I said, I believe that the optional param on the subworkflow is the root cause, but this is still an issue that we need to resolve.