aloof-magazine-14878
06/11/2024, 9:26 PMancient-wolf-19325
06/11/2024, 9:26 PMFlyteRemote
, you can follow these steps:
1. Import necessary libraries: Import FlyteRemote
, Config
, and LaunchPlan
from flytekit
.
2. Initialize FlyteRemote: Create a FlyteRemote
object with the appropriate configuration.
3. Fetch the workflow: Use the fetch_workflow
method to get the workflow you want to create a launch plan for.
4. Create the launch plan: Use the LaunchPlan.create
method to create a launch plan for the fetched workflow.
5. Register the launch plan: Register the created launch plan using the register_launch_plan
method of FlyteRemote
.
Here is an example code snippet:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
from flytekit import LaunchPlan
# Initialize FlyteRemote
remote = FlyteRemote(
config=Config.for_endpoint(endpoint="flyte.example.net"),
default_project="flytesnacks",
default_domain="development",
)
# Fetch the workflow
flyte_workflow = remote.fetch_workflow(name="workflows.example.wf", version="v1")
# Create the launch plan
launch_plan = LaunchPlan.create(
name="example_launch_plan",
workflow=flyte_workflow,
default_inputs={"mean": 1}
)
# Register the launch plan
remote.register_launch_plan(launch_plan)
Would you like more detail?
Sources:
- Running launch plans
- Launch plans š 0 button š 0 button š” Hint: Mention @runllm in the thread for followups.aloof-magazine-14878
06/11/2024, 9:28 PMancient-wolf-19325
06/11/2024, 9:28 PMfetch_workflow
method returns an object of class FlyteWorkflow
, which is not directly compatible with LaunchPlan.create
that expects a WorkflowBase
object. This is a known issue and has been discussed in the Flyte community.
To work around this, you can use the fetch_launch_plan
method to get an existing launch plan and then modify it as needed. Here is an example:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
from flytekit import LaunchPlan
# Initialize FlyteRemote
remote = FlyteRemote(
config=Config.for_endpoint(endpoint="flyte.example.net"),
default_project="flytesnacks",
default_domain="development",
)
# Fetch the existing launch plan
existing_launch_plan = remote.fetch_launch_plan(name="existing_launch_plan_name")
# Modify the launch plan as needed
new_launch_plan = LaunchPlan.create(
name="new_launch_plan_name",
workflow=existing_launch_plan.workflow,
default_inputs={"input1": "value1", "input2": "value2"}
)
# Register the new launch plan
remote.register_launch_plan(new_launch_plan)
Would you like more detail?
Sources:
- FlyteRemote fetch_workflow issue
- FlyteRemote fetch_launch_plan usage š 0 button š 0 button š” Hint: Mention @runllm in the thread for followups.aloof-magazine-14878
06/11/2024, 9:35 PMancient-wolf-19325
06/11/2024, 9:35 PMfetch_workflow
and fetch_launch_plan
return FlyteWorkflow
objects, which are not directly compatible with LaunchPlan.create
. This is a known limitation and has been discussed in the Flyte community.
As of now, there is no direct way to create a LaunchPlan
from a FlyteWorkflow
object due to the missing python_interface
attribute. This issue is tracked in the Flyte GitHub repository (issue #2977).
Given this limitation, the recommended approach is to define and register workflows and launch plans using the Flyte SDK directly in your code, rather than relying on remote fetching and creation.
Here is an example of how to define and register a workflow and its launch plan using the Flyte SDK:
from flytekit import task, workflow, LaunchPlan
# Define a task
@task
def my_task(x: int) -> int:
return x + 1
# Define a workflow
@workflow
def my_workflow(x: int) -> int:
return my_task(x=x)
# Create a launch plan
my_launch_plan = LaunchPlan.create(
name="my_launch_plan",
workflow=my_workflow,
default_inputs={"x": 5}
)
# Register the workflow and launch plan
if __name__ == "__main__":
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
remote = FlyteRemote(
config=Config.for_endpoint(endpoint="<http://flyte.example.net|flyte.example.net>"),
default_project="flytesnacks",
default_domain="development",
)
remote.register_workflow(my_workflow)
remote.register_launch_plan(my_launch_plan)
Would you like more detail?
Sources:
- FlyteRemote fetch_workflow issue
- FlyteRemote fetch_launch_plan usage š 0 button š 1 button š” Hint: Mention @ancient-wolf-19325 in the thread for followups.aloof-magazine-14878
06/15/2024, 9:28 PMancient-wolf-19325
06/15/2024, 9:28 PMFlyteRemote
class to interact with Flyte's control plane. Here are the steps:
1. Initialize FlyteRemote: Create a FlyteRemote
object with the appropriate configuration.
2. Fetch the execution: Use the fetch_execution
method to get the execution object.
3. Inspect the execution status: Access the closure.phase
attribute of the fetched execution to get its status.
Here is an example code snippet:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
# Initialize FlyteRemote
remote = FlyteRemote(
config=Config.for_endpoint(endpoint="flyte.example.net"),
default_project="flytesnacks",
default_domain="development",
)
# Fetch the execution
execution = remote.fetch_execution(
name="fb22e306a0d91e1c6000", # Replace with your execution ID
project="flytesnacks",
domain="development"
)
# Inspect the execution status
status = execution.closure.phase
print(f"Execution status: {status}")
Would you like more detail?
Sources:
- Inspecting Executions
- FlyteRemote API š 0 button š 0 button š” Hint: Mention @runllm in the thread for followups.aloof-magazine-14878
06/15/2024, 9:50 PMancient-wolf-19325
06/15/2024, 9:50 PMWorkflowExecutionPhase
enum from flytekit.models.core.execution
. Here is an updated example:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
from flytekit.models.core.execution import WorkflowExecutionPhase
# Initialize FlyteRemote
remote = FlyteRemote(
config=Config.for_endpoint(endpoint="flyte.example.net"),
default_project="flytesnacks",
default_domain="development",
)
# Fetch the execution
execution = remote.fetch_execution(
name="fb22e306a0d91e1c6000", # Replace with your execution ID
project="flytesnacks",
domain="development"
)
# Convert the numerical phase to a human-readable status
status = WorkflowExecutionPhase.enum_to_string(execution.closure.phase)
print(f"Execution status: {status}")
Would you like more detail?
Sources:
- FlyteRemote API
- WorkflowExecutionPhase š 0 button š 0 button š” Hint: Mention @runllm in the thread for followups.aloof-magazine-14878
08/01/2024, 8:18 PMancient-wolf-19325
08/01/2024, 8:18 PMpyflyte
by specifying the --launchplans-only
option in the pyflyte register
command. This will ensure that only the launch plans are registered, and the workflows are ignored.
Here is an example command:
FLYTE_SDK_LOGGING_LEVEL=20 pyflyte register --non-fast -p flytesnacks --domain development dev_register_lp.py --version t1 --launchplans-only
This command will register only the launch plans defined in dev_register_lp.py
under the project flytesnacks
and domain development
with version t1
.
Would you like more detail?
Sources:
- Flyte GitHub Issue #5073 š 0 button š 0 button š” Hint: Mention @runllm in the thread for followups.aloof-magazine-14878
08/01/2024, 8:20 PMancient-wolf-19325
08/01/2024, 8:20 PM--launchplans-only
option is not available in your version of pyflyte
. As an alternative, you can use the pyflyte register
command to register workflows and launch plans together, and then deactivate the default launch plans if needed.
Here is an example of how to register workflows and launch plans together and then deactivate the default launch plans:
1. Register workflows and launch plans:
FLYTE_SDK_LOGGING_LEVEL=20 pyflyte register --non-fast -p flytesnacks --domain development dev_register_lp.py --version t1
2. Deactivate the default launch plans:
pyflyte launchplan --deactivate -p flytesnacks -d development dev_python_dependency_deck.py_deps_deck_wf
This approach ensures that only the explicit launch plans you have defined remain active.
Would you like more detail?
Sources:
- Flyte GitHub Issue #5073 š 0 button š 0 button š” Hint: Mention @runllm in the thread for followups.aloof-magazine-14878
08/01/2024, 9:28 PMancient-wolf-19325
08/01/2024, 9:28 PMDict[str, int]
parameters not being correctly set in the launch plan is likely due to the way Flyte handles dictionary types. Flyte's type system can sometimes struggle with complex types like dictionaries, especially when they are nested or have specific type constraints.
To work around this, you can serialize the dictionary to a JSON string before passing it to the launch plan. Here is an example of how to do this:
1. Define the workflow and task:
from flytekit import task, workflow, LaunchPlan
import json
from typing import Dict
@task
def my_task(d: Dict[str, int]) -> int:
return sum(d.values())
@workflow
def my_workflow(d: str) -> int:
d_dict = json.loads(d)
return my_task(d=d_dict)
2. Create and register the launch plan:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
from flytekit import LaunchPlan
# Initialize FlyteRemote
remote = FlyteRemote(
config=Config.for_endpoint(endpoint="flyte.example.net"),
default_project="flytesnacks",
default_domain="development",
)
# Create the launch plan with the dictionary serialized as a JSON string
launch_plan = LaunchPlan.create(
name="my_launch_plan",
workflow=my_workflow,
default_inputs={"d": json.dumps({"key1": 1, "key2": 2})}
)
# Register the launch plan
remote.register_launch_plan(launch_plan)
This approach ensures that the dictionary is correctly passed as a JSON string and then deserialized within the workflow.
Would you like more detail?
Sources:
- Flyte GitHub Issue #5473
- Flyte GitHub Issue #4740 š 0 button š 0 button š” Hint: Mention @runllm in the thread for followups.