# ask-the-community
v
Hey all, I programmatically launch multiple executions and store the FlyteWorkflowExecution objects in a map. Later I iterate over them to check FlyteWorkflowExecution.is_done, but it's always False, even when the executions have succeeded or failed. The .closure.phase is always 0, too (if it matters). Has anyone used is_done before to track workflow status, and can you check whether I'm doing it right? Code attached inside this thread, thanks
Code:
def myfunc(...):
        wf = self.client.register_script(smplx_proccess_pipeline)

        execution_map = {}

        for session in tqdm(sessions):
            infered_session = self.infere_session_info(session)
            config = {**constant_config, **{"session_info_full_path": infered_session}}
            execution = self.client.run_workflow(wf, inputs=config)
            execution_map[infered_session] = execution
        try:
            logger.info("Waiting for completion. Press ctrl+c to cancel all runs")

            with tqdm(total=len(execution_map), desc="Waiting for executions to complete") as pbar:
                while any(not execution.is_done for execution in execution_map.values()):
                    for session, execution in execution_map.items():
                        logger.info(f"Execution for session {session} is done: {execution.is_done}, execution phase: {execution.closure.phase}")
                        if execution.is_done:
                            del execution_map[session]
                            pbar.update(1)
                            logger.info(f"Execution completed for session {session}")
                            break

                    time.sleep(10)

        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received! Cancelling all runs!")
            for session, execution in execution_map.items():
                logger.info(f"Cancelling run for session {session}")
                self.client.terminate(execution, "Cancelled by user")
The KeyboardInterrupt handler that cancels runs works well: the screenshot shows that the five orange executions were terminated. This makes me believe that I have valid FlyteWorkflowExecution objects.
Note that self.client here is a custom wrapper around FlyteRemote. My run_workflow function just calls FlyteRemote.execute with the desired configuration plus my defaults, and self.client.terminate calls FlyteRemote's terminate.
k
is_done doesn't poll underneath; it only reflects the state the object had when it was last synced. You have to call sync again.
v
With sync it works well, thanks! The loop:
        try:
            logger.info("Waiting for completion. Press ctrl+c to cancel all runs")

            with tqdm(total=len(execution_map), desc="Waiting for executions to complete") as pbar:

                while any(not execution.is_done for execution in execution_map.values()):

                    for session, execution in execution_map.items():
                        if execution.is_done:
                            del execution_map[session]
                            pbar.update(1)
                            break

                        execution_map[session] = self.client.sync(execution)

                    time.sleep(10)

        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received! Cancelling all runs!")
            for session, execution in execution_map.items():
                logger.info(f"Cancelling run for session {session}")
                self.client.terminate(execution, "Cancelled by user")
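
For anyone landing here later, the same sync-then-check pattern can be pulled out into a small helper. This is only a sketch under assumptions: wait_for_all, on_done and poll_interval are hypothetical names, not part of flytekit, and the sync argument is assumed to behave like FlyteRemote.sync (fetch the latest state and return the refreshed execution object). It iterates over a snapshot of the map, so several finished executions can be removed in one pass without a break.

```python
import time
from typing import Callable, Dict, Hashable, TypeVar

E = TypeVar("E")


def wait_for_all(
    executions: Dict[Hashable, E],
    sync: Callable[[E], E],
    poll_interval: float = 10.0,
    on_done: Callable[[Hashable, E], None] = lambda key, execution: None,
) -> Dict[Hashable, E]:
    """Poll until every execution reports is_done and return the finished ones.

    Hypothetical helper, not a flytekit API. `sync` is assumed to act like
    FlyteRemote.sync. `executions` is mutated in place: finished entries are
    removed, so after a KeyboardInterrupt it holds only the unfinished runs.
    """
    finished: Dict[Hashable, E] = {}
    while executions:
        # Iterate over a snapshot so entries can be deleted during the pass.
        for key, execution in list(executions.items()):
            refreshed = sync(execution)  # is_done is only current after a sync
            if refreshed.is_done:
                finished[key] = refreshed
                del executions[key]
                on_done(key, refreshed)
            else:
                executions[key] = refreshed
        if executions:
            time.sleep(poll_interval)
    return finished
```

In the code above this would be called inside the try block as wait_for_all(execution_map, self.client.sync), optionally with on_done=lambda key, execution: pbar.update(1). Because the map is mutated in place, the existing KeyboardInterrupt handler still terminates only the runs that have not finished.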