kind-kite-58745
07/02/2023, 5:53 PM
kind-kite-58745
07/02/2023, 5:55 PM
kind-kite-58745
07/02/2023, 5:56 PM
def myfunc(...):
    wf = self.client.register_script(smplx_proccess_pipeline)
    execution_map = {}
    for session in tqdm(sessions):
        infered_session = self.infere_session_info(session)
        config = {**constant_config, **{"session_info_full_path": infered_session}}
        execution = self.client.run_workflow(wf, inputs=config)
        execution_map[infered_session] = execution
    try:
        logger.info("Waiting for completion. Press ctrl+c to cancel all runs")
        with tqdm(total=len(execution_map), desc="Waiting for executions to complete") as pbar:
            while any(not execution.is_done for execution in execution_map.values()):
                for session, execution in execution_map.items():
                    logger.info(f"Execution for session {session} is done: {execution.is_done}, execution phase: {execution.closure.phase}")
                    if execution.is_done:
                        del execution_map[session]
                        pbar.update(1)
                        logger.info(f"Execution completed for session {session}")
                        break
                time.sleep(10)
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt received! Cancelling all runs!")
        for session, execution in execution_map.items():
            logger.info(f"Cancelling run for session {session}")
            self.client.terminate(execution, "Cancelled by user")
Cancelling runs via KeyboardInterrupt works well: as the screenshot shows, the five orange executions were terminated. This makes me believe that I have valid FlyteWorkflowExecution objects.
kind-kite-58745
07/02/2023, 5:58 PM
self.client here is a custom wrapper around FlyteRemote. My run_workflow function just calls flyteremote.execute with the desired configuration plus my defaults, and self.client.terminate calls flyteremote's terminate.
freezing-airport-6809
tall-lock-23197
kind-kite-58745
07/03/2023, 7:48 AM
try:
    logger.info("Waiting for completion. Press ctrl+c to cancel all runs")
    with tqdm(total=len(execution_map), desc="Waiting for executions to complete") as pbar:
        while any(not execution.is_done for execution in execution_map.values()):
            for session, execution in execution_map.items():
                if execution.is_done:
                    del execution_map[session]
                    pbar.update(1)
                    # Stop iterating here: the dict has just changed size.
                    break
                # Refresh the execution state before the next is_done check.
                execution_map[session] = self.client.sync(execution)
            time.sleep(10)
except KeyboardInterrupt:
    logger.info("KeyboardInterrupt received! Cancelling all runs!")
    for session, execution in execution_map.items():
        logger.info(f"Cancelling run for session {session}")
        self.client.terminate(execution, "Cancelled by user")
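
For reference, the polling pattern above could be factored into a small helper. This is only a sketch, not part of the wrapper discussed in this thread: the name wait_for_executions and its parameters are hypothetical. It assumes that sync returns a refreshed execution object exposing is_done (as self.client.sync appears to do above) and that terminate takes an execution and a cause string. It iterates over a snapshot of the keys, so finished entries can be removed without needing a break.

```python
import time
from typing import Any, Callable, Dict, Hashable, Optional


def wait_for_executions(
    executions: Dict[Hashable, Any],
    sync: Callable[[Any], Any],
    terminate: Optional[Callable[[Any, str], Any]] = None,
    poll_seconds: float = 10.0,
    on_done: Optional[Callable[[Hashable, Any], None]] = None,
) -> Dict[Hashable, Any]:
    """Poll until every execution reports is_done and return the finished ones.

    Hypothetical helper: `sync` is assumed to return a refreshed execution,
    and `terminate` is assumed to accept (execution, cause).
    """
    pending = dict(executions)  # work on a copy; the caller's dict is untouched
    finished: Dict[Hashable, Any] = {}
    try:
        while pending:
            # Snapshot the keys so entries can be removed during the loop.
            for key in list(pending):
                refreshed = sync(pending[key])
                if refreshed.is_done:
                    del pending[key]
                    finished[key] = refreshed
                    if on_done is not None:
                        on_done(key, refreshed)
                else:
                    pending[key] = refreshed
            if pending:
                time.sleep(poll_seconds)
    except KeyboardInterrupt:
        # Cancel whatever is still running, then let the interrupt propagate.
        if terminate is not None:
            for execution in pending.values():
                terminate(execution, "Cancelled by user")
        raise
    return finished
```

With the wrapper from this thread, a call might look like wait_for_executions(execution_map, self.client.sync, terminate=self.client.terminate, on_done=lambda session, execution: pbar.update(1)), assuming those wrapper methods behave as described above.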