full-evening-87657
02/23/2024, 4:09 AMglamorous-carpet-83516
02/23/2024, 4:44 AMLaunchPlan.get_or_create
glamorous-carpet-83516
02/23/2024, 4:45 AMfull-evening-87657
02/23/2024, 4:49 AMglamorous-carpet-83516
02/23/2024, 7:46 AMglamorous-carpet-83516
02/23/2024, 7:46 AMuser
02/23/2024, 7:46 AMfull-evening-87657
02/23/2024, 7:47 AMglamorous-carpet-83516
02/23/2024, 7:47 AMfull-evening-87657
03/01/2024, 8:05 AM
import json
from flytekit.configuration import PlatformConfig
from flytekit.clients import raw, friendly
from typing import Optional
from flyteidl.admin.common_pb2 import ObjectGetRequest
from flytekit.models.core.identifier import Identifier
from flytekit.models.core.identifier import ResourceType
from flytekit.models.core.identifier import Identifier, ResourceType
# from flytekit.core.launch_plan import LaunchPlan
from flyteidl.admin.launch_plan_pb2 import LaunchPlanCreateRequest, LaunchPlanSpe
from flyteidl.admin.workflow_pb2 import Workflow as _workflow
from flyteidl.core import identifier_pb2 as _identifier_pb2
import flyteidl.admin.launch_plan_pb2 as _launch_plan_pb2
# Set up the Flyte Admin clients.
# The platform config targets localhost:30080 with TLS disabled (insecure=True);
# presumably a local Flyte sandbox/demo cluster -- confirm before reusing elsewhere.
sandbox_cfg = PlatformConfig.for_endpoint(endpoint='localhost:30080', insecure=True)
# Raw client: takes and returns flyteidl protobuf messages. It is the default
# `client` argument of the helper functions defined below.
raw_client = raw.RawSynchronousFlyteClient(cfg=sandbox_cfg, insecure=True)
# Friendly client built from the same config; not referenced by the helpers below.
friendly_client = friendly.SynchronousFlyteClient(cfg=sandbox_cfg)
# Get WorkflowSpec from Client
def get_workflow(client=raw_client,
                 project: str = 'flytesnacks',
                 domain: str = 'development',
                 name: str = 'pb2_wf',
                 version: Optional[str] = None) -> _workflow:
    """Fetch a workflow through ``client.get_workflow``.

    Builds a flytekit ``Identifier`` of resource type ``WORKFLOW`` from the
    given coordinates, converts it to its flyteidl form, wraps it in an
    ``ObjectGetRequest`` and returns whatever the client returns for it.

    Args:
        client: Object exposing ``get_workflow(get_object_request=...)``;
            defaults to the module-level ``raw_client``.
        project: Project part of the workflow identifier.
        domain: Domain part of the workflow identifier.
        name: Workflow name.
        version: Workflow version.
            NOTE(review): defaults to ``None``; confirm the backend accepts a
            lookup without a version, otherwise always pass one explicitly.

    Returns:
        The response of ``client.get_workflow`` (annotated as the flyteidl
        admin ``Workflow`` message).
    """
    workflow_id = Identifier(
        resource_type=ResourceType.WORKFLOW,
        project=project,
        domain=domain,
        name=name,
        version=version,
    )
    # The raw client takes protobuf messages, so convert the flytekit model
    # to its flyteidl representation before building the request.
    request = ObjectGetRequest(id=workflow_id.to_flyte_idl())
    return client.get_workflow(get_object_request=request)
# Create default LaunchPlan with WorkflowSpec
def create_default_launch_plan(wf: _workflow,
                               client=raw_client,
                               lp_name: str = 'name_lp',
                               lp_version: str = 'lp_default_name'):
    """Register a launch plan that points at ``wf``.

    The launch plan is created in the same project and domain as ``wf`` and
    its spec only sets ``workflow_id`` (no inputs, schedule or other options).

    Args:
        wf: Workflow message; only ``wf.id.project``, ``wf.id.domain``,
            ``wf.id.name`` and ``wf.id.version`` are read.
        client: Object exposing
            ``create_launch_plan(launch_plan_create_request=...)``;
            defaults to the module-level ``raw_client``.
        lp_name: Name for the new launch plan.
        lp_version: Version for the new launch plan.

    Returns:
        The launch plan ``Identifier`` when the create call succeeds, or
        ``None`` when it raises. Failures are reported on stdout and not
        re-raised, so this call stays best-effort.
    """
    lp_id = _identifier_pb2.Identifier(
        resource_type=_identifier_pb2.LAUNCH_PLAN,
        project=wf.id.project,
        domain=wf.id.domain,
        name=lp_name,
        version=lp_version,
    )
    lp_spec = _launch_plan_pb2.LaunchPlanSpec(
        workflow_id=_identifier_pb2.Identifier(
            resource_type=_identifier_pb2.WORKFLOW,
            project=wf.id.project,
            domain=wf.id.domain,
            name=wf.id.name,
            version=wf.id.version,
        )
    )
    lp_create_request = LaunchPlanCreateRequest(id=lp_id, spec=lp_spec)

    try:
        client.create_launch_plan(launch_plan_create_request=lp_create_request)
    except Exception as e:
        # Deliberately not re-raised: report which launch plan failed and why,
        # then let the caller continue.
        print(
            f"Failed to create launch plan "
            f"{lp_id.project}/{lp_id.domain}/{lp_id.name}@{lp_id.version}: {e!r}"
        )
        return None
    return lp_id
def create_schedule_launch_plan():
    """Placeholder for creating a scheduled launch plan.

    Not implemented yet: takes no arguments, does nothing and returns ``None``.
    """
    # TODO: implement scheduled launch plan creation.
    return None
if __name__ == '__main__':
    # Look up version 'v3' of workflow 'aaa' (default project/domain), then
    # register a launch plan that reuses the workflow's own name and version.
    wf = get_workflow(name='aaa', version='v3')
    create_default_launch_plan(wf, lp_name=wf.id.name, lp_version=wf.id.version)
glamorous-carpet-83516
03/04/2024, 11:52 PM