# announcements
c
Hi team, I am a DS from Gojek, currently working on using Flyte for a model pipeline. One question I came across: in the current implementation, the resources requested (cpu, memory) are hard-coded in the decorator of each task, so they are the same across environments. What would be a better way to override them for each task in the workflow for different environments? Would it be possible to define them in a config file instead? cc @Pradithya Aria Pura @Michael Cheng Jan Kao
k
Aah, interesting question: you basically want to change it based on domain. May I ask a few clarifying questions? To understand: do you want to specify different resources for the same task across different domains? Or do you want a common resource setting for all tasks and change it per domain?
Also, what would be the ideal UX?
Like the user experience in the code, and how would you want to manage that?
Cc @Yee / @Eduardo Apolinario (eapolinario)
c
It is more of the former, specifying different resource requirements for the same task across different domains.
For my implementation, each domain (e.g. staging or production) has its own config file containing different settings for the model (e.g. model hyperparameters). It would be good if the resource specification could be implemented the same way.
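A minimal sketch of the config-file approach, assuming the per-domain resource values are kept in a config with one section per domain (the layout, key names, and the `load_resources` helper are hypothetical, not a Flyte feature). JSON from the standard library stands in for whatever format the existing config files use, and the values mirror the ones used later in this thread.

```python
import json
from typing import Dict, Tuple

# Hypothetical config content; in practice this would be read from the domain's config file
EXAMPLE_CONFIG = """
{
  "staging":    {"requests": {"cpu": "2", "mem": "4Gi"},  "limits": {"cpu": "2", "mem": "4Gi"}},
  "production": {"requests": {"cpu": "4", "mem": "16Gi"}, "limits": {"cpu": "4", "mem": "16Gi"}}
}
"""


def load_resources(config_text: str, domain: str) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return (requests, limits) keyword arguments for the given domain."""
    config = json.loads(config_text)
    if domain not in config:
        # Fail loudly at import/registration time instead of silently using wrong resources
        raise KeyError(f"no resource config for domain {domain!r}; known: {sorted(config)}")
    section = config[domain]
    return dict(section["requests"]), dict(section["limits"])


requests, limits = load_resources(EXAMPLE_CONFIG, "production")
print(requests)  # {'cpu': '4', 'mem': '16Gi'}
```

With flytekit, these dicts would feed the decorator as `@task(requests=Resources(**requests), limits=Resources(**limits))`. Since the file is read when the module is imported, the values are fixed when the workflow is registered.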
k
Yeah, since registration for each domain happens separately, and there is a variable exposed at registration time, it is possible to use different values per domain.
But would you prefer a config file, or would plain Python variables work?
Will share an example later
Sorry not near a computer
c
By domain, do you mean the project domain?
k
Ya
I thought that's what you wanted?
@Chen Yuanyuan ?
So would something like this be preferable?
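```python
import os

from flytekit import Resources, task

# Placeholder for `settings.domain`: the domain being registered to,
# e.g. read from an env var set in CI (hypothetical default shown)
domain = os.getenv("WORKFLOW_DOMAIN", "development")

RES = {
    "development": {
        "requests": Resources(...),
        "limits": Resources(...),
    },
    "production": {
        "requests": Resources(...),
        "limits": Resources(...),
    },
}

@task(requests=RES[domain]["requests"], limits=RES[domain]["limits"])
def foo():
    ...
```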
wdyt? ^ cc @Eduardo Apolinario (eapolinario) / @Yee / @Kevin Su
y
That will work.
The default (if the task doesn't specify anything) is already configurable via the matchable resources endpoint,
iirc.
c
Sorry for the late reply @Ketan (kumare3), I had misunderstood what you meant by domain. Let me rephrase: the requirement is to specify different resource requirements for the same task within the same project domain and the same workflow, but in different launch plans for that workflow. For example, there are 2 launch plans for the workflow, one for the staging environment and another for production. I would like to change the resource request based on the `environment` parameter here.
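Why a workflow input such as `environment` cannot change decorator-level resources by itself: arguments to a decorator like `@task(requests=...)` are evaluated once, when the module is imported (for Flyte, at registration), not when the workflow runs. The toy decorator below (`fake_task` is a hypothetical stand-in, not flytekit) captures its argument at definition time to illustrate this plain-Python behaviour.

```python
from typing import Callable, Dict

CURRENT_CPU = "2"  # value in effect when the module is imported


def fake_task(cpu: str) -> Callable:
    """Stand-in for a decorator such as @task(requests=...): captures `cpu` at definition time."""
    def wrap(fn: Callable) -> Callable:
        fn.requested_cpu = cpu  # frozen here, once
        return fn
    return wrap


@fake_task(cpu=CURRENT_CPU)
def train(environment: str) -> Dict[str, str]:
    # The body runs later and can see `environment`,
    # but it cannot change what the decorator already captured.
    return {"environment": environment, "cpu": train.requested_cpu}


CURRENT_CPU = "4"  # changing the variable after definition has no effect
print(train("production"))  # {'environment': 'production', 'cpu': '2'}
```

This is the same reason the env-var approach discussed later in the thread sets resources only once, at registration.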
k
Why do you need to create 2 launch plans? The same launch plan can be created in both staging and production. I am assuming that environment here refers to Flyte domains.
Cc @Pradithya Aria Pura ?
Would you guys have a few minutes to chat over VC in 2 hours?
p
It's essentially one launch plan, but with a different fixed input per domain, i.e.:
```python
import os

from flytekit import CronSchedule, LaunchPlan

# train_and_deploy is the @workflow defined elsewhere in this module

if os.getenv("WORKFLOW_DOMAIN") == "production":
    # Create scheduled execution in production environment

    # schedule for model training and deployment
    train_and_deploy_wf_schedule = LaunchPlan.get_or_create(
        name=f"{__name__}.train_and_deploy_wf_schedule",
        # name of workflow to be triggered
        workflow=train_and_deploy,
        # fixed_inputs is used to define the workflow input when triggered by scheduler
        # in this case we want to pass "environment" = "production"
        fixed_inputs={
            "environment": "production"
        },
        schedule=CronSchedule(
            schedule="0 10 * * 1",
            # The execution time is passed to workflow as argument called "kickoff_time"
            kickoff_time_input_arg="kickoff_time",
        ),
    )
elif os.getenv("WORKFLOW_DOMAIN") == "staging":
    # Create scheduled execution in staging environment

    train_and_deploy_wf_schedule = LaunchPlan.get_or_create(
        name=f"{__name__}.train_and_deploy_wf_schedule",
        workflow=train_and_deploy,
        fixed_inputs={
            "environment": "staging"
        },
        schedule=CronSchedule(
            schedule="0 10 * * 1",
            # The execution time is passed to workflow as argument called "kickoff_time"
            kickoff_time_input_arg="kickoff_time",
        ),
    )
```
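The two branches above differ only in the `environment` fixed input, so one way to avoid duplicating them is to build the launch plan settings from the domain. This is a refactoring sketch: `launch_plan_kwargs` and `SUPPORTED_DOMAINS` are hypothetical names, and the helper only assembles plain values, leaving the `LaunchPlan.get_or_create` and `CronSchedule` calls exactly as in the original.

```python
import os
from typing import Any, Dict, Optional

SUPPORTED_DOMAINS = ("staging", "production")


def launch_plan_kwargs(domain: Optional[str], module_name: str) -> Optional[Dict[str, Any]]:
    """Assemble the per-domain launch plan settings; None if the domain gets no schedule."""
    if domain not in SUPPORTED_DOMAINS:
        return None
    return {
        "name": f"{module_name}.train_and_deploy_wf_schedule",
        # passed to the workflow as "environment" when triggered by the scheduler
        "fixed_inputs": {"environment": domain},
        "cron": "0 10 * * 1",  # same weekly schedule in both domains
        "kickoff_time_input_arg": "kickoff_time",
    }


kwargs = launch_plan_kwargs(os.getenv("WORKFLOW_DOMAIN"), __name__)
```

When `kwargs` is not None, it would be used as `LaunchPlan.get_or_create(name=kwargs["name"], workflow=train_and_deploy, fixed_inputs=kwargs["fixed_inputs"], schedule=CronSchedule(schedule=kwargs["cron"], kickoff_time_input_arg=kwargs["kickoff_time_input_arg"]))`.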
Within the workflow, we use `environment` to choose the correct configuration for the given domain/environment.
I am available for chat, just ping me when you are available
k
Ok cool, something like that should be simple
Let's talk
p
@Chen Yuanyuan I had a chat with @Ketan (kumare3) regarding the resource requests. There are 2 ways to do it.

1. Using an env var to select the correct resource requests for the given task, for example:
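```python
import os
from typing import TypeVar

import pandas as pd
from flytekit import Resources, task
from flytekit.types.file import FlyteFile
from omegaconf import DictConfig

# WORKFLOW_DOMAIN is set in CI during registration; defaults to staging
domain = os.getenv("WORKFLOW_DOMAIN", "staging")
training_resource_requests = {
    "staging": Resources(cpu="2", mem="4Gi"),
    "production": Resources(cpu="4", mem="16Gi"),
}
training_resource_limits = {
    "staging": Resources(cpu="2", mem="4Gi"),
    "production": Resources(cpu="4", mem="16Gi"),
}
@task(requests=training_resource_requests[domain], limits=training_resource_limits[domain])
def train_model(train_df: pd.DataFrame, config: DictConfig) -> FlyteFile[TypeVar("joblib.dat")]:
    ...  # training logic goes here
```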
We have `WORKFLOW_DOMAIN` set in CI during workflow registration, which you can use to configure the workflow.

2. Using a dynamic workflow (https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.dynamic.html#flytekit.dynamic). This is more powerful but more complex. In this case you can use the `config` input variable to store the resource requests, so they can be adjusted without having to resubmit the workflow. Option 1 above configures the resource requests only once, when the workflow is submitted (registered).
Any example for the 2nd option @Ketan (kumare3)?
k
Cc @Yee / @Eduardo Apolinario (eapolinario) / @Samhita Alla