# announcements
Nimrod Rak:
Hi everyone! I want to use Flyte for my use case, which is a pipeline of tasks that run on GPU and pass torch tensors that live on GPU. Can Flyte pass GPU tensors between tasks in a workflow? Can this be done without copies between the tasks? How can I define the concurrency of tasks? I saw that this is possible, but found no documentation on how to do it. Thank you so much in advance!
Shivay Lamba:
Hi @Nimrod Rak, thanks for your questions. To answer the question regarding GPU tensors in tasks, you can watch this video:

https://www.youtube.com/watch?v=M0KkYr-JJ-I

for reference on GPU-accelerated torch models on Flyte. For concurrency, you can refer to this document: https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.map_task.html
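For example, the map pattern looks roughly like this (a minimal sketch; the task and input names are illustrative):

```python
import typing

from flytekit import map_task, task, workflow

@task
def square(x: int) -> int:
    # Each mapped instance runs as its own task, so the items are processed concurrently.
    return x * x

@workflow
def parallel_wf(xs: typing.List[int]) -> typing.List[int]:
    # map_task fans the task out over the list of inputs.
    return map_task(square)(x=xs)
```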
Nimrod Rak:
Thank you for the quick response @Shivay Lamba! I didn’t mean using a map pattern, but rather having tasks in a workflow run on more than one process concurrently. Regarding GPU acceleration, I don’t have a problem training/running the model on GPU, but rather having the workflow itself work on GPU.
Shivay Lamba:
Regarding the first part, about running tasks in a workflow on more than one process concurrently, @Samhita Alla or @Kevin Su can probably answer this. Regarding the GPU acceleration and running the workflows on GPU, you can configure your Kubernetes cluster (where you have deployed Flyte) to support GPU nodes.
Kevin Su:
yes, you can specify gpu in your task, like:

```python
from flytekit import Resources, task

@task(
    requests=Resources(
        # example values; size these for your own workload
        gpu="1", mem="10Gi", storage="10Gi", ephemeral_storage="10Gi"
    ),
)
def pytorch_mnist_task():
    ...
```
We have a torch.Tensor type transformer in flytekit, which means you can directly return a torch.Tensor from your task. Flyte will save this data to S3 for you, and the downstream task can fetch it from the bucket.
```python
import torch
from flytekit import task, workflow

@task
def t1() -> torch.Tensor:
    return torch.tensor([[1.0, -1.0, 2], [1.0, -1.0, 9], [0, 7.0, 3]])

@task
def t2(df: torch.Tensor):
    ...

@workflow
def wf():
    df = t1()
    t2(df=df)
```
Nimrod Rak:
Very cool, @Kevin Su! And this doesn’t involve copies between tasks, correct?
Kevin Su:
Yes, exactly. You don’t need to handle that yourself; Flyte will copy the data for you.
Nimrod Rak:
will it copy the data though? I don’t want it to…
Kevin Su:
For now, Flyte will copy all of a task’s outputs to S3 for caching.
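For reference, a minimal sketch of how result caching is enabled on a task (the task name is illustrative; outputs are offloaded to blob storage either way):

```python
from flytekit import task

# cache=True memoizes the task's outputs in the remote store;
# bump cache_version to invalidate previously cached results.
@task(cache=True, cache_version="1.0")
def expensive_step(x: int) -> int:
    return x * x
```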
Samhita Alla:
@Nimrod Rak, yes, you can pass GPU tensors between tasks and there’s also automatic GPU to CPU conversion with Flyte. Refer to https://docs.flyte.org/projects/cookbook/en/latest/auto/core/type_system/pytorch_types.html#auto-gpu-to-cpu-cpu-to-gpu-conversion. May I know why you do not want the data to be copied? The parallelization of tasks is automatically determined by Flyte, depending on the data dependencies amongst the tasks. You needn’t worry about the concurrency.
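As a rough sketch of what that looks like end to end (task names and resource values are illustrative; assumes the pods are scheduled on GPU nodes):

```python
import torch
from flytekit import Resources, task, workflow

@task(requests=Resources(gpu="1", mem="4Gi"))
def produce() -> torch.Tensor:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # The tensor can live on the GPU here; flytekit's torch.Tensor transformer
    # handles the device conversion when the output is serialized.
    return torch.zeros(3, 3).to(device)

@task(requests=Resources(gpu="1", mem="4Gi"))
def consume(t: torch.Tensor) -> torch.Tensor:
    # Moving to the desired device explicitly is still safe, even though
    # the GPU/CPU conversion is handled for you.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    return t.to(device) * 2

@workflow
def gpu_wf() -> torch.Tensor:
    return consume(t=produce())
```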
Nimrod Rak:
Great to hear about the parallelization, so there is a dispatcher that determines what receives what resources etc? I don’t want copies because all I need is a pipeline to connect units that already run on GPU and the copies are just overhead.
Samhita Alla:
> Great to hear about the parallelization, so there is a dispatcher that determines what receives what resources etc?

Yes! Refer to https://docs.flyte.org/en/latest/deployment/cluster_config/performance.html#worst-case-workflows-poison-pills-max-parallelism for an in-depth view of the workers that run the Flyte code.
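If you ever want to cap how many nodes run at once, max_parallelism can be set on a launch plan; a minimal sketch (names and the limit are illustrative, and assume a flytekit version that exposes max_parallelism on LaunchPlan.get_or_create):

```python
from flytekit import LaunchPlan, task, workflow

@task
def step(x: int) -> int:
    return x + 1

@workflow
def big_wf(x: int = 0) -> int:
    return step(x=step(x=x))

# Caps how many task nodes of a single execution run at the same time.
capped_lp = LaunchPlan.get_or_create(
    workflow=big_wf,
    name="big_wf_capped",
    max_parallelism=10,
)
```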
> I don’t want copies because all I need is a pipeline to connect units that already run on GPU and the copies are just overhead.

Copying is inevitable, because the data is uploaded and downloaded whenever tasks communicate. Without that copy, you wouldn’t be able to access the tensor in the other task where it’s an input.
Nimrod Rak:
I see… I’ll check if it’s relevant for me. Thank you very much!
Kevin Su:
@Nimrod Rak I see your original question was not answered. By default, tensors are passed as remote objects: Flyte checkpoints every step and hence will copy the tensors to a remote location like S3, and the downstream step will download them 😢 A way to avoid copying but still get a better load time would be to use intra-task checkpointing. It’s not a workflow, though, so intermediates are not persisted; checkpoints are, and in case of failure the checkpoints are restored.
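A minimal sketch of what intra-task checkpointing looks like (the task body is illustrative):

```python
from flytekit import current_context, task

@task(retries=3)
def train(epochs: int) -> int:
    cp = current_context().checkpoint
    start = 0
    # On a retry, resume from the previously written checkpoint instead of starting over.
    prev = cp.read()
    if prev:
        start = int(prev.decode())
    for epoch in range(start, epochs):
        # ... one epoch of GPU work here ...
        cp.write(str(epoch + 1).encode())
    return epochs
```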
@Nimrod Rak there are some folks trying to integrate Flyte with an in-memory cache and co-scheduler, cc @Tao He