# flyte-support
n
I am using Flyte v1.13.0. If I run a workflow and, for some reason, a task sends a large output (greater than maxDownloadMBs), I can see an error in the propeller logs:
Copy code
"error","msg":"Error when trying to reconcile workflow. Error [failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: output file @[<>/n0/data/0/outputs.pb] is too large [19664776] bytes, max allowed [10485760] bytes]. Error Type[*errors.NodeErrorWithCause]"
But the workflow is stuck in Running on the Flyte console… This issue looks similar, but it doesn't seem to have been resolved: https://github.com/flyteorg/flyte/issues/381
h
@nice-market-38632, can you give more details? Do you still see the execution stuck in the Running state in flyteconsole if you refresh the page?
n
yes, it is actually stuck on running
h
ok, can you share the structure of the workflow you're running? Also, which version of Flyte? (edit: you mention 1.13.0 in https://github.com/flyteorg/flyte/issues/381#issuecomment-2306811974) Is this single-binary?
n
I deployed admin and propeller separately on an AWS k8s cluster.
👍 1
Give me some time; I will create a reproducible example and share it. I saw the issue open on GitHub, so I thought it was yet to be addressed!
h
No, let me clarify. The 10MB limit still applies; what I'm looking to confirm is this weird state you're seeing where the workflow is not aborted after the task is aborted.
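As an aside, the usual way to stay under that limit is to offload large payloads to blob storage instead of returning them inline. Here is a minimal sketch (the task and workflow names are just illustrative), assuming the payload can be written to a local file: a FlyteFile output gets uploaded to the raw output prefix, so outputs.pb only stores a reference to it.
Copy code
from flytekit import task, workflow
from flytekit.types.file import FlyteFile

@task
def build_large_payload(n: int) -> FlyteFile:
    # Write the large payload to a local file; flytekit uploads the file to
    # the raw output prefix and records only a reference in outputs.pb.
    path = "/tmp/arr1.txt"  # illustrative local path
    with open(path, "w") as f:
        f.write("a" * n * 1024)
    return FlyteFile(path)

@task
def print_payload_size(arr1: FlyteFile) -> None:
    # Opening the FlyteFile downloads it locally before reading.
    with open(arr1, "r") as f:
        print(f"Array 1 length: {len(f.read())}")

@workflow
def offloaded_pipeline(n: int) -> None:
    print_payload_size(arr1=build_large_payload(n=n))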
n
yeah, got it. But the GitHub issue also states the same thing: that the workflow gets stuck in this case… Will get back with an example anyway.
t
and this is the top-level workflow that's getting stuck, right? Not a subworkflow or something else?
n
yes, it is a top level workflow that gets stuck.
Copy code
from flytekit import task, workflow, ImageSpec

normal_image = ImageSpec(
    base_image="python:3.9-slim",
    packages=["flytekit==1.10.3"],
    registry="ttl.sh",
    name="skdjbKBJ1341-normal",
    source_root="..",
)

@task(container_image=normal_image)
def print_arrays(arr1: str) -> None:
    print(f"Array 1: {arr1}")

@task(container_image=normal_image)
def increase_size_of_of_arrays(n: int) -> str:
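    # Builds a string of n KiB, so n=11000 produces ~11,264,000 bytes (~11 MB),
    # which exceeds the 10485760-byte (10 MiB) output limit seen in the error logs.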
    arr1 = 'a' * n * 1024
    return arr1

# Workflow: Orchestrate the tasks
@workflow
def simple_pipeline(n: int) -> int:
    arr1 = increase_size_of_of_arrays(n=n)
    print_arrays(arr1=arr1)
    return 2

# Runs the pipeline locally
if __name__ == "__main__":
    result = simple_pipeline(n=5)
I just verified it by running in the Flyte sandbox as well. Please register the above file:
Copy code
pyflyte --pkgs limit_eg package -f --source .
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version 1
Then run it from the UI with n=11000 (the output size will be about 11 MB). This workflow is now stuck in Running forever. I can only see the propeller logs:
Copy code
[failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: output file @[s3://my-s3-bucket/metadata/propeller/flytesnacks-development-at9h4tkhqx5fjbp6sbfm/n0/data/0/outputs.pb] is too large [11264029] bytes, max allowed [10485760] bytes]. Error Type[*errors.NodeErrorWithCause]","ts":"2024-08-25T10:53:09Z"}
E0825 10:53:09.978923       1 workers.go:103] error syncing 'flytesnacks-development/at9h4tkhqx5fjbp6sbfm': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: output file @[s3://my-s3-bucket/metadata/propeller/flytesnacks-development-at9h4tkhqx5fjbp6sbfm/n0/data/0/outputs.pb] is too large [11264029] bytes, max allowed [10485760] bytes
These are not propagated as an event to flyteadmin, and hence it's stuck forever without any error message on the Flyte console.
Please let me know if you face any issues reproducing this error on your end!
@thankful-minister-83577 @high-accountant-32689
t
Will pick this up again tomorrow, I think. Eduardo and I have both tried to repro this and have not been able to.
f
@nice-market-38632 I'm not able to repro with a similar example. Are you able to share the flyteworkflow CRD for the run that appears in flyteconsole as stuck in Running?
k describe flyteworkflow -n <...> <execution_id>
t
we are correctly seeing Failed at the top level
n
I am also able to see it, just checked. But please see the run duration: it took 1 hr 26 min to mark it as failed…
Copy code
apiVersion: flyte.lyft.com/v1alpha1
kind: FlyteWorkflow
metadata:
  creationTimestamp: '2024-08-27T06:24:56Z'
  finalizers:
    - flyte-finalizer
  generation: 12
  labels:
    domain: development
    execution-id: akmmktbbq7m987bc8f2n
    project: flytesnacks
    shard-key: '26'
    workflow-name: limit-eg-test-simple-pipeline
  name: akmmktbbq7m987bc8f2n
  namespace: flytesnacks-development
  resourceVersion: '1165624'
  uid: 69194738-ea50-48a7-b6db-23e0ad69ace7
  selfLink: >-
    /apis/flyte.lyft.com/v1alpha1/namespaces/flytesnacks-development/flyteworkflows/akmmktbbq7m987bc8f2n
status:
  dataDir: >-
    s3://my-s3-bucket/metadata/propeller/flytesnacks-development-akmmktbbq7m987bc8f2n
  defVersion: 1
  failedAttempts: 4
  lastUpdatedAt: '2024-08-27T06:25:01Z'
  message: >-
    failed at Node[n0]. RuntimeExecutionError: failed during plugin execution,
    caused by: output file
    @[s3://my-s3-bucket/metadata/propeller/flytesnacks-development-akmmktbbq7m987bc8f2n/n0/data/0/outputs.pb]
    is too large [11264029] bytes, max allowed [10485760] bytes
  nodeStatus:
    n0:
      TaskNodeStatus:
        pState: >-
          S38DAQELUGx1Z2luU3RhdGUB/4AAAQMBBVBoYXNlAQYAAQ5LOHNQbHVnaW5TdGF0ZQH/ggABD0xhc3RFdmVudFVwZGF0ZQH/hAAAAD//gQMBAQtQbHVnaW5TdGF0ZQH/ggABAwEFUGhhc2UBBAABDFBoYXNlVmVyc2lvbgEGAAEGUmVhc29uAQwAAAAQ/4MFAQEEVGltZQH/hAAAAAv/gAECAQEKAQEAAA==
        phase: 5
        phaseVersion: 1
        psv: 1
        updAt: '2024-08-27T06:25:07.939094379Z'
      dynamicNodeStatus: {}
      laStartedAt: '2024-08-27T06:25:01Z'
      lastUpdatedAt: '2024-08-27T06:25:01Z'
      message: running
      phase: 2
      queuedAt: '2024-08-27T06:25:01Z'
      startedAt: '2024-08-27T06:25:01Z'
    start-node:
      phase: 5
      stoppedAt: '2024-08-27T06:25:01Z'
  phase: 1
  startedAt: '2024-08-27T06:25:01Z'
spec:
  connections:
    n0:
      - n1
    n1:
      - end-node
    start-node:
      - n0
  edges:
    downstream:
      n0:
        - n1
      n1:
        - end-node
      start-node:
        - n0
    upstream:
      end-node:
        - n1
      n0:
        - start-node
      n1:
        - n0
  id: flytesnacks:development:limit_eg.test.simple_pipeline
  nodes:
    end-node:
      id: end-node
      inputBindings:
        - binding:
            scalar:
              primitive:
                integer: '2'
          var: o0
      kind: end
      resources: {}
    n0:
      id: n0
      inputBindings:
        - binding:
            promise:
              nodeId: start-node
              var: 'n'
          var: 'n'
      kind: task
      name: increase_size_of_of_arrays
      resources: {}
      task: >-
        resource_type:TASK project:"flytesnacks" domain:"development"
        name:"limit_eg.test.increase_size_of_of_arrays" version:"3"
    n1:
      id: n1
      inputBindings:
        - binding:
            promise:
              nodeId: n0
              var: o0
          var: arr1
      kind: task
      name: print_arrays
      resources: {}
      task: >-
        resource_type:TASK project:"flytesnacks" domain:"development"
        name:"limit_eg.test.print_arrays" version:"3"
    start-node:
      id: start-node
      kind: start
      resources: {}
  outputBindings:
    - binding:
        scalar:
          primitive:
            integer: '2'
      var: o0
  outputs:
    variables:
      o0:
        type:
          simple: INTEGER
acceptedAt: '2024-08-27T06:24:56Z'
executionConfig:
  EnvironmentVariables: null
  Interruptible: null
  MaxParallelism: 25
  OverwriteCache: false
  RecoveryExecution: {}
  TaskPluginImpls: {}
  TaskResources:
    Limits:
      CPU: '2'
      EphemeralStorage: '0'
      GPU: '5'
      Memory: 4Gi
      Storage: '0'
    Requests:
      CPU: 500m
      EphemeralStorage: '0'
      GPU: '0'
      Memory: 1Gi
      Storage: '0'
executionId:
  domain: development
  name: akmmktbbq7m987bc8f2n
  project: flytesnacks
inputs:
  literals:
    'n':
      scalar:
        primitive:
          integer: '11000'
node-defaults: {}
rawOutputDataConfig: {}
securityContext:
  run_as: {}
tasks:
  resource_type:TASK project:"flytesnacks" domain:"development" name:"limit_eg.test.increase_size_of_of_arrays" version:"3":
    container:
      args:
        - pyflyte-execute
        - '--inputs'
        - '{{.input}}'
        - '--output-prefix'
        - '{{.outputPrefix}}'
        - '--raw-output-data-prefix'
        - '{{.rawOutputDataPrefix}}'
        - '--checkpoint-path'
        - '{{.checkpointOutputPrefix}}'
        - '--prev-checkpoint'
        - '{{.prevCheckpointPrefix}}'
        - '--resolver'
        - flytekit.core.python_auto_container.default_task_resolver
        - '--'
        - task-module
        - limit_eg.test
        - task-name
        - increase_size_of_of_arrays
      image: ttl.sh/skdjbkbj1341-normal:ahPJ_Pe5dK7cEx5eLvclfA
      resources:
        limits:
          - name: CPU
            value: 500m
          - name: MEMORY
            value: 1Gi
        requests:
          - name: CPU
            value: 500m
          - name: MEMORY
            value: 1Gi
    id:
      domain: development
      name: limit_eg.test.increase_size_of_of_arrays
      project: flytesnacks
      resourceType: TASK
      version: '3'
    interface:
      inputs:
        variables:
          'n':
            type:
              simple: INTEGER
      outputs:
        variables:
          o0:
            type:
              simple: STRING
    metadata:
      retries: {}
      runtime:
        flavor: python
        type: FLYTE_SDK
        version: 1.13.4
    type: python-task
  resource_type:TASK project:"flytesnacks" domain:"development" name:"limit_eg.test.print_arrays" version:"3":
    container:
      args:
        - pyflyte-execute
        - '--inputs'
        - '{{.input}}'
        - '--output-prefix'
        - '{{.outputPrefix}}'
        - '--raw-output-data-prefix'
        - '{{.rawOutputDataPrefix}}'
        - '--checkpoint-path'
        - '{{.checkpointOutputPrefix}}'
        - '--prev-checkpoint'
        - '{{.prevCheckpointPrefix}}'
        - '--resolver'
        - flytekit.core.python_auto_container.default_task_resolver
        - '--'
        - task-module
        - limit_eg.test
        - task-name
        - print_arrays
      image: ttl.sh/skdjbkbj1341-normal:ahPJ_Pe5dK7cEx5eLvclfA
      resources:
        limits:
          - name: CPU
            value: 500m
          - name: MEMORY
            value: 1Gi
        requests:
          - name: CPU
            value: 500m
          - name: MEMORY
            value: 1Gi
    id:
      domain: development
      name: limit_eg.test.print_arrays
      project: flytesnacks
      resourceType: TASK
      version: '3'
    interface:
      inputs:
        variables:
          arr1:
            type:
              simple: STRING
      outputs: {}
    metadata:
      retries: {}
      runtime:
        flavor: python
        type: FLYTE_SDK
        version: 1.13.4
    type: python-task
workflowMeta:
  eventVersion: 2
Here is the CRD of the same execution (this one is still in Running…).
f
what do you have configured for base-delay?
seems your workflows are slowly retrying after failedAttempts/system errors
n
Let me check that out, but I used the defaults, I guess. I did flytectl sandbox start.
Copy code
propeller:
  downstream-eval-duration: 30s
  enable-admin-launcher: true
  leader-election:
    enabled: true
    lease-duration: 15s
    lock-config-map:
      name: propeller-leader
      namespace: flyte
    renew-deadline: 10s
    retry-period: 2s
  limit-namespace: all
  max-workflow-retries: 30
  metadata-prefix: metadata/propeller
  metrics-prefix: flyte
  prof-port: 10254
  queue:
    batch-size: -1
    batching-interval: 2s
    queue:
      base-delay: 5s
      capacity: 1000
      max-delay: 120s
      rate: 100
      type: maxof
    sub-queue:
      capacity: 100
      rate: 10
      type: bucket
    type: batch
  rawoutput-prefix: s3://my-s3-bucket/
  workers: 4
  workflow-reeval-duration: 30s
this is what I found in flyte-propeller-config
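For a rough sense of scale: assuming each failed attempt is re-queued with exponential backoff starting at base-delay and capped at max-delay (that backoff behaviour is an assumption on my part, and the helper below is purely illustrative), the waits add up quickly:
Copy code
# Illustrative helper: estimate the total requeue wait for a workflow whose
# attempts keep failing, with exponential backoff capped at max_delay.
def estimated_requeue_time(attempts: int, base_delay: float = 5.0, max_delay: float = 120.0) -> float:
    return sum(min(base_delay * 2 ** i, max_delay) for i in range(attempts))

# ~30 attempts -> ~3155 s (~53 min); ~45 attempts -> ~4955 s (~1 h 23 min),
# which is in the same ballpark as the ~1 h 26 min observed above before
# the workflow was finally marked failed.
print(estimated_requeue_time(30), estimated_requeue_time(45))
That would also explain why lowering max-workflow-retries shortens the time before the execution is finally marked failed.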
t
The CRD above had a failedAttempts: 4 field in it… do you have one that's higher? Like near the limit of 50? (Just so we can look at the timestamps.)
Also, you mention sandbox defaults: is this just running on the flytectl demo environment? I've been testing on our live backend, not in the sandbox environment.
Is propeller sharded? If not, @flat-area-42876, why is there a shard-key in the CRD? Or does that just get added all the time?
f
n
Yes, it is the demo environment @thankful-minister-83577
> do you have one that's higher? Like near the limit of 50?
Where do I check this?
> I've been testing on our live backend
Can you share the config of your live backend, please?
Also, this error should not have been classified as a "SystemRetriable" failure, since it should fail immediately. Reducing max-workflow-retries from 30 to 5 reduces the time taken to mark it failed, but it should be immediate.