Chen Yuanyuan
01/24/2022, 2:57 AM
Ketan (kumare3)
Chen Yuanyuan
01/24/2022, 3:55 AM
Ketan (kumare3)
Chen Yuanyuan
01/24/2022, 4:04 AM
Ketan (kumare3)
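from flytekit import Resources, task

# `settings` is the application's own config object exposing the current domain.
RES = {
    "development": {
        "requests": Resources(...),
        "limits": Resources(...),
    },
    "production": {
        ...
    },
}

@task(requests=RES[settings.domain]["requests"], limits=RES[settings.domain]["limits"])
def foo():
    ...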
Yee
Chen Yuanyuan
01/25/2022, 2:24 AM
environment here.
Ketan (kumare3)
Pradithya Aria Pura
01/25/2022, 3:05 AM
import os

from flytekit import CronSchedule, LaunchPlan

# `train_and_deploy` is the workflow, defined elsewhere in this module.
if os.getenv("WORKFLOW_DOMAIN") == "production":
    # Create scheduled execution in production environment
    # schedule for model training and deployment
    train_and_deploy_wf_schedule = LaunchPlan.get_or_create(
        name=f"{__name__}.train_and_deploy_wf_schedule",
        # name of workflow to be triggered
        workflow=train_and_deploy,
        # fixed_inputs is used to define the workflow input when triggered by scheduler
        # in this case we want to pass "environment" = "production"
        fixed_inputs={
            "environment": "production"
        },
        schedule=CronSchedule(
            schedule="0 10 * * 1",
            # The execution time is passed to workflow as argument called "kickoff_time"
            kickoff_time_input_arg="kickoff_time",
        ),
    )
elif os.getenv("WORKFLOW_DOMAIN") == "staging":
    # Create scheduled execution in staging environment
    train_and_deploy_wf_schedule = LaunchPlan.get_or_create(
        name=f"{__name__}.train_and_deploy_wf_schedule",
        workflow=train_and_deploy,
        fixed_inputs={
            "environment": "staging"
        },
        schedule=CronSchedule(
            schedule="0 10 * * 1",
            # The execution time is passed to workflow as argument called "kickoff_time"
            kickoff_time_input_arg="kickoff_time",
        ),
    )
Within the workflow, we use the environment input to choose the correct configuration for the given domain/environment.
Ketan (kumare3)
Pradithya Aria Pura
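01/25/2022, 10:10 AM
import os
from typing import TypeVar

import pandas as pd
from flytekit import Resources, task
from flytekit.types.file import FlyteFile
from omegaconf import DictConfig  # assumption: DictConfig comes from omegaconf

# Resolved once, when the module is loaded at registration time.
domain = os.getenv("WORKFLOW_DOMAIN", "staging")
training_resource_requests = {
    "staging": Resources(cpu="2", mem="4Gi"),
    "production": Resources(cpu="4", mem="16Gi"),
}
training_resource_limits = {
    "staging": Resources(cpu="2", mem="4Gi"),
    "production": Resources(cpu="4", mem="16Gi"),
}

@task(requests=training_resource_requests[domain], limits=training_resource_limits[domain])
def train_model(train_df: pd.DataFrame, config: DictConfig) -> FlyteFile[TypeVar("joblib.dat")]:
    ...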
We set WORKFLOW_DOMAIN in CI during workflow registration, and you can use it to configure the workflow.
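One thing to watch with the lookup above: a domain that is missing from the dictionaries (for example "development") fails with a bare KeyError at registration time. A minimal sketch of a helper that reports this more clearly is below. `ResourceSpec`, `TRAINING_RESOURCES` and `resources_for_domain` are hypothetical names, and `ResourceSpec` is only a dependency-free stand-in for flytekit's `Resources`.

```python
import os
from typing import Dict, NamedTuple


class ResourceSpec(NamedTuple):
    # Hypothetical stand-in for flytekit.Resources, so the sketch runs without flytekit.
    cpu: str
    mem: str


# Values copied from the snippet above.
TRAINING_RESOURCES: Dict[str, ResourceSpec] = {
    "staging": ResourceSpec(cpu="2", mem="4Gi"),
    "production": ResourceSpec(cpu="4", mem="16Gi"),
}


def resources_for_domain(default: str = "staging") -> ResourceSpec:
    """Return the spec for the domain named in WORKFLOW_DOMAIN."""
    domain = os.getenv("WORKFLOW_DOMAIN", default)
    try:
        return TRAINING_RESOURCES[domain]
    except KeyError:
        known = ", ".join(sorted(TRAINING_RESOURCES))
        raise ValueError(
            f"Unknown WORKFLOW_DOMAIN {domain!r}; expected one of: {known}"
        ) from None
```

With real flytekit code, the same helper could return `Resources` objects and be called inside the `@task(...)` decorator arguments.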
2. Using a dynamic workflow (https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.dynamic.html#flytekit.dynamic)
This is more powerful but more complex. In this case you can use the config input variable to carry the resource request, so the request can be adjusted without resubmitting the workflow. Option 1 above configures the resource request only once, at workflow submission.
Ketan (kumare3)