Craig Amundsen
03/28/2023, 10:38 PM
Greg Gydush
03/28/2023, 10:45 PM
execution = remote.sync_execution(execution, sync_nodes=True)
while not execution.is_done:
    # do something
    time.sleep(5)
# ^ does the above not work for your use case?
def wait_for_workflow_execution_to_finish(
    execution: FlyteWorkflowExecution,
    log_interval: int | None = None,
) -> FlyteWorkflowExecution:
    """Wait for workflow to complete and return execution.

    Sleeps 5 seconds between polls, re-syncing the execution (with
    ``sync_nodes=True``) until ``execution.is_done`` is true, and logs the
    total elapsed time at most once every ``log_interval`` seconds.

    Args:
        execution: FlyteWorkflowExecution object
        log_interval: Interval for logging (in seconds). Defaults to 60.
            ``None`` and ``0`` both fall back to 60.

    Raises:
        ValueError: If workflow execution failed

    Returns:
        Synced workflow execution
    """
    log_interval = log_interval or 60
    remote = get_flyte_remote(execution.id.project, execution.id.domain)

    # Use the monotonic clock for interval/elapsed bookkeeping so wall-clock
    # adjustments (NTP, DST) cannot skew or negate the reported duration.
    original_start_time = time.monotonic()
    last_logged_time = original_start_time

    logger.info(
        "Syncing workflow execution. Total time elapsed: "
        f"{datetime.timedelta(seconds=0)}."
    )

    while not execution.is_done:
        time.sleep(5)
        # Rebind: the loop condition reads the freshly synced object.
        execution = remote.sync_execution(execution, sync_nodes=True)
        now = time.monotonic()
        if (now - last_logged_time) >= log_interval:
            last_logged_time = now
            elapsed = datetime.timedelta(seconds=last_logged_time - original_start_time)
            logger.info(f"Syncing execution. Total time elapsed: {elapsed}.")

    # NOTE(review): the pasted snippet lost its indentation; this check is
    # placed after the loop on the assumption that error information is only
    # meaningful once the execution has finished -- confirm against flytekit.
    if execution.error:
        raise ValueError(execution.closure.error.message)

    logger.info("Workflow execution finished.")
    return execution
^ I do something like this to get a synced execution (but you could also grab status trivially)
Craig Amundsen
03/28/2023, 11:00 PM