Hi! I ran into some issues trying to schedule some...
# flytekit
m
Hi! I ran into some issues trying to schedule some worfklow. The docs say I could activate my launch plan using a command
Copy code
pyflyte lp -p {{ your project }} -d {{ your domain }} activate-all
The thing is,
lp
subcommand does not exist at all, so the command above fails. Ideally would like to do it programmatically from Python having a handle to a launch plan object. Is this possible?
k
Ohh thank you for the catch, that's a doc bug
Cc @Smriti Satyan / @Samhita Alla
1
👀 1
y
m
Not really, I asked about a programmatic way to do things, not a cli command. Fortunately I managed to glue up a solution which is more more or less what I needed. Still, its kinda ugly, especially that I have to explicitly pass type hints:
Copy code
import subprocess
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

from flytekit import CronSchedule, LaunchPlan
from flytekit.configuration import Config
from flytekit.core.notification import Notification
from flytekit.core.workflow import WorkflowBase
from flytekit.remote import FlyteRemote, FlyteWorkflow, FlyteWorkflowExecution
from flytekit.remote.launch_plan import FlyteLaunchPlan


@dataclass
class ExecSpec:
    project: str
    domain: str
    remote: FlyteRemote = field(init=False)
    workflow: Union[WorkflowBase, str]
    cron_expression: Optional[str] = None
    launch_plan_name: Optional[str] = None
    default_inputs: Optional[Dict[str, Any]] = None
    fixed_inputs: Optional[Dict[str, Any]] = None
    notifications: Optional[List[Notification]] = None
    version: Optional[str] = None

    def __post_init__(self):
        self.remote = FlyteRemote(
            config=Config.auto(),
            default_project=self.project,
            default_domain=self.domain,
        )

    @property
    def workflow_name(self) -> str:
        if isinstance(self.workflow, str):
            return self.workflow
        else:
            return self.workflow.name

    def get_remote_workflow(self) -> FlyteWorkflow:
        return self.remote.fetch_workflow(
            project=self.project,
            domain=self.domain,
            name=self.workflow_name,
            version=self.version,
        )

    def register_and_get_lp(self, use_schedule: bool = False) -> FlyteLaunchPlan:
        launch_plan_name = self.launch_plan_name or f"{self.workflow_name}-schedule"

        schedule = None
        if use_schedule:
            if not self.cron_expression:
                raise ValueError("Cron expression is not setup")

            schedule = CronSchedule(schedule=self.cron_expression)

        r_wf = self.get_remote_workflow()

        print("Getting or creating launch plan", self.workflow_name)
        launch_plan = LaunchPlan.get_or_create(
            name=launch_plan_name,
            workflow=self.workflow,
            schedule=schedule,
            default_inputs=self.default_inputs,
            fixed_inputs=self.fixed_inputs,
            notifications=self.notifications,
        )

        print("Registering launch plan")
        return self.remote.register_launch_plan(
            launch_plan,
            project=self.project,
            domain=self.domain,
            version=r_wf.id.version,
        )

    def register_and_activate_lp(self) -> FlyteLaunchPlan:
        lp = self.register_and_get_lp(use_schedule=True)
        version = lp.workflow_id.version
        subprocess.run(
            f"flytectl update launchplan {lp.name} -p {self.project} -d {self.domain} --version {version} --activate",
            shell=True,  # nosec
            check=True,
        )

        return lp

    def execute_workflow(
        self,
        execution_name: Optional[str] = None,
        inputs: Optional[Dict[str, Any]] = None,
    ) -> FlyteWorkflowExecution:
        wf = self.get_remote_workflow()
        final_inputs = {}
        final_inputs.update(self.default_inputs or {})
        final_inputs.update(self.fixed_inputs or {})
        final_inputs.update(inputs or {})

        type_hints = {name: type(val) for name, val in final_inputs.items()}

        return self.remote.execute_remote_wf(
            wf,
            inputs=final_inputs,
            type_hints=type_hints,
            execution_name=execution_name,
            project=self.project,
            domain=self.domain,
        )

    def execute_launchplan(
        self,
        execution_name: Optional[str] = None,
        inputs: Optional[Dict[str, Any]] = None,
    ) -> FlyteWorkflowExecution:
        lp = self.register_and_get_lp(use_schedule=False)

        inputs = inputs or {}
        type_hints = {name: type(val) for name, val in inputs.items()}

        return self.remote.execute_remote_task_lp(
            lp,
            execution_name=execution_name,
            inputs=inputs,
            type_hints=type_hints,
            project=self.project,
            domain=self.domain,
        )
👀 1
k
This should be in Flyte remote - not exact but similar- want to propose
104 Views