Marcin Zieminski
07/28/2022, 11:14 AMpyflyte lp -p {{ your project }} -d {{ your domain }} activate-all
The thing is, lp
subcommand does not exist at all, so the command above fails. Ideally would like to do it programmatically from Python having a handle to a launch plan object. Is this possible?Ketan (kumare3)
Yee
Marcin Zieminski
08/02/2022, 10:38 AMimport subprocess
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from flytekit import CronSchedule, LaunchPlan
from flytekit.configuration import Config
from flytekit.core.notification import Notification
from flytekit.core.workflow import WorkflowBase
from flytekit.remote import FlyteRemote, FlyteWorkflow, FlyteWorkflowExecution
from flytekit.remote.launch_plan import FlyteLaunchPlan
@dataclass
class ExecSpec:
project: str
domain: str
remote: FlyteRemote = field(init=False)
workflow: Union[WorkflowBase, str]
cron_expression: Optional[str] = None
launch_plan_name: Optional[str] = None
default_inputs: Optional[Dict[str, Any]] = None
fixed_inputs: Optional[Dict[str, Any]] = None
notifications: Optional[List[Notification]] = None
version: Optional[str] = None
def __post_init__(self):
self.remote = FlyteRemote(
config=Config.auto(),
default_project=self.project,
default_domain=self.domain,
)
@property
def workflow_name(self) -> str:
if isinstance(self.workflow, str):
return self.workflow
else:
return self.workflow.name
def get_remote_workflow(self) -> FlyteWorkflow:
return self.remote.fetch_workflow(
project=self.project,
domain=self.domain,
name=self.workflow_name,
version=self.version,
)
def register_and_get_lp(self, use_schedule: bool = False) -> FlyteLaunchPlan:
launch_plan_name = self.launch_plan_name or f"{self.workflow_name}-schedule"
schedule = None
if use_schedule:
if not self.cron_expression:
raise ValueError("Cron expression is not setup")
schedule = CronSchedule(schedule=self.cron_expression)
r_wf = self.get_remote_workflow()
print("Getting or creating launch plan", self.workflow_name)
launch_plan = LaunchPlan.get_or_create(
name=launch_plan_name,
workflow=self.workflow,
schedule=schedule,
default_inputs=self.default_inputs,
fixed_inputs=self.fixed_inputs,
notifications=self.notifications,
)
print("Registering launch plan")
return self.remote.register_launch_plan(
launch_plan,
project=self.project,
domain=self.domain,
version=r_wf.id.version,
)
def register_and_activate_lp(self) -> FlyteLaunchPlan:
lp = self.register_and_get_lp(use_schedule=True)
version = lp.workflow_id.version
subprocess.run(
f"flytectl update launchplan {lp.name} -p {self.project} -d {self.domain} --version {version} --activate",
shell=True, # nosec
check=True,
)
return lp
def execute_workflow(
self,
execution_name: Optional[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> FlyteWorkflowExecution:
wf = self.get_remote_workflow()
final_inputs = {}
final_inputs.update(self.default_inputs or {})
final_inputs.update(self.fixed_inputs or {})
final_inputs.update(inputs or {})
type_hints = {name: type(val) for name, val in final_inputs.items()}
return self.remote.execute_remote_wf(
wf,
inputs=final_inputs,
type_hints=type_hints,
execution_name=execution_name,
project=self.project,
domain=self.domain,
)
def execute_launchplan(
self,
execution_name: Optional[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> FlyteWorkflowExecution:
lp = self.register_and_get_lp(use_schedule=False)
inputs = inputs or {}
type_hints = {name: type(val) for name, val in inputs.items()}
return self.remote.execute_remote_task_lp(
lp,
execution_name=execution_name,
inputs=inputs,
type_hints=type_hints,
project=self.project,
domain=self.domain,
)
Ketan (kumare3)