#ask-the-community

Alykhan Tejani

09/07/2023, 2:58 PM
Hi, what's the best way to pass a configurable value to a `@task`'s `task_config`? For example, I have a `TFJob` task defined as:
@task(task_config=TFJob(num_workers=1, num_ps=1))
def run(...):
  ...
But I would like `num_workers` to be configurable, i.e. the workflow can have it passed as a param. Not sure how to propagate this to the task.

Haytham Abuelfutuh

09/07/2023, 4:01 PM
That's an awesome question, @Alykhan Tejani... You can use `@dynamic` to "generate" tasks at runtime with any custom configurations you may want (you can even generate functions, but that's a bit of an advanced topic 🙂)
@task(task_config=TFJob(num_workers=1, num_ps=1))
def run(...):
  ...

@dynamic
def my_task(num_workers: int) -> ...:
  run.task_config.num_workers = num_workers

  return run(...)

@workflow
def my_wf() -> ...:
  return my_task(...)
I believe something like this should do the trick...

Alykhan Tejani

09/07/2023, 4:02 PM
ooo this is cool
let me give it a try
hmm it didn't seem to work
here is the code
@task(
    task_config=TfJob(
        num_workers=1,
        num_ps_replicas=1,
        num_chief_replicas=1,
    ),
    requests=resources,
    limits=resources,
)
def run_tfjob(config_path: str) -> str:
    
    return "hello world"


@dynamic
def run(num_workers: int, config_path: str) -> str:
    run_tfjob.task_config.num_workers = num_workers
    return run_tfjob(config_path=config_path)


@workflow
def ranker_cold_start_training(
    cold_start_config: str, num_workers: int, num_ps: int, num_dds_workers: int
) -> str:
    return run(num_workers=num_workers, config_path=cold_start_config)
I am running flytekit 1.27 if that makes a difference
seems to still run with just 1 worker
oh actually I take that back
I passed the wrong param in
it works 🎉 thanks!

Haytham Abuelfutuh

09/07/2023, 5:18 PM
Great to hear! Thanks to Yee, there is a slightly cleaner version of this now available. It should be equivalent though...
@task(
    task_config=TfJob(
        num_workers=1,
        num_ps_replicas=1,
        num_chief_replicas=1,
    ),
    requests=resources,
    limits=resources,
)
def run_tfjob(config_path: str) -> str:
    
    return "hello world"


@dynamic
def run(num_workers: int, config_path: str) -> str:
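    # Chain with_overrides on the same expression; a leading-dot
    # continuation line after `return` is a syntax error in Python.
    return run_tfjob(config_path=config_path).with_overrides(
        task_config=TfJob(
            num_workers=num_workers,
            num_ps_replicas=1,
            num_chief_replicas=1,
        )
    )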


@workflow
def ranker_cold_start_training(
    cold_start_config: str, num_workers: int, num_ps: int, num_dds_workers: int
) -> str:
    return run(num_workers=num_workers, config_path=cold_start_config)
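
For anyone comparing the two versions in this thread: the first one changes the shared config object in place, while the second builds a fresh config for a single call. Below is a plain-Python sketch of that difference. It does not use flytekit at all; `JobConfig`, `DEFAULT_CONFIG`, `configure_by_mutation`, and `configure_by_override` are hypothetical names made up for illustration, and the sketch only shows ordinary Python object semantics, not how flytekit treats either pattern internally.

```python
from dataclasses import dataclass, replace


@dataclass
class JobConfig:
    """Hypothetical stand-in for a task config; not flytekit's TfJob class."""
    num_workers: int = 1
    num_ps_replicas: int = 1
    num_chief_replicas: int = 1


# Module-level default, analogous to the config object handed to a decorator.
DEFAULT_CONFIG = JobConfig()


def configure_by_mutation(num_workers: int) -> JobConfig:
    """First pattern: change the shared config object in place."""
    DEFAULT_CONFIG.num_workers = num_workers
    return DEFAULT_CONFIG


def configure_by_override(num_workers: int) -> JobConfig:
    """Second pattern: build a new config for this call, leaving the default alone."""
    return replace(DEFAULT_CONFIG, num_workers=num_workers)


# Override first: the shared default is left untouched.
overridden = configure_by_override(4)
default_after_override = DEFAULT_CONFIG.num_workers

# Mutation: the shared default itself now carries the new value.
mutated = configure_by_mutation(8)
default_after_mutation = DEFAULT_CONFIG.num_workers
```

In this sketch the override leaves the shared default as it was, whereas the mutation is visible to every later reader of `DEFAULT_CONFIG`. That is one reason the per-call form can be easier to reason about when the same task is invoked more than once with different settings.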