<@U06HBGN1SA0> When I execute dynamic workflows wi...
# ask-ai
m
@Glime When I execute dynamic workflows with
map_task
, I get errors like this:
Copy code
Workflow[redacted] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[dn0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: 
Operation cannot be fulfilled on pods "a4pdb2frgszrfnfw95xd-n0-0-n1-0-dn0-0-238": the object has been modified; please apply your changes to the latest version and try again
It seems to be a generic k8s error, can it be cause by Flyte (my workflow design of Flyte configuration)?
g
The error you're encountering,
Operation cannot be fulfilled on pods "a4pdb2frgszrfnfw95xd-n0-0-n1-0-dn0-0-238": the object has been modified; please apply your changes to the latest version and try again
, is indeed a Kubernetes (k8s) error that typically occurs when there's a conflict in updating a Kubernetes object. This can happen when multiple clients are trying to update the same object simultaneously, leading to a "conflict" error. In the context of Flyte executing dynamic workflows with
map_task
, this error might surface due to the way Flyte's
k8s-array
plugin manages pod creation and updates, especially under high concurrency or when dealing with retries of tasks. The
k8s-array
plugin is used to execute array tasks, which can involve launching multiple pods in parallel. If there's a retry mechanism in place (as indicated by the error message mentioning retry attempts being exhausted), it's possible that the retry logic is leading to rapid creation and deletion of pods, which could exacerbate the occurrence of such conflicts. Several factors could contribute to this issue in the context of Flyte: 1. ​*High Concurrency and Retry Logic*​: If your workflow involves a high degree of parallelism or aggressive retry policies, this could increase the likelihood of encountering such conflicts. 2. ​*Kubernetes Cluster State and Performance*​: The state and performance of your Kubernetes cluster can also play a role. If the cluster is under heavy load or experiencing performance issues, it might be slower to process updates, leading to a higher chance of conflicts. 3. ​*Workflow Design*​: The way your workflow is designed, particularly how tasks are orchestrated and retried, might influence the occurrence of these errors. For example, if many tasks are trying to update their status or other shared resources simultaneously, this could lead to conflicts. To mitigate this issue, you could consider the following approaches: • ​*Review and Adjust Retry Policies*​: If your workflow has aggressive retry policies, consider adjusting them to reduce the frequency of retries or introduce backoff strategies. • ​*Optimize Workflow Design*​: Review your workflow design to ensure that it optimally uses parallelism without overwhelming the Kubernetes cluster. • ​*Cluster Performance and Scaling*​: Ensure that your Kubernetes cluster is appropriately sized and configured for the workload. Scaling up the cluster or optimizing its performance might help reduce the occurrence of such errors. • ​*Flyte and Plugin Versions*​: Ensure that you are using the latest versions of Flyte and its plugins. There might be improvements or fixes in newer versions that could help mitigate this issue. Given the generic nature of the Kubernetes error and its potential relation to Flyte's handling of dynamic workflows and the
k8s-array
plugin, it's advisable to consider both Flyte-specific configurations and broader Kubernetes cluster management practicesto address this issue. --- This is an AI-generated response, take its recommendations with caution. Please help us improve the model accuracy by giving this response a thumbs up/thumbs down reaction. Sources https://discuss.flyte.org/t/10336981/hi-team-while-launching-multiple-workflows-with-map-tasks-in/ https://discuss.flyte.org/t/5076055/enabling-task-cache-for-map-tasks-throws-the-error-in-thread/ https://discuss.flyte.org/t/2404507/got-an-interesting-panic-in-flyte-propeller-k8s-array-plugin/ 👍 button 👎 button
m
@Glime please suggest where is the best place to introduce a backoff strategy. Also, is there another way to set retry policy in this context than providing
TaskMetadata(retries=X)
to
metadata
parameter of
map_task
?
g
In the context of introducing a backoff strategy for handling retries in Flyte, especially when dealing with
map_task
executions that encounter errors, there are a couple of approaches you can consider: 1. Task Level Retry Policy with Backoff: Currently, the primary method to set a retry policy in Flyte, including for
map_task
, is through the
TaskMetadata(retries=X)
parameter. This allows you to specify the number of retries for a task. However, to implement a backoff strategy directly, you would need to handle this logic within your task's code since Flyte's standard retry mechanism does not inherently support backoff intervals between retries. 2. Custom Backoff Logic: Implementing custom backoff logic within your task can be done by catching exceptions and managing retries manually within the task's code. This approach gives you the flexibility to define specific backoff intervals and conditions for retries. However, it requires additional coding and handling within each task where you want to apply this strategy. 3. Workflow Configuration: While the retry policy is typically set at the task level, you might consider broader workflow-level error handling strategies that could indirectly influence how retries are managed. For example, adjusting the concurrency of
map_task
executions or utilizing workflow-level settings to manage task executions more effectively. However, specific backoff strategies would still need to be implemented at the task level. 4. Flyte Propeller Configuration: For system-level retries, there is a possibility to configure system retries through FlytePropeller settings. This is more about how the system handles retries rather than a task-specific retry policy. Adjusting system retries might help in certain scenarios but does not replace the need for task-level retry policies with backoff. 5. External Dependency or Service: Another approach could be to offload the retry logic to an external service or dependency that manages the execution of tasks with sophisticated retry policies, including backoffs. This external system would then trigger task executions in Flyte, managing retries outside of the Flyte ecosystem. In the current Flyte documentation and discussions, there is no direct mention of a built-in backoff strategy for retries at the task or workflow level beyond the basic retry count specification through
TaskMetadata
. Therefore, for a backoff strategy, implementing custom logic within your tasks or leveraging external systems for retry management would be the approaches to consider. Regarding setting a retry policy other than providing
TaskMetadata(retries=X)
to the
metadata
parameter of
map_task
, the current mechanisms primarily revolve around this standard approach for specifying retries. The introduction of RFC 3902 suggests a unified retry budget that includes both system and user retries, but the customization of retry policies, particularly introducing backoff intervals, would still require custom implementation or external management.
m
@Glime let's say I do not want retries at all. This is my workflow:
Copy code
@dynamic
def get_data(
    items: list[str],
    param1: str,
    param2: str,
    param3: datetime,
    param4: str,
) -> list[DataFrame]:
    partial_query = functools.partial(
        my_query,
        param1=param1,
        param2=param2,
        param3=param3,
        param4=param4,
    )
    my_data = map_task(
        partial_query,
        metadata=TaskMetadata(retries=0, timeout=timedelta(minutes=240)),
        concurrency=10,
    )(my_item=items).with_overrides(requests=AVAILABLE_RESOURCES["s"])
    return my_data
I have set
retries=0
so I assume if a task fails, there is no retry. Yet I am getting following error:
Copy code
Workflow[redacted] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[dn0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: 
Operation cannot be fulfilled on pods "a4cnhw27dj827cc5mzdv-n0-0-n1-0-dn0-0-70": the object has been modified; please apply your changes to the latest version and try again
I understand system retries refer to some other concept. Could they cause the k8s error? For sure it is not cluster resource utilization, because I monitor it all the time.
g
Given the context and the error message you're encountering, it seems like the retries you're seeing are indeed system retries rather than task retries. In Flyte, task retries are defined by the user (as you did with
retries=0
in your
TaskMetadata
), and they dictate how many times a task should be retried upon failure due to user-defined errors or exceptions within the task's execution logic. System retries, on the other hand, are managed by Flyte at the system level and are used to handle infrastructural or system-level errors, such as issues with pod scheduling, system corruption, or transient errors in the Kubernetes layer. The error message you're seeing, particularly the part about the operation on pods not being fulfilled because the object has been modified, suggests a Kubernetes-level conflict error, which is a type of issue that system retries are designed to mitigate. The error message
max number of system retry attempts [11/10] exhausted
indicates that the system has attempted to retry the operation 11 times, despite the configured maximum being 10. This discrepancy itself is a sign of system-level retries in action, attempting to recover from an error that is not directly related to the logic of your task but rather to the underlying infrastructure or the orchestration layer. To directly address your questions: 1. Could system retries cause the Kubernetes error? Yes, the Kubernetes error you're seeing is likely a result of the system retry mechanism attempting to recover from a transient or infrastructural error. These retries are not directly related to the task retries you configure with
retries=0
. 2. Is it not cluster resource utilization? Based on your monitoring and the nature of the error, it does seem like the issue is not directly related to resource utilization but rather to a conflict or transient error within the Kubernetes layer that Flyte's system retries are attempting to handle. In summary, the retries you're observing are system-level retries attempting to handle infrastructural issues, and they are separate from the task-level retries you've configured. While you've correctly set
retries=0
to avoid task-level retries, system retries are a built-in mechanism to enhance the robustness of workflow executions against infrastructural hiccups. If you're looking to minimize the impact of such system retries or to understand how they can be configured or managed, you might want to explore Flyte's system settings or reach out to the Flyte community for more specific guidance on handling system-level errors and retries.
m
@Glime I have checked logs, the only error (that I can see logged repeatedly) is:
Copy code
"Failed to cache Dynamic workflow [[CACHE_WRITE_FAILED] Failed to Cache the metadata, caused by: The entry size is larger than 1/1024 of cache size]",
Then I can see following messages:
Copy code
"Dynamic handler.Handle's called with phase 0.",
"Node level caching is disabled. Skipping catalog read.",
"Failed to clear finalizers for Resource with name: cmf-forecasts-production/aqhkngdswq792rsblbx7-n0-0-n1-0-dn0-0-357. Error: Operation cannot be fulfilled on pods \"aqhkngdswq792rsblbx7-n0-0-n1-0-dn0-0-357\": the object has been modified; please apply your changes to the latest version and try again",
Can the first error explain k8s object mofidication? I do not know how to understand it, since I did not explicitly enable any caching.
g
The error messages you're encountering suggest a couple of underlying issues related to caching and Kubernetes object management within your Flyte workflows. Let's break down the errors and their potential implications: 1. Dynamic Workflow Caching Error: The first error message,
"Failed to cache Dynamic workflow [[CACHE_WRITE_FAILED] Failed to Cache the metadata, caused by: The entry size is larger than 1/1024 of cache size]"
, indicates an issue with caching a dynamic workflow's metadata. Flyte attempts to cache the outputs of tasks and workflows to optimize execution times for subsequent runs with identical inputs. However, in this case, the metadata size for your dynamic workflow exceeds the cache's size limit, which is set to prevent the cache from being overwhelmed by large entries. This failure to cache does not directly impact the execution of your current workflow but indicates that caching optimizations will not be available for this particular execution. 2. Kubernetes Object Modification Error: The subsequent error,
"Failed to clear finalizers for Resource with name: cmf-forecasts-production/aqhkngdswq792rsblbx7-n0-0-n1-0-dn0-0-357. Error: Operation cannot be fulfilled on pods \"aqhkngdswq792rsblbx7-n0-0-n1-0-dn0-0-357\": the object has been modified; please apply your changes to the latest version and try again"
, is a Kubernetes-level error that occurs when Flyte attempts to modify a Kubernetes object (in this case, a pod) that has been concurrently modified by another process. This could happen due to various reasons, such as concurrent updates from Flyte's control plane or direct modifications to the pod outside of Flyte's management. While the first error is related to caching and does not directly impact the execution flow, the second error could potentially disrupt the workflow execution. The Kubernetes object modification error suggests that there's a conflict in managing the lifecycle of the pods associated with your workflow tasks. This could be a symptom of broader issues with how Flyte interacts with your Kubernetes cluster, possibly related to how dynamic workflows are handled or how concurrency and retries are managed within your Flyte configuration. To address these issues, consider the following steps: • Review Flyte Configuration: Ensure that your Flyte deployment is configured correctly for your Kubernetes environment. This includes settings related to concurrency, retries, and resource management. • Check for External Modifications: Verify that there are no external processes or users that might be modifying the Kubernetes objects managed by Flyte. This could lead to conflicts and errors like the one you're experiencing. • Consult Flyte Documentation and Community: Review the Flyte documentation for guidance on managing dynamic workflows and caching. The Flyte community forums and GitHub issues can also be valuable resources for troubleshooting specific errors and configurations. • Monitor Kubernetes Cluster State: Keep an eye on the overall state and performance of your Kubernetes cluster. Issues like resource contention, slow updates, or API server performance can indirectly cause the types of errors you're seeing. While the caching error might not directly explain the Kubernetes object modification issue, both point towards the need to review and possibly adjust your Flyte and Kubernetes configurations to ensure smooth operation of your workflows.