full-ram-17934
09/13/2024, 11:16 PM
jolly-florist-66728
09/13/2024, 11:16 PM
1. Fetch the Execution: Use FlyteRemote.fetch_execution to get the workflow execution.
2. Sync the Execution: Sync the execution to ensure all node executions are up-to-date.
3. Inspect Node Executions: Iterate through the node executions to identify the failed task.
Here's a code snippet demonstrating this process:
from flytekit.configuration import Config
from flytekit.models.core.execution import NodeExecutionPhase
from flytekit.remote import FlyteRemote
# Initialize FlyteRemote
remote = FlyteRemote(
    config=Config.for_endpoint(endpoint="flyte.example.net"),
    default_project="flytesnacks",
    default_domain="development",
)
# Fetch the workflow execution
execution = remote.fetch_execution(name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development")
# Sync the execution to get all node executions
synced_execution = remote.sync(execution, sync_nodes=True)
# Iterate through node executions to find the failed task.
# closure.phase is an enum value, not a string, so compare against NodeExecutionPhase.FAILED.
for node_key, node_exec in synced_execution.node_executions.items():
    if node_exec.closure.phase == NodeExecutionPhase.FAILED:
        print(f"Failed node: {node_key}")
        print(f"Error: {node_exec.closure.error.message}")
        break
This code will print the first failed node and its error message. You can extend it to handle more complex logic if needed.
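As one possible extension, here is a minimal sketch that collects every failed node instead of stopping at the first one. The helper name collect_failed_nodes is hypothetical and not part of flytekit. It only assumes each node execution exposes closure.phase and closure.error.message, as in the snippet above. The demo uses stand-in objects and made-up phase markers so it runs without a Flyte backend.

```python
from types import SimpleNamespace


def collect_failed_nodes(node_executions, failed_phase):
    """Return (node_key, error_message) pairs for every node in the failed phase.

    Hypothetical helper: works on any mapping whose values expose
    .closure.phase and, optionally, .closure.error.message.
    """
    failures = []
    for node_key, node_exec in node_executions.items():
        closure = node_exec.closure
        if closure.phase != failed_phase:
            continue
        # The error may be unset, so fall back to a placeholder message.
        error = getattr(closure, "error", None)
        message = error.message if error is not None else "<no error message>"
        failures.append((node_key, message))
    return failures


# Stand-in data shaped like node executions, for illustration only.
FAILED, SUCCEEDED = "failed", "succeeded"  # hypothetical phase markers
demo_nodes = {
    "n0": SimpleNamespace(closure=SimpleNamespace(phase=SUCCEEDED, error=None)),
    "n1": SimpleNamespace(
        closure=SimpleNamespace(phase=FAILED, error=SimpleNamespace(message="boom"))
    ),
    "n2": SimpleNamespace(closure=SimpleNamespace(phase=FAILED, error=None)),
}

for key, message in collect_failed_nodes(demo_nodes, FAILED):
    print(f"Failed node: {key} - {message}")
```

Against a real execution you would presumably pass synced_execution.node_executions as the mapping and the phase value to match (for example NodeExecutionPhase.FAILED from flytekit.models.core.execution) as failed_phase.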
Would you like more details on any specific part of this process?
Sources:
- Debugging Executions
average-finland-92144
09/16/2024, 3:33 PM