Victor Churikov
06/28/2023, 12:24 PM
Workflow[flytesnacks:development:algopipelines.flytedatapipeline.main_pipeline.smplx_proccess_pipeline] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: 0: 0: 0: 0: 0: 0: [system] unable to read futures file, maybe corrupted, caused by: [system] Failed to read futures protobuf file., caused by: path:gs://<my-gcs-bucket>/metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19-dn0/0/futures.pb: not found
1: 0: [system] unable to read futures file, maybe corrupted, caused by: [system] Failed to read futures protobuf file., caused by: path:gs://<my-gcs-bucket>/metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19-dn0/0/futures.pb: not found
Any ideas what may be causing this?
metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19-dn0/0/futures.pb
Up to this point, the path is correct: metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0
That folder does not contain a dn19-dn0 folder, but it does contain a dn19 folder and a dn0 folder, and each of those has a folder named 0 with a futures.pb inside.
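For reference, a minimal sketch of how one could list what propeller actually wrote under that prefix, assuming the google-cloud-storage client; the bucket name is a placeholder and the prefix is taken from the error above:

from google.cloud import storage

# Placeholder bucket; the prefix comes from the error message above.
BUCKET = "<my-gcs-bucket>"
PREFIX = ("metadata/propeller/"
          "flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/"
          "n10/data/0/")

client = storage.Client()
# List everything under .../n10/data/0/ to compare against the dn19-dn0 path
# that propeller reports as missing.
for blob in client.list_blobs(BUCKET, prefix=PREFIX):
    print(blob.name)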
Ketan (kumare3)
Victor Churikov
06/28/2023, 1:44 PM
Ketan (kumare3)
Victor Churikov
06/28/2023, 1:49 PM
Ketan (kumare3)
Victor Churikov
06/28/2023, 1:52 PM
import os
import uuid
from algopipelines.flytedatapipeline.kubes import (
    slack_notify,
    parse_session_info,
    parse_takes_status,
    git_clone,
    should_run_fit,
    qt_texture_training,
    get_parameters,
    skip,
)
from algopipelines.flytedatapipeline.subworkflows.shape_fit_flow import shape_fit_flow
from algopipelines.flytedatapipeline.subworkflows.evaluation_flow import evaluation_flow
from flytekit import workflow, conditional
@workflow
def smplx_proccess_pipeline(dummy: bool):
    pipeline_params = get_parameters(dummy=dummy)
    parsed_session_info = parse_session_info(dummy=dummy)
    parsed_takes_status = parse_takes_status(dummy=dummy)
    slack_notify(dummy=dummy)
    run_fit = should_run_fit(dummy=dummy)
    (motion_capture_repo := git_clone(dummy=dummy).with_overrides(name="Clone CGMocap")) >> run_fit
    (core_algo_repo := git_clone(dummy=dummy).with_overrides(name="Clone CoreAlgo")) >> run_fit
    (evaluator_repo := git_clone(dummy=dummy).with_overrides(name="Clone Evaluators")) >> run_fit
    (fitting_repo := git_clone(dummy=dummy).with_overrides(name="Clone Fitting")) >> run_fit
    (common_algo_repo := git_clone(dummy=dummy).with_overrides(name="Clone CommonAlgo")) >> run_fit
    conditional_fit = conditional("dummy_condition_1").if_(run_fit.is_true()
    ).then(shape_fit_flow(
        dummy=dummy
    )).else_().then(skip())
    conditional_train_qt = conditional("should_train_qt").if_(dummy == True
    ).then(qt_texture_training(
        dummy=dummy
    )).else_().then(skip())
    conditional_evaluation = conditional("should_run_evaluation").if_(dummy == True
    ).then(evaluation_flow(
        dummy=dummy
    )).else_().then(skip())
    # Dependencies that were handled by flyte before dummying the workflow:
    # git_clone nodes are inputs of all of the conditionals
    # parsed_session_info is an input of all the conditionals
    # parsed_takes_status is an input of the evaluation conditional
    # pipeline_params is an input of all the conditionals
    # Explicit dependencies related to our specific logic:
    pipeline_params >> run_fit
    motion_capture_repo >> conditional_train_qt
    core_algo_repo >> conditional_train_qt
    common_algo_repo >> conditional_train_qt
    evaluator_repo >> conditional_train_qt
    fitting_repo >> conditional_train_qt
    conditional_fit >> conditional_train_qt
    conditional_fit >> parsed_takes_status
    conditional_train_qt >> conditional_evaluation
shape_fit_flow dynamic (used a dynamic workflow so we can loop over the contents of a list input):
import os
import uuid
from algopipelines.flytedatapipeline.kubes import (
    download_video_folder,
    mocap_shape_process,
    upload_folder,
    upload_status_file,
    skip,
)
from algopipelines.flytedatapipeline.data_pipelines.fitting_scripts.smplx_shape_fitting import smplx_shape_fitting
from algopipelines.flytedatapipeline.data_pipelines.fitting_scripts.smplx_expression_fitting import smplx_expression_fitting
from algopipelines.flytedatapipeline.subworkflows.mocap_process_flow import mocap_process_flow
from algopipelines.datahandler.k8s import task_config
from flytekit import conditional, dynamic
from flytekit.types.directory import FlyteDirectory
@dynamic(**task_config(
    image='algopipelines-generic:v1',
    cpu='400m', memory='400Mi'))
def shape_fit_flow(
    dummy: bool
):
    shape_video = download_video_folder(dummy=dummy)
    shape_fitting_outputs_qt = smplx_shape_fitting(dummy=dummy)
    upload_folder(dummy=dummy).with_overrides(name='Upload Shape Fitting Outputs')
    shape_fitting_outputs_lite = smplx_shape_fitting(dummy=dummy)
    upload_folder(dummy=dummy).with_overrides(name='Upload Shape Fitting Outputs (Lite)')
    mocap_shape_process_outputs = mocap_shape_process(dummy=dummy)
    upload_folder(dummy=dummy).with_overrides(name='Upload Mocap Shape Process Outputs')
    upload_folder(dummy=dummy).with_overrides(name='Upload Shape Raw Data')
    for expression_path in ['dummy_path_1', "dummy_path_2", "...", "dummy_path_13"]:
        expression_video = download_video_folder(dummy=dummy)
        upload_folder(dummy=dummy).with_overrides(name="Upload Expression Raw Data")
        expression_fitting_outputs = smplx_expression_fitting(dummy=dummy)
        upload_status_file(dummy=dummy)
        upload_folder(dummy=dummy)
    conditional("should_run_mocap_processing").if_("dummy" == "dummy").then(
        mocap_process_flow(dummy=dummy)
    ).else_().then(skip())
mocap_process_flow dynamic (used a dynamic workflow as a workaround because some input strings were still Promise objects; inside a dynamic workflow they are evaluated to their actual values):
from algopipelines.flytedatapipeline.kubes import (
    mocap_expression_process,
    upload_folder,
)
from algopipelines.datahandler.k8s import task_config
from flytekit import dynamic
from flytekit.types.directory import FlyteDirectory
@dynamic(**task_config(
    image='algopipelines-generic:v1',
    cpu='400m', memory='400Mi'))
def mocap_process_flow(dummy: bool):
    expression_process_outputs = mocap_expression_process(dummy=dummy)
    upload_folder(dummy=dummy)
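For illustration, a minimal sketch of the Promise-versus-dynamic behaviour described above; make_name, build_path, and the strings are placeholders rather than parts of the real pipeline:

from flytekit import dynamic, task, workflow

@task
def make_name(dummy: bool) -> str:
    return "dummy_a" if dummy else "dummy_b"

@dynamic
def build_path(name: str) -> str:
    # Inside a dynamic workflow the input has been materialized into a real
    # Python string, so ordinary string operations behave as expected.
    return f"sessions/{name}"

@workflow
def wf(dummy: bool) -> str:
    # At workflow definition time this is a Promise, not a str; formatting it
    # here would embed the Promise object rather than the actual value.
    name = make_name(dummy=dummy)
    return build_path(name=name)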
evaluation_flow:
from algopipelines.flytedatapipeline.evaluators_kubes import (
    evaluation,
    download_evaluation_inputs,
)
from algopipelines.datahandler.k8s import task_config
from algopipelines.flytedatapipeline.kubes import (
    upload_status_file,
    upload_folder,
)
from flytekit import dynamic
from flytekit.types.directory import FlyteDirectory
@dynamic(**task_config(
    image='algopipelines-generic:v1',
    cpu='400m', memory='400Mi'))
def evaluation_flow(dummy: bool):
    evaluation_inputs = download_evaluation_inputs(dummy=dummy)
    evaluation_output = evaluation(dummy=dummy)
    upload_status_file(dummy=dummy)
    upload_folder(dummy=dummy)
k8s.py (outputs kwargs for task decorators):
from typing import List
from flytekitplugins.pod import Pod
from kubernetes.client.models import (
    V1PodSpec,
    V1Container,
    V1EmptyDirVolumeSource,
    V1ResourceRequirements,
    V1Volume,
    V1VolumeMount,
    V1EnvVar,
    V1EnvFromSource,
    V1ConfigMapEnvSource,
    V1SecretEnvSource,
    V1Toleration,
)
def k8s_env(env_dict) -> List[V1EnvVar]:
    return [V1EnvVar(name=k, value=v) for k, v in env_dict.items()]

def k8s_env_from_configmap(cms) -> List[V1EnvFromSource]:
    return [V1EnvFromSource(config_map_ref=V1ConfigMapEnvSource(name=cm)) for cm in cms]

def k8s_env_from_secret(secrets) -> List[V1EnvFromSource]:
    return [V1EnvFromSource(secret_ref=V1SecretEnvSource(name=secret)) for secret in secrets]
def task_config(image: str, **kwargs):
    cache = kwargs.get("cache", False)
    primary_container_name = kwargs.get("primary_container_name", "primary")
    env = kwargs.get("env", {})
    env_configmaps = kwargs.get("env_configmaps", [])
    env_secrets = kwargs.get("env_secrets", [])
    node_pool = kwargs.get("node_pool", "default-pool")
    cpu_request = kwargs.get("cpu", kwargs.get("cpu_request", "1"))
    cpu_limit = kwargs.get("cpu", kwargs.get("cpu_limit", "1"))
    memory_request = kwargs.get("memory", kwargs.get("memory_request", "1Gi"))
    memory_limit = kwargs.get("memory", kwargs.get("memory_limit", "1Gi"))
    gpu = int(kwargs.get("gpu", 0))
    mount_shared_memory = kwargs.get("mount_shared_memory", False)
    # Hard-coded default environment variables for all tasks
    default_env = {
        "PYTHONUNBUFFERED": "1"
    }
    env.update(default_env)
    env_from_sources = k8s_env_from_configmap(env_configmaps) + k8s_env_from_secret(env_secrets)
    resource_requests = {"cpu": cpu_request, "memory": memory_request}
    resource_limits = {"cpu": cpu_limit, "memory": memory_limit}
    if gpu > 0:
        resource_requests["nvidia.com/gpu"] = gpu
        resource_limits["nvidia.com/gpu"] = gpu
    volumes = []
    volume_mounts = []
    if mount_shared_memory:
        dshm_volume = V1Volume(
            name="dshm",
            empty_dir=V1EmptyDirVolumeSource(medium="Memory")
        )
        volumes.append(dshm_volume)
        dshm_volume_mount = V1VolumeMount(
            mount_path="/dev/shm",
            name="dshm"
        )
        volume_mounts.append(dshm_volume_mount)
    pod = Pod(pod_spec=V1PodSpec(
        containers=[V1Container(
            name=primary_container_name,
            env=k8s_env(env),
            env_from=env_from_sources,
            resources=V1ResourceRequirements(
                requests=resource_requests,
                limits=resource_limits,
            ),
            volume_mounts=volume_mounts,
        )],
        volumes=volumes,
        node_selector={"cloud.google.com/gke-nodepool": node_pool},
        tolerations=[
            V1Toleration(
                key="nodepool",
                operator="Equal",
                value=node_pool,
                effect="NoSchedule",
            ),
            V1Toleration(
                key='nvidia.com/gpu',
                operator='Exists',
                effect='NoSchedule',
            ),
        ],
    ))
    return {
        'task_config': pod,
        'container_image': image,  # Must match the pod spec's image
        'cache': cache,
        'cache_serialize': cache,
        'cache_version': "xyz",  # Change this string to invalidate all existing cache; it can be set to any string
    }
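For illustration, a hedged sketch of how the kwargs returned by task_config are typically splatted into a flytekit decorator, mirroring the @dynamic usage above; the task name, resources, and env values are placeholders:

from flytekit import task
from algopipelines.datahandler.k8s import task_config

# Hypothetical task: task_config() supplies task_config, container_image,
# cache, cache_serialize, and cache_version to the decorator.
@task(**task_config(
    image='algopipelines-generic:v1',
    cpu='2', memory='8Gi',
    gpu=1, node_pool='gpu-pool',
    mount_shared_memory=True,
    env={'MY_FLAG': '1'},
    cache=True,
))
def example_gpu_task(dummy: bool) -> bool:
    return dummy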
Ketan (kumare3)
Eduardo Apolinario (eapolinario)
06/29/2023, 6:10 PM
metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19/dn0/0/futures.pb exists, but metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19-dn0/0/futures.pb doesn't, correct? Do you have any other files in either path (e.g. a file called errors.pb)?
Yee
Victor Churikov
07/01/2023, 11:39 AM
metadata/propeller/flytesnacks-development-algopipelinesflytedatapipeline-da0bfb4f/n10/data/0/dn19/0/futures.pb
(not dn19/dn0/0)
I don’t see any errors.pb
In the last 3 days I've run over 100 executions, so I can't find that specific run again to confirm the paths of the other futures files. I've also been running into caching issues that have prevented me from trying to reproduce this specific issue:
Failed to get data from gs://<my-gcs-bucket>/outputs/73/fvqecrpi/c4dcf3fa3770b04d6658e0dfe9e47ce6 to /tmp/flytewzfk1x0g/local_flytekit/dcbc2c5993708c7a495d57e9cdf98fbf (recursive=True).
Original exception: Anonymous caller does not have storage.objects.list access to the Google Cloud Storage bucket. Permission 'storage.objects.list' denied on resource (or it may not exist)., 401
This is the response GCS returns when attempting to access a file that does not exist (such as when it has been deleted or was never uploaded in the first place). It also returns this response if the storage.objects.list permission is missing, but that is not the case here, because Flyte successfully uses other inputs from cache, just not this one. After checking the GCS storage, the cached output is indeed missing, and I do not have any lifecycle rules enabled that could delete it automatically.
I wish Flyte would verify that cached outputs actually exist in GCS before skipping with “output for this execution was read from cache”
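For illustration, a minimal sketch of the kind of manual check meant here, verifying that a cached output prefix still exists in GCS before trusting the cache, assuming the google-cloud-storage client; bucket and prefix are placeholders taken from the error above:

from google.cloud import storage

# Placeholder values from the failed cache lookup above.
BUCKET = "<my-gcs-bucket>"
PREFIX = "outputs/73/fvqecrpi/c4dcf3fa3770b04d6658e0dfe9e47ce6"

client = storage.Client()
# If nothing is listed under the prefix, the cached output is gone even
# though the catalog entry still points at it.
found = list(client.list_blobs(BUCKET, prefix=PREFIX, max_results=1))
print("cached output present:", bool(found))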
I am reinstalling my flyte-core helm chart right now, wiping the DB, and updating flytekit on my Docker images to 1.7. I'll let you know if it still happens after that.
Ketan (kumare3)
Victor Churikov
09/05/2023, 1:24 PM
Ketan (kumare3)