# ask-the-community
Lukas Bommes
Dear Flyte community, we are experiencing a cache put failure in several tasks of a workflow on an on-premises flyte-binary 1.11 deployment, which we set up from scratch (empty database, empty metadata storage on S3). The problem is deterministic and always occurs in the same tasks across different executions; for other tasks in the same workflow, caching works as expected. The affected tasks prepare inputs for a subsequent map task and look roughly like the snippet below.

What we have tried so far:
- increasing resource requests and limits to high values to ensure the problem is not caused by hitting the limits,
- activating/deactivating cache serialization.

Neither prevented the issue. In the "node_executions" table in the database, "cache_status" is set to "CACHE_PUT_FAILURE" for the corresponding node executions. In the source code I found this function (https://github.com/flyteorg/flyte/blob/fdaa0216dd849e461e6a493ea2f99e99a30e3447/flytepropeller/pkg/controller/nodes/cache.go#L199) where the cache is put. Oddly, I cannot find the error that is logged on line 228 anywhere in the flyte-binary logs.

My questions: What are possible causes for a CACHE_PUT_FAILURE? Is there a limit on the size or number of elements in the cache? Are there any platform settings that affect the behaviour of the cache and could mitigate this issue?
```python
from dataclasses import dataclass
from typing import List

from dataclasses_json import dataclass_json
from flytekit import task
from flytekit.types.file import FlyteFile

import shared  # project-local module providing OHLIConfigRightOfWay (actual import path may differ)


@dataclass_json
@dataclass
class SomeMapTaskInput:
    a: FlyteFile
    b: FlyteFile
    c: FlyteFile


@task(
    cache=True,
    cache_version="0.0.1",
    cache_serialize=False,
    requests=...,
    limits=...,
)
def map_task_input_preparation(
    cfg: shared.OHLIConfigRightOfWay,
    a_files: List[FlyteFile],  # renamed from "as", which is a reserved keyword in Python
    b_files: List[FlyteFile],
    c: FlyteFile,
) -> List[SomeMapTaskInput]:
    ...
    return [SomeMapTaskInput(a, b, c) for a, b in zip(a_files, b_files)]
```
Paul
Hi Lukas. As part of the 1.11 release we fixed a bug to correctly bubble up cache put failures. Without the error log this might be hard to debug. Would you be able to check the datacatalog logs for any logged errors/warnings?
> The problem is deterministic and occurs always in the same tasks across different executions.

This makes me think this could be related to "already exists" errors.
Lukas Bommes
Hey Paul, thanks for your answer. I obtained the logs directly from the flyte-binary pod (with `kubectl logs ...`) and have the feeling that the logs are not complete. However, one message is logged which is probably related:
```
2024/03/14 23:40:03 /flyteorg/build/datacatalog/pkg/repositories/gormimpl/dataset.go:36 ERROR: duplicate key value violates unique constraint "datasets_pkey" (SQLSTATE 23505)
[2.037ms] [rows:0] INSERT INTO "datasets" ("created_at","updated_at","deleted_at","project","name","domain","version","uuid","serialized_metadata") VALUES ('2024-03-14 23:40:03.37','2024-03-14 23:40:03.37',NULL,'ohli-core','flyte_task-OHLI.workflows.right_of_way.right_of_way_input_preparation','development','0.0.5_0.0.2-fJQjzhDJ-G6gfv-8i','5b192671-b4b5-42fe-83eb-b868fd7232e9','<binary>')
```
What's a bit odd, though, is that I am seeing a similar log for every task that finished. So, it's not unique to the task at which the cache put failure occurs.
What exactly is the idea behind the "already exists" error? Under what circumstances is this error raised?
After adjusting the logging level to warning, I finally got an error message. It's

```
Failed to create artifact id: ...
```

(see screenshot below) followed by

```
Failed to write results to catalog for Task [{{{} [] [] 0xc000720000} 157 [] TASK ohli-core development OHLI.workflows.orthophoto.orthophoto_phase4_input_preparation local_a75f358f194a41cbcf09d8dff19c13801ed08b1f_2024-03-14-10-55-45 }]. Error: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (26606474 vs. 4194304)
```

So we are obviously running into a size limit. The data model that is logged in "Failed to create artifact id" is also massive. Is there a limit we can adjust in the platform configuration?
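For a rough sense of scale (the numbers below are made up for illustration, not measured from our workflow), a list with tens of thousands of (a, b, c) triples, each carrying a few S3 URIs, already serializes to well over the 4 MiB gRPC default:

```python
# Hypothetical back-of-the-envelope estimate of the serialized output size.
import json

N = 50_000  # assumed number of (a, b, c) triples; not the real count
sample_entry = {
    "a": "s3://flyteuserdata-<bucket>/some/long/offloaded/path/to/input_a.tif",
    "b": "s3://flyteuserdata-<bucket>/some/long/offloaded/path/to/input_b.tif",
    "c": "s3://flyteuserdata-<bucket>/some/long/offloaded/path/to/input_c.tif",
}
payload = json.dumps([sample_entry] * N).encode()
print(f"~{len(payload) / 1024 / 1024:.1f} MiB")  # comfortably above the 4 MiB default
```

(The actual Flyte literal is protobuf with additional metadata, so it can be even larger than a plain JSON estimate.)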
Blake Jackson
In your Flyte config, set the `server.grpc.maxMessageSizeBytes` value higher. I think the default is 4 MB; we bumped ours to 6 MB after observing this problem. But it appears you'll need to bump yours quite a bit higher; looks like the failure is caused by a ~26 MB message?
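For context, `maxMessageSizeBytes` presumably maps onto gRPC's maximum message length, whose receive-side default is 4 MiB; that matches the 4194304 in the error above. A minimal standalone sketch in plain gRPC (the 32 MiB value is an arbitrary example, not a Flyte recommendation):

```python
# Plain-gRPC illustration (not Flyte code): servers reject requests above the
# receive limit with RESOURCE_EXHAUSTED unless created with larger options.
from concurrent import futures

import grpc

MAX_MSG_BYTES = 32 * 1024 * 1024  # 32 MiB, arbitrary example value

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=4),
    options=[
        ("grpc.max_receive_message_length", MAX_MSG_BYTES),
        ("grpc.max_send_message_length", MAX_MSG_BYTES),
    ],
)
```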
Lukas Bommes
Thanks a lot for pointing out this setting, @Blake Jackson. I set the following in the flyte-binary Helm values, but I still get the same error in the logs stating that the 4 MB limit is exceeded. Does anyone know how to configure this setting for flyte-binary?
```yaml
configuration:
  inline:
    server:
      grpc:
        maxMessageSizeBytes: 33554432
```
I'm starting to believe there is a bug in flyte-binary that prevents this setting from taking effect. Can someone confirm this? If so, I could open a GitHub issue.
Blake Jackson
We use flyte-core, but the config you have seems correct. Do you know if the endpoint being called is against the admin server or the datacatalog server? I still don't have a full grasp of all the moving parts, but I do see two separate configurations in the code: https://github.com/flyteorg/flyte/blob/master/flyteadmin/pkg/config/config.go#L24 and https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/config/config.go#L13. If it's the latter, there does not appear to be a way to set the value currently.
David Espejo
I think that's a flyteadmin setting. @Lukas Bommes, could you verify whether it's being rendered in the ConfigMap? I think it's `kubectl describe cm flyte-binary -n flyte`?
Lukas Bommes
@David Espejo (he/him) That was actually also my first idea. But I verified that the setting is properly rendered in the ConfigMap, and it indeed is. This is what the ConfigMap currently looks like (I removed the entire auth section and some hostnames and ports for privacy and replaced them with <somevalue>):
```yaml
apiVersion: v1
data:
  000-core.yaml: |
    admin:
      clientId: flytepropeller
      endpoint: localhost:8089
      insecure: true
    catalog-cache:
      endpoint: localhost:8081
      insecure: true
      type: datacatalog
    cluster_resources:
      standaloneDeployment: false
      templatePath: /etc/flyte/cluster-resource-templates
    logger:
      show-source: true
      level: 3
    propeller:
      create-flyteworkflow-crd: true
    webhook:
      certDir: /var/run/flyte/certs
      localCert: true
      secretName: flyte-backend-flyte-binary-webhook-secret
      serviceName: flyte-backend-flyte-binary-webhook
      servicePort: 443
    flyte:
      admin:
        disableClusterResourceManager: false
        disableScheduler: false
        disabled: false
        seedProjects:
        - flytesnacks
      dataCatalog:
        disabled: false
      propeller:
        disableWebhook: false
        disabled: false
  001-plugins.yaml: |
    tasks:
      task-plugins:
        default-for-task-types:
          container: container
          container_array: k8s-array
          sidecar: sidecar
        enabled-plugins:
        - container
        - sidecar
        - k8s-array
        - agent-service
    plugins:
      logs:
        kubernetes-enabled: false
        cloudwatch-enabled: false
        stackdriver-enabled: false
        templates:
          - displayName: Logs
            messageFormat: 0
            templateUris:
            - http://<host>:<port>/d/df43f8e0-6db3-4f36-92bf-7083547f9b18/logs?orgId=1&var-podName={{
              .podName }}&var-containerName={{ .containerName }}&var-namespace={{ .namespace
              }}&from={{ .podUnixStartTime }}000&to=now
          - displayName: Resource Usage
            messageFormat: 0
            templateUris:
            - http://<host>:<port>/d/6581e46e4e5c7ba40a07646395ef7b23/kubernetes-compute-resources-pod?orgId=1&refresh=10s&var-datasource=default&var-cluster=&var-namespace={{
              .namespace }}&var-pod={{ .podName }}&from={{ .podUnixStartTime }}000&to={{ .podUnixFinishTime
              }}000
      k8s:
        co-pilot:
          image: "<http://cr.flyte.org/flyteorg/flytecopilot-release:v1.11.0|cr.flyte.org/flyteorg/flytecopilot-release:v1.11.0>"
      k8s-array:
        logs:
          config:
            kubernetes-enabled: false
            cloudwatch-enabled: false
            stackdriver-enabled: false
            templates:
              - displayName: Logs
                messageFormat: 0
                templateUris:
                - http://<host>:<port>/d/df43f8e0-6db3-4f36-92bf-7083547f9b18/logs?orgId=1&var-podName={{
                  .podName }}&var-containerName={{ .containerName }}&var-namespace={{ .namespace
                  }}&from={{ .podUnixStartTime }}000&to=now
              - displayName: Resource Usage
                messageFormat: 0
                templateUris:
                - http://<host>:<port>/d/6581e46e4e5c7ba40a07646395ef7b23/kubernetes-compute-resources-pod?orgId=1&refresh=10s&var-datasource=default&var-cluster=&var-namespace={{
                  .namespace }}&var-pod={{ .podName }}&from={{ .podUnixStartTime }}000&to={{ .podUnixFinishTime
                  }}000
  002-database.yaml: |
    database:
      postgres:
        username: flyte
        host: flyte-postgres-postgresql-hl.flyte
        port: 5432
        dbname: flyte
        options: "sslmode=disable"
  003-storage.yaml: |
    propeller:
      rawoutput-prefix: s3://flyteuserdata-985a62c2-9998-4558-b0d9-4a3bc1b8464e/data
    storage:
      type: stow
      stow:
        kind: s3
        config:
          region: us-east-1
          disable_ssl: true
          v2_signing: true
          endpoint: http://<host>
          auth_type: accesskey
      container: flytemeta-985a62c2-9998-4558-b0d9-4a3bc1b8464e
  004-auth.yaml: <placeholder>
  100-inline-config.yaml: |
    catalog-cache:
      max-cache-age: 1416h
    flyteadmin:
      useOffloadedWorkflowClosure: true
    plugins:
      k8s:
        default-pod-template-name: ohli-template
    propeller:
      max-output-size-bytes: 52428800
    server:
      grpc:
        maxMessageSizeBytes: 33554432
    storage:
      limits:
        maxDownloadMBs: 50
    task_resources:
      defaults:
        cpu: 500m
        gpu: 0
        memory: 500Mi
      limits:
        cpu: 24
        gpu: 1
        memory: 48Gi
    tasks:
      task-plugins:
        default-for-task-types:
        - container: container
        - container_array: k8s-array
        - dask: dask
        enabled-plugins:
        - container
        - sidecar
        - k8s-array
        - dask
kind: ConfigMap
metadata:
  annotations:
    meta.helm.sh/release-name: flyte-backend
    meta.helm.sh/release-namespace: flyte
  creationTimestamp: "2024-03-13T15:57:07Z"
  labels:
    app.kubernetes.io/instance: flyte-backend
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: flyte-binary
    app.kubernetes.io/version: 1.16.0
    helm.sh/chart: flyte-binary-v1.11.0
    helm.toolkit.fluxcd.io/name: flyte-backend
    helm.toolkit.fluxcd.io/namespace: flyte
  name: flyte-backend-flyte-binary-config
  namespace: flyte
  resourceVersion: "90523012"
  uid: 4ad5249d-3154-4cfc-b7d0-469546977fa2
```
We worked around this by streamlining our workflow so that message size stays below 4 MB.
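For anyone hitting the same limit: one possible way to keep the inter-task message small (not necessarily what we ended up doing; the function and field names below are hypothetical) is to offload the prepared triples into a single manifest file and pass a FlyteFile reference between tasks instead of a large list of dataclasses:

```python
# Sketch under assumptions: write the (a, b, c) triples to one JSON manifest so
# the task's output literal is a single file reference instead of a huge list.
import json
import tempfile
from typing import List

from flytekit import task
from flytekit.types.file import FlyteFile


@task(cache=True, cache_version="0.0.1")
def prepare_manifest(a_files: List[FlyteFile], b_files: List[FlyteFile], c: FlyteFile) -> FlyteFile:
    entries = [
        {
            "a": a.remote_source or str(a.path),
            "b": b.remote_source or str(b.path),
            "c": c.remote_source or str(c.path),
        }
        for a, b in zip(a_files, b_files)
    ]
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
        json.dump(entries, f)
        out_path = f.name
    # The returned literal is now a single URI, far below the gRPC message limit.
    return FlyteFile(out_path)
```

The downstream consumer then has to read its entries out of the manifest, so this trades literal size for an extra read step.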