Hey all I have a workflow that runs for 2-3 hours...
# ask-the-community
v
Hey all, I have a workflow that runs for 2-3 hours, uses many nodes, and works well. I tried running 10 in parallel and this happened on some of them:
Workflow[flytesnacks:development:algopipelines.flytedatapipeline.main_pipeline.smplx_proccess_pipeline] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: 0: 0: 0: 0: 0: 0: [system] unable to read futures file, maybe corrupted, caused by: [system] Failed to read futures protobuf file., caused by: path:gs://<my-gcs-bucket>/metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19-dn0/0/futures.pb: not found
1: 0: [system] unable to read futures file, maybe corrupted, caused by: [system] Failed to read futures protobuf file., caused by: path:gs://<my-gcs-bucket>/metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19-dn0/0/futures.pb: not found
Any ideas what may be causing this?
Here’s what the graph looks like
I checked the GCS storage bucket used by Flyte, the file really doesn’t exist at
metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19-dn0/0/futures.pb
The path is correct up to this point:
metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0
There is no dn19-dn0 folder there, but there are a dn19 folder and a dn0 folder, and each of them has a folder named 0 containing futures.pb
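A quick way to repeat this comparison on other failed executions is to list the object names under the node's data prefix and look at where the futures.pb files actually are. The sketch below is only an illustration: the listing is a hypothetical, shortened stand-in for a real bucket listing, `<execution>` is a placeholder, and `futures_under` is a helper name made up for this example.

```python
from typing import Iterable, List


def futures_under(prefix: str, object_names: Iterable[str]) -> List[str]:
    """Return every futures.pb object stored below prefix, sorted by name."""
    base = prefix.rstrip("/") + "/"
    return sorted(
        name
        for name in object_names
        if name.startswith(base) and name.rsplit("/", 1)[-1] == "futures.pb"
    )


# Hypothetical listing that mirrors the layout described above.
prefix = "metadata/propeller/<execution>/n10/data/0"
listing = [
    prefix + "/dn19/0/futures.pb",
    prefix + "/dn0/0/futures.pb",
    prefix + "/dn19/0/outputs.pb",
]

# The path from the error message, with the same placeholder prefix.
expected = prefix + "/dn19-dn0/0/futures.pb"
found = futures_under(prefix, listing)

print("expected path present:", expected in found)
for name in found:
    print("found:", name)
```

With a real listing in place of `listing`, this shows at a glance whether the path from the error message exists and which sibling folders hold a futures.pb instead.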
k
Hmm this is not good, seems like a bug. Cc @Dan Rammer (hamersaw) sorry to pull you in. @Victor Churikov but you are saying that things work and then sometimes they don’t?
v
I try not to run this too much without a good reason because it can get expensive, but yesterday evening I wanted to test that Flyte can handle the load of many such workflows running concurrently. The main reason we're moving to Flyte is that our previous tool would collapse under load, and we are planning to grow soon. Last time 3/10 workflows failed with this error. I will run 10 more and hopefully it won't happen again, but it would help if someone could at least clarify what could be contributing to it, so I can see whether I can adjust the workflow to make it easier on Flyte (for example, maybe it's the nested conditions, I don't know). We'll see in 2-3 hours, when these complete, whether it still happens. Thanks
If it matters, I use flytekit version 1.6.2 and flyte-core v1.7.0. I’ll try upgrading my flytekit to v1.7.0 as well just in case
k
That is really odd, Flyte can scale extremely well
Missing file sounds like an assumption error
10 workflows is not high load
v
It’s not clear from the error whether it’s an issue in the workflow or with Flyte, and I’m not sure where to begin checking
k
We would love to get a dummy workflow with noop tasks that reproduces the error
Can you just share the code with the logic removed?
v
Will send in a moment
The workflow:
import os
import uuid

from algopipelines.flytedatapipeline.kubes import (
    slack_notify,
    parse_session_info,
    parse_takes_status,
    git_clone,
    should_run_fit,
    qt_texture_training,
    get_parameters,
    skip,
)

from algopipelines.flytedatapipeline.subworkflows.shape_fit_flow import shape_fit_flow
from algopipelines.flytedatapipeline.subworkflows.evaluation_flow import evaluation_flow
from flytekit import workflow, conditional

@workflow
def smplx_proccess_pipeline(dummy: bool):

    pipeline_params = get_parameters(dummy=dummy)
    parsed_session_info = parse_session_info(dummy=dummy)
    parsed_takes_status = parse_takes_status(dummy=dummy)

    slack_notify(dummy=dummy)

    run_fit = should_run_fit(dummy=dummy)

    (motion_capture_repo := git_clone(dummy=dummy).with_overrides(name="Clone CGMocap")) >> run_fit
    (core_algo_repo := git_clone(dummy=dummy).with_overrides(name="Clone CoreAlgo")) >> run_fit
    (evaluator_repo := git_clone(dummy=dummy).with_overrides(name="Clone Evaluators")) >> run_fit
    (fitting_repo := git_clone(dummy=dummy).with_overrides(name="Clone Fitting")) >> run_fit
    (common_algo_repo := git_clone(dummy=dummy).with_overrides(name="Clone CommonAlgo")) >> run_fit

    conditional_fit = conditional("dummy_condition_1").if_(run_fit.is_true()
    ).then(shape_fit_flow(
        dummy=dummy
    )).else_().then(skip())

    conditional_train_qt = conditional("should_train_qt").if_(dummy == True
    ).then(qt_texture_training(
        dummy=dummy
    )).else_().then(skip())

    conditional_evaluation = conditional("should_run_evaluation").if_(dummy == True
    ).then(evaluation_flow(
        dummy=dummy
    )).else_().then(skip())


    # Dependencies that were handled by flyte before dummying the workflow:

    # git_clone nodes are inputs of all of the conditionals
    # parsed_session_info is an input of all the conditionals
    # parsed_takes_status is an input of the evaluation conditional
    # pipeline_params is an input of all the conditionals

    # Explicit dependencies related to our specific logic:

    pipeline_params >> run_fit

    motion_capture_repo >> conditional_train_qt
    core_algo_repo >> conditional_train_qt
    common_algo_repo >> conditional_train_qt
    evaluator_repo >> conditional_train_qt
    fitting_repo >> conditional_train_qt

    conditional_fit >> conditional_train_qt
    conditional_fit >> parsed_takes_status
    conditional_train_qt >> conditional_evaluation
shape_fit_flow dynamic (used dynamic for looping over the contents of a list input):
import os
import uuid

from algopipelines.flytedatapipeline.kubes import (
    download_video_folder,
    mocap_shape_process,
    upload_folder,
    upload_status_file,
    skip,
)
from algopipelines.flytedatapipeline.data_pipelines.fitting_scripts.smplx_shape_fitting import smplx_shape_fitting
from algopipelines.flytedatapipeline.data_pipelines.fitting_scripts.smplx_expression_fitting import smplx_expression_fitting
from algopipelines.flytedatapipeline.subworkflows.mocap_process_flow import mocap_process_flow

from algopipelines.datahandler.k8s import task_config
from flytekit import conditional, dynamic
from flytekit.types.directory import FlyteDirectory



@dynamic(**task_config(
    image='algopipelines-generic:v1',
    cpu='400m', memory='400Mi'))
def shape_fit_flow(
    dummy: bool
):

    shape_video = download_video_folder(dummy=dummy)

    shape_fitting_outputs_qt = smplx_shape_fitting(dummy=dummy)
    upload_folder(dummy=dummy).with_overrides(name='Upload Shape Fitting Outputs')

    shape_fitting_outputs_lite = smplx_shape_fitting(dummy=dummy)
    upload_folder(dummy=dummy).with_overrides(name='Upload Shape Fitting Outputs (Lite)')

    mocap_shape_process_outputs = mocap_shape_process(dummy=dummy)

    upload_folder(dummy=dummy).with_overrides(name='Upload Mocap Shape Process Outputs')

    upload_folder(dummy=dummy).with_overrides(name='Upload Shape Raw Data')

    for expression_path in ['dummy_path_1', "dummy_path_2", "...", "dummy_path_13"]:

        expression_video = download_video_folder(dummy=dummy)

        upload_folder(dummy=dummy).with_overrides(name="Upload Expression Raw Data")

        expression_fitting_outputs = smplx_expression_fitting(dummy=dummy)

        upload_status_file(dummy=dummy)

        upload_folder(dummy=dummy)

        conditional("should_run_mocap_processing").if_("dummy" == "dummy").then(
            mocap_process_flow(dummy=dummy)
        ).else_().then(skip())
mocap_process_flow dynamic (used dynamic as a workaround because some input strings were Promise(,,,) objects; with dynamic they are evaluated properly):
from algopipelines.flytedatapipeline.kubes import (
    mocap_expression_process,
    upload_folder,
)

from algopipelines.datahandler.k8s import task_config
from flytekit import dynamic
from flytekit.types.directory import FlyteDirectory


@dynamic(**task_config(
    image='algopipelines-generic:v1',
    cpu='400m', memory='400Mi'))
def mocap_process_flow(dummy: bool):

    expression_process_outputs = mocap_expression_process(dummy=dummy)
    upload_folder(dummy=dummy)
evaluation_flow:
from algopipelines.flytedatapipeline.evaluators_kubes import (
    evaluation,
    download_evaluation_inputs,
)

from algopipelines.datahandler.k8s import task_config
from algopipelines.flytedatapipeline.kubes import (
    upload_status_file,
    upload_folder,
)

from flytekit import dynamic
from flytekit.types.directory import FlyteDirectory

@dynamic(**task_config(
    image='algopipelines-generic:v1',
    cpu='400m', memory='400Mi'))
def evaluation_flow(dummy: bool,):

    evaluation_inputs = download_evaluation_inputs(dummy=dummy)
    evaluation_output = evaluation(dummy=dummy)

    upload_status_file(dummy=dummy)
    upload_folder(dummy=dummy)
k8s.py (outputs kwargs for task decorators):
from typing import List
from flytekitplugins.pod import Pod
from kubernetes.client.models import (
    V1PodSpec,
    V1Container,
    V1EmptyDirVolumeSource,
    V1ResourceRequirements,
    V1Volume,
    V1VolumeMount,
    V1EnvVar,
    V1EnvFromSource,
    V1ConfigMapEnvSource,
    V1SecretEnvSource,
    V1Toleration,
)

def k8s_env(env_dict) -> List[V1EnvVar]:
    return [V1EnvVar(name=k, value=v) for k, v in env_dict.items()]

def k8s_env_from_configmap(cms) -> List[V1EnvFromSource]:
    return [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name=cm)) for cm in cms]

def k8s_env_from_secret(secrets) -> List[V1EnvFromSource]:
    return [V1EnvFromSource(secret_ref=V1SecretEnvSource(name=secret)) for secret in secrets]

def task_config(image: str, **kwargs):
    cache = kwargs.get("cache", False)
    primary_container_name = kwargs.get("primary_container_name", "primary")
    env = kwargs.get("env", {})
    env_configmaps = kwargs.get("env_configmaps", [])
    env_secrets = kwargs.get("env_secrets", [])
    node_pool = kwargs.get("node_pool", "default-pool")
    cpu_request = kwargs.get("cpu", kwargs.get("cpu_request", "1"))
    cpu_limit = kwargs.get("cpu", kwargs.get("cpu_limit", "1"))
    memory_request = kwargs.get("memory", kwargs.get("memory_request", "1Gi"))
    memory_limit = kwargs.get("memory", kwargs.get("memory_limit", "1Gi"))
    gpu = int(kwargs.get("gpu", 0))
    mount_shared_memory = kwargs.get("mount_shared_memory", False)

    # Hard-coded default environment variables for all tasks
    default_env = {
        "PYTHONUNBUFFERED": "1"
    }

    env.update(default_env)

    env_from_sources = k8s_env_from_configmap(env_configmaps) + k8s_env_from_secret(env_secrets)

    resource_requests = {"cpu": cpu_request, "memory": memory_request}
    resource_limits = {"cpu": cpu_limit, "memory": memory_limit}

    if gpu > 0:
        resource_requests["nvidia.com/gpu"] = gpu
        resource_limits["nvidia.com/gpu"] = gpu

    volumes = []
    volume_mounts = []

    if mount_shared_memory:
        dshm_volume = V1Volume(
            name="dshm",
            empty_dir=V1EmptyDirVolumeSource(medium="Memory")
        )
        volumes.append(dshm_volume)

        dshm_volume_mount = V1VolumeMount(
            mount_path="/dev/shm",
            name="dshm"
        )
        volume_mounts.append(dshm_volume_mount)

    pod = Pod(pod_spec=V1PodSpec(
        containers=[V1Container(
            name=primary_container_name,
            env=k8s_env(env),
            env_from=env_from_sources,
            resources=V1ResourceRequirements(
                requests=resource_requests,
                limits=resource_limits,
            ),
            volume_mounts=volume_mounts,
        )],
        volumes=volumes,
        node_selector={"cloud.google.com/gke-nodepool": node_pool},
        tolerations=[
            V1Toleration(
                key="nodepool",
                operator="Equal",
                value=node_pool,
                effect="NoSchedule",
            ),
            V1Toleration(
                key='nvidia.com/gpu',
                operator='Exists',
                effect='NoSchedule',
            ),
        ],
    ))

    return {
        'task_config': pod,
        'container_image': image, # Must match pod spec's image
        'cache': cache,
        'cache_serialize': cache,
        'cache_version': "xyz", # Change this string to invalidate all existing cache; it can be set to any string
    }
This should be good enough to create a similar graph of nodes, but if the problem is caused by inputs/outputs, it will probably not be reproduced with this example. I used list, namedtuple, str, bool, int, FlyteDirectory, and FlyteFile as the types of my inputs/outputs
k
If it is a Flyte problem, it should reproduce
Cc @Eduardo Apolinario (eapolinario)
e
@Victor Churikov, thanks for the report. We take correctness bugs very seriously, so in that vein, in order to help in this investigation, is there any way you can share the flytepropeller logs from around that run? We're also going to try to reproduce this error on our side.
@Victor Churikov, continuing the investigation on this, you mentioned in an earlier message that the futures.pb files existed only in a slightly different path. For example, in that message you're saying that
metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19/dn0/0/futures.pb
exists, but
metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19-dn0/0/futures.pb
doesn't, correct? Do you have any other files in either path (e.g.: a file called
errors.pb
) ?
y
and if that is the case, could you also help us confirm the path of the futures file on the ones that didn’t fail please?
v
It exists at
metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19/0/futures.pb
(not dn19/dn0/0). I don’t see any errors.pb. I have run over 100 executions in the last 3 days, so I can’t find that specific run again to confirm the paths of the other futures files. I’ve also been running into caching issues, which have prevented me from attempting to reproduce this specific issue:
Failed to get data from gs://<my-gcs-bucket>/outputs/73/fvqecrpi/c4dcf3fa3770b04d6658e0dfe9e47ce6 to /tmp/flytewzfk1x0g/local_flytekit/dcbc2c5993708c7a495d57e9cdf98fbf (recursive=True).

Original exception: Anonymous caller does not have storage.objects.list access to the Google Cloud Storage bucket. Permission 'storage.objects.list' denied on resource (or it may not exist)., 401
This is the response GCS returns when attempting to access a file that does not exist (such as when it has been deleted or was never uploaded in the first place). It also returns this response if the storage.objects.list permission is missing, but that is not the case here, because Flyte successfully uses other inputs from the cache, just not this one. After checking the GCS bucket, the cached output is indeed missing. I do not have any lifecycle rules enabled that could delete it automatically. I wish Flyte would verify that cached outputs actually exist in GCS before skipping with “output for this execution was read from cache”.
I am reinstalling my flyte-core Helm chart right now, wiping the DB, and updating flytekit in my Docker images to 1.7. I’ll let you know if it still happens after that.
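As a stopgap, the existence check wished for here can be done from outside Flyte before trusting a cache hit. The following is a minimal sketch, not anything Flyte does today: `split_gs_uri`, `missing_outputs`, and `has_objects` are names invented for this example, and `fake_has_objects` stands in for a real bucket lookup using hypothetical object names. Because the failing output was fetched recursively (a directory), the check is prefix-based.

```python
from typing import Callable, Iterable, List, Tuple


def split_gs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/some/key' into ('bucket', 'some/key')."""
    scheme = "gs://"
    if not uri.startswith(scheme):
        raise ValueError(f"not a gs:// URI: {uri!r}")
    bucket, _, key = uri[len(scheme):].partition("/")
    return bucket, key


def missing_outputs(
    uris: Iterable[str], has_objects: Callable[[str, str], bool]
) -> List[str]:
    """Return the URIs for which has_objects(bucket, prefix) is False."""
    return [uri for uri in uris if not has_objects(*split_gs_uri(uri))]


# Stand-in for a real bucket: hypothetical object names only.
fake_bucket = {"my-bucket": ["outputs/aa/run1/part-0", "outputs/aa/run1/part-1"]}


def fake_has_objects(bucket: str, prefix: str) -> bool:
    """True if an object equals the prefix or lives in the 'folder' below it."""
    folder = prefix.rstrip("/") + "/"
    return any(
        name == prefix or name.startswith(folder)
        for name in fake_bucket.get(bucket, [])
    )


cached = ["gs://my-bucket/outputs/aa/run1", "gs://my-bucket/outputs/bb/run2"]
print(missing_outputs(cached, fake_has_objects))
```

In real use, `has_objects` would be backed by whatever storage client is available; any URI it reports as missing points to a cache entry whose data is gone.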
Update, now that I have more context.
We use image streaming on GKE because our container images need to contain a lot of heavy dependencies and can exceed 10GB in size (the cudnn base itself is already 4GB). With image streaming, the container images are lazily loaded, which lets tasks start within seconds rather than minutes. It usually works well, but it seems unable to keep pace at higher scale.
I recently tried to run 100 of our workflows in parallel, each with multiple GPU tasks running in parallel, which resulted in 200-1000 (Kubernetes) nodes in total being up during these workflow executions. I was unable to reproduce this issue with 10 parallel workflow executions, it happened only a little with 20 executions, and with 100 the failure rate was around 80%.
I wrapped the Docker image with a custom entrypoint that sleeps for 3 minutes before running anything. This gave GKE/GAR more time to load the container images, and now only 4% (4/100) of executions failed. This narrows down the potential causes of this and other strange errors (such as random bus errors) enough that I feel confident saying it’s likely caused by lazy loading of the container images, which affects the flytekit/pyflyte running inside the container as well as the task code itself.
Right now I’m trying to figure out what can be checked to determine whether the container image has loaded enough to run Flyte and our code. If I find such a check, the next step will be to have my entrypoint poll it every few seconds until the image has loaded enough. If we find that stability can’t be guaranteed unless the entire image is loaded, we will need to look into alternative solutions for container pulling, but there is no bug on Flyte’s side here.
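A polling entrypoint of the kind described could look roughly like the sketch below. Everything in it is an assumption made for illustration: the probe (fully reading a list of files taken from a made-up `READINESS_PATHS` environment variable) is only one guess at a usable signal, and whether it is enough to tell that a streamed image is ready is exactly the open question.

```python
import os
import sys
import time
from typing import Callable, Iterable


def wait_until_ready(
    check: Callable[[], bool],
    interval: float = 5.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll check() every `interval` seconds until it passes or time runs out."""
    deadline = clock() + timeout
    while True:
        try:
            if check():
                return True
        except Exception:
            pass  # an error (e.g. a file that cannot be read yet) means "not ready"
        if clock() >= deadline:
            return False
        sleep(interval)


def files_readable(paths: Iterable[str]) -> bool:
    """Hypothetical probe: read every file end to end; raises OSError on failure."""
    for path in paths:
        with open(path, "rb") as handle:
            while handle.read(1 << 20):
                pass
    return True


if __name__ == "__main__" and len(sys.argv) > 1:
    # READINESS_PATHS is a made-up, colon-separated list of files to probe.
    probe_paths = [p for p in os.environ.get("READINESS_PATHS", "").split(":") if p]
    if not wait_until_ready(lambda: files_readable(probe_paths)):
        sys.exit("image did not become ready in time")
    # Replace this process with the real task command passed as arguments.
    os.execvp(sys.argv[1], sys.argv[1:])
```

Used as the image entrypoint with the original command as its arguments, this replaces the fixed 3-minute sleep with a wait that ends as soon as the probe passes and fails the container if it never does.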
k
@Victor Churikov would you be open to hopping on a call, happy to help debug. I am a little confused at the moment
v
In other words, this does not seem to be a Flyte issue, so I don’t think a debug call or any action from your side is needed
k
Aah sorry missed that last line - thank you. ❤️ . We hate stability issues.