late-traffic-56555
09/07/2023, 2:58 PM
... task_config. For example, I have a TFJob task defined as:
@task(task_config=TFJob(num_workers=1, num_ps=1))
def run(...):
    ...
But I would like num_workers to be configurable, i.e. the workflow can have it passed as a param. Not sure how to propagate this to the task.
high-park-82026
You can use @dynamic to "generate" tasks at runtime with any custom configuration you may want (you can even generate functions, but that's a bit of an advanced topic 🙂).
@task(task_config=TFJob(num_workers=1, num_ps=1))
def run(...):
    ...

@dynamic
def my_task(num_workers: int) -> ...:
    run.task_config.num_workers = num_workers
    return run(...)

@workflow
def my_wf() -> ...:
    return my_task(...)
I believe something like this should do the trick...
late-traffic-56555
09/07/2023, 4:14 PM
@task(
    task_config=TfJob(
        num_workers=1,
        num_ps_replicas=1,
        num_chief_replicas=1,
    ),
    requests=resources,
    limits=resources,
)
def run_tfjob(config_path: str) -> str:
    return "hello world"

@dynamic
def run(num_workers: int, config_path: str) -> str:
    run_tfjob.task_config.num_workers = num_workers
    return run_tfjob(config_path=config_path)

@workflow
def ranker_cold_start_training(
    cold_start_config: str, num_workers: int, num_ps: int, num_dds_workers: int
) -> str:
    return run(num_workers=num_workers, config_path=cold_start_config)
high-park-82026
from flytekit import dynamic, task, workflow
from flytekitplugins.kftensorflow import TfJob

# `resources` is not defined in the thread; assumed to be a flytekit Resources object.
@task(
    task_config=TfJob(
        num_workers=1,
        num_ps_replicas=1,
        num_chief_replicas=1,
    ),
    requests=resources,
    limits=resources,
)
def run_tfjob(config_path: str) -> str:
    return "hello world"

@dynamic
def run(num_workers: int, config_path: str) -> str:
    # Override the config on this call instead of mutating run_tfjob.task_config.
    return run_tfjob(config_path=config_path).with_overrides(
        task_config=TfJob(
            num_workers=num_workers,
            num_ps_replicas=1,
            num_chief_replicas=1,
        )
    )

@workflow
def ranker_cold_start_training(
    cold_start_config: str, num_workers: int, num_ps: int, num_dds_workers: int
) -> str:
    return run(num_workers=num_workers, config_path=cold_start_config)
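
For anyone comparing the two snippets in this thread, here is a rough plain-Python sketch of the difference between mutating the config stored on the task object and passing a fresh config for a single call. It does not use flytekit and makes no claim about how flytekit works internally: JobConfig, Invocation and FakeTask are hypothetical stand-ins invented for illustration, and "cfg.yaml" is a made-up path.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class JobConfig:
    """Hypothetical stand-in for a task config such as TfJob."""
    num_workers: int = 1
    num_ps_replicas: int = 1
    num_chief_replicas: int = 1


@dataclass(frozen=True)
class Invocation:
    """Hypothetical record of one task call and the config it will run with."""
    config_path: str
    config: JobConfig

    def with_overrides(self, task_config: JobConfig) -> "Invocation":
        # Return a new invocation; the task's default config is left untouched.
        return replace(self, config=task_config)


class FakeTask:
    """Hypothetical stand-in for a decorated task (not a flytekit class)."""

    def __init__(self, config: JobConfig) -> None:
        self.config = config  # default config, fixed when the task is defined

    def __call__(self, config_path: str) -> Invocation:
        return Invocation(config_path=config_path, config=self.config)


run_tfjob = FakeTask(JobConfig())


def run(num_workers: int, config_path: str) -> Invocation:
    # Derive a per-call config from the default rather than mutating it.
    per_call = replace(run_tfjob.config, num_workers=num_workers)
    return run_tfjob(config_path).with_overrides(task_config=per_call)


job = run(num_workers=4, config_path="cfg.yaml")
```

In this sketch the override travels with the individual call, so run_tfjob.config still holds the defaults afterwards and two calls with different num_workers values do not interfere with each other.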