Victor Gustavo da Silva Oliveira
05/02/2023, 2:38 PM

Nicholas Roberson
05/02/2023, 2:40 PM
@task(task_config=...) ?

Dan Rammer (hamersaw)
05/02/2023, 2:41 PM

Victor Gustavo da Silva Oliveira
05/02/2023, 2:45 PM

Dan Rammer (hamersaw)
05/02/2023, 2:50 PM

Victor Gustavo da Silva Oliveira
05/02/2023, 2:54 PM

Ketan (kumare3)
05/02/2023, 2:56 PM

Dan Rammer (hamersaw)
05/02/2023, 2:57 PM

Ketan (kumare3)
05/02/2023, 2:57 PM

Victor Gustavo da Silva Oliveira
05/02/2023, 2:57 PM

Dan Rammer (hamersaw)
05/02/2023, 3:02 PM

Victor Gustavo da Silva Oliveira
05/02/2023, 3:04 PM
@workflow
def image_workflow(inputs...) -> None:
    skus = get_sku_list(...)
    models = load_model()
    prepared = prepare_map_inputs(...)
    result = map_task(
        process_images,
        metadata=TaskMetadata(retries=1),
        concurrency=2,
    )(input=prepared)

@task(requests=Resources(mem='2Gi'))
def prepare_map_inputs(
    skus: List[str],
    ...
) -> List[MapInput]:
    ...

@task(requests=Resources(cpu='2', mem='6Gi'))
def process_images(
    input: MapInput
) -> bool:
    ...
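As an aside, the semantics of the map_task call above (fan process_images out over prepared, running at most two subtasks at a time) can be modeled locally with a plain thread pool. local_map_task below is a hypothetical stand-in for illustration, not a flytekit API:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")

def local_map_task(fn: Callable[[T], R], inputs: List[T], concurrency: int) -> List[R]:
    # Apply fn to every element, running at most `concurrency` at a time,
    # and return results in input order, as Flyte's map task does.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        return list(pool.map(fn, inputs))
```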
This is a snippet, we had to omit some info...

Dan Rammer (hamersaw)
05/02/2023, 3:09 PM
The task_resources configuration option (which sets the default resource request), or any matchable attributes, for example at the task level or workflow level, etc.

Victor Gustavo da Silva Oliveira
05/02/2023, 4:06 PM

Dan Rammer (hamersaw)
05/02/2023, 4:13 PM
@task ?

Victor Gustavo da Silva Oliveira
05/02/2023, 4:22 PM

Kevin Su
05/02/2023, 6:49 PM
class GcloudFunctionTask(PythonFunctionTask[gcloud]):
    def __init__(self, task_config: gcloud, task_function: Callable, **kwargs):
        super(GcloudFunctionTask, self).__init__(
            task_config=task_config,
            task_type="gcloud",
            task_function=task_function,
            **kwargs,  # resource limits are passed through here
        )
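Subclassing PythonFunctionTask this way mostly forwards arguments to the base initializer. A dependency-free sketch with a stub base class (PythonFunctionTaskStub is invented here to stand in for flytekit's class) shows the wiring, including why the super call must resolve through the subclass itself rather than an unrelated class copied from another plugin:

```python
from typing import Any, Callable

class PythonFunctionTaskStub:
    """Stand-in for flytekit's PythonFunctionTask, just to show the plumbing."""

    def __init__(self, task_config: Any, task_type: str, task_function: Callable, **kwargs):
        self.task_config = task_config
        self.task_type = task_type
        self.task_function = task_function
        self.kwargs = kwargs  # resource requests/limits travel through here

class GcloudFunctionTask(PythonFunctionTaskStub):
    def __init__(self, task_config: Any, task_function: Callable, **kwargs):
        # A bare super() (equivalent to super(GcloudFunctionTask, self)) is
        # required here; naming an unrelated class would raise a TypeError
        # because self is not an instance of it.
        super().__init__(
            task_config=task_config,
            task_type="gcloud",
            task_function=task_function,
            **kwargs,
        )

t = GcloudFunctionTask(task_config=None, task_function=lambda: None, limits={"mem": "1Gi"})
```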
Victor Gustavo da Silva Oliveira
05/02/2023, 6:55 PM
def extended_task(
    function: Callable = None, *, integrations: List[str] = None, **task_kwargs
) -> Callable:
    # Called with keyword arguments only: wait for the function via a partial.
    if function is None:
        return functools.partial(
            extended_task, integrations=integrations, **task_kwargs
        )
    if integrations is not None and not isinstance(integrations, list):
        raise ValueError("'integrations' parameter must be a list.")
    task_integrations = TaskIntegrations(integrations)
    if task_integrations.has_requests:
        task_kwargs["task_config"] = generate_task_config(task_integrations)

    @task(**task_kwargs)
    @functools.wraps(function)
    def wrapper(*args, **kwargs) -> Any:
        ...

    return wrapper
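Stripped of the Flyte-specific pieces (the @task application, TaskIntegrations, generate_task_config), the call-twice decorator pattern underneath is runnable on its own. This is only a sketch: the integrations attribute stands in for the real task_config handling, and the wrapped function is called directly instead of being registered as a Flyte task.

```python
import functools
from typing import Any, Callable, List, Optional

def extended_task(
    function: Optional[Callable] = None,
    *,
    integrations: Optional[List[str]] = None,
    **task_kwargs,
) -> Callable:
    # Called as @extended_task(integrations=[...]): no function yet, so return
    # a partial that will receive the function on the second call.
    if function is None:
        return functools.partial(extended_task, integrations=integrations, **task_kwargs)
    if integrations is not None and not isinstance(integrations, list):
        raise ValueError("'integrations' parameter must be a list.")

    @functools.wraps(function)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return function(*args, **kwargs)

    # Stand-in for the real task_config plumbing.
    wrapper.integrations = integrations or []
    return wrapper

@extended_task
def plain() -> str:
    return "plain"

@extended_task(integrations=["bigquery"])
def with_args() -> str:
    return "with args"
```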
It's like this

Dan Rammer (hamersaw)
05/02/2023, 6:56 PM

Victor Gustavo da Silva Oliveira
05/02/2023, 7:01 PM

Dan Rammer (hamersaw)
05/02/2023, 7:04 PM
05/02/2023, 7:04 PM...n3-0-0
and n3-0-1
are the maptask Pods right? skus
is n0
, models
is n1
, and prepared
is n2
from your example.Victor Gustavo da Silva Oliveira
05/02/2023, 7:24 PMNicholas LoFaso
05/03/2023, 3:56 PMKetan (kumare3)
05/03/2023, 3:58 PMNicholas LoFaso
05/03/2023, 3:59 PMKetan (kumare3)
05/03/2023, 9:03 PMKevin Su
05/03/2023, 9:16 PMfrom typing import List, Optional
from flytekit import map_task, task, workflow, Resources, TaskMetadata
@task(requests=Resources(cpu="1", mem="800Mi"))
def my_task(should_succeed: bool) -> Optional[bool]:
    if should_succeed:
        return should_succeed
    raise ValueError("This is a failure")

@task(requests=Resources(cpu="1", mem="800Mi"))
def get_num_successes_task(my_task_results: List[Optional[bool]]) -> int:
    # Failed subtasks show up as None when min_success_ratio < 1.0,
    # so count only the entries that actually produced a value.
    return sum(1 for r in my_task_results if r is not None)

@workflow
def wf(inputs: List[bool] = [False, True, False]) -> int:
    my_task_results = map_task(
        my_task,
        metadata=TaskMetadata(retries=1),
        min_success_ratio=0.5,
        concurrency=2,
    )(should_succeed=inputs)
    num_successes = get_num_successes_task(my_task_results=my_task_results)
    return num_successes
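To make the min_success_ratio mechanics concrete, here is a rough local model. map_task_succeeds is a hypothetical helper for illustration, not a flytekit API; the key point it encodes is that failed subtasks come back as None in the results list rather than failing the whole map task:

```python
from typing import List, Optional

def map_task_succeeds(results: List[Optional[bool]], min_success_ratio: float) -> bool:
    # When min_success_ratio < 1.0, each subtask output becomes Optional:
    # a failed subtask surfaces as None instead of aborting the map task.
    # The map task as a whole succeeds if the fraction of non-None results
    # meets the requested ratio.
    successes = sum(1 for r in results if r is not None)
    return successes / len(results) >= min_success_ratio
```

Note that with the workflow's default inputs [False, True, False], it looks like only one of three subtasks would return a value even after retries, so a 0.5 ratio would presumably not be met.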
Victor Gustavo da Silva Oliveira
05/04/2023, 5:11 PM

Dan Rammer (hamersaw)
05/11/2023, 1:27 PM

Victor Gustavo da Silva Oliveira
05/11/2023, 1:54 PM

Dan Rammer (hamersaw)
05/11/2023, 1:55 PM