acoustic-carpenter-78188
03/24/2023, 4:28 AM
list transformer and then recursively find a pickle transformer. Also, because the default pickle transport is a file, a list of even simple objects can result in hundreds of files, and since they are written asynchronously, this can cause a huge performance penalty.
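To make the file-count effect concrete, here is a minimal, self-contained sketch that mimics the two transports using only the standard library. It is not flytekit code: write_per_element and write_as_container are hypothetical stand-ins for "pickle each list element to its own file" and "pickle one container object to a single file", and a standard dataclass replaces attr.define so the example has no extra dependencies.

```python
import os
import pickle
import tempfile
from dataclasses import dataclass
from typing import List


@dataclass
class Foo:
    x: int


def write_per_element(objs: List[Foo], directory: str) -> List[str]:
    """Hypothetical stand-in for the slow path: one pickle file per list element."""
    paths = []
    for i, obj in enumerate(objs):
        path = os.path.join(directory, f"elem_{i}.pkl")
        with open(path, "wb") as f:
            pickle.dump(obj, f)
        paths.append(path)
    return paths


def write_as_container(objs: List[Foo], directory: str) -> List[str]:
    """Hypothetical stand-in for the fast path: the whole list in a single pickle file."""
    path = os.path.join(directory, "container.pkl")
    with open(path, "wb") as f:
        pickle.dump(objs, f)
    return [path]


if __name__ == "__main__":
    foos = [Foo(i) for i in range(2000)]
    with tempfile.TemporaryDirectory() as d1, tempfile.TemporaryDirectory() as d2:
        slow = write_per_element(foos, d1)
        fast = write_as_container(foos, d2)
        # prints "per-element: 2000 files, container: 1 file"
        print(f"per-element: {len(slow)} files, container: {len(fast)} file")
```

Every extra file means another open/write/close locally and, in a real deployment, another upload to blob storage, which is why the per-element path scales so badly with list length.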
Example
```python
import attr
import flytekit

# Workflow below is slow
@attr.define
class Foo:
    x: int

@flytekit.task
def t1_slow() -> list[Foo]:
    return [Foo(i) for i in range(2000)]

@flytekit.task
def t2_slow(foos: list[Foo]) -> list[int]:
    return [foo.x for foo in foos]

@flytekit.workflow
def wf_slow() -> list[int]:
    foos = t1_slow()
    return t2_slow(foos=foos)

# Workflow below is fast
@attr.define
class Container:
    foos: list[Foo]

@flytekit.task
def t1_fast() -> Container:
    return Container([Foo(i) for i in range(2000)])

@flytekit.task
def t2_fast(container: Container) -> list[int]:
    return [foo.x for foo in container.foos]

@flytekit.workflow
def wf_fast() -> list[int]:
    container = t1_fast()
    return t2_fast(container=container)
```
For context, wf_slow can be roughly 30x slower than wf_fast in some cases.
Goal: What should the final outcome look like, ideally?
In both cases, the performance should be as close as possible. For small pickled objects in general, could we use a binary transport instead of a file?
Describe alternatives you've considered
NA
Propose: Link/Inline OR Additional context
In the pickle transformer, at the location linked below, we could add the following snippet:
https://github.com/flyteorg/flytekit/blob/de2878903b594069f1629d7ee11bf59ab04b59f2/flytekit/types/pickle/pickle.py#L51
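```python
# when converting the literal back to a Python value
if lv.scalar.blob:
    ...  # continue with the current file-based logic
elif lv.scalar.binary:
    ...  # load the object from the inline bytes (bytearray)
```

and in to_literal:

```python
x = cloudpickle.dumps(python_val)
if len(x) < threshold:
    lv.scalar.binary = x  # small payload: store the pickled bytes inline
else:
    ...  # large payload: continue with the current file-based logic
```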
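A rough, self-contained sketch of the proposed size-based switch, under stated assumptions: SketchLiteral is a hypothetical stand-in for a Flyte literal holding either inline bytes or a blob path (it is not flytekit's model class), INLINE_THRESHOLD is an arbitrary illustrative value (the proposal leaves threshold open), and the standard-library pickle module stands in for cloudpickle so the example runs without extra dependencies.

```python
import os
import pickle
import tempfile
from dataclasses import dataclass
from typing import Any, Optional

# Hypothetical cutoff for illustration only; the real threshold is an open question.
INLINE_THRESHOLD = 10 * 1024  # bytes


@dataclass
class SketchLiteral:
    """Hypothetical stand-in for a literal holding either inline bytes or a blob path."""
    binary: Optional[bytes] = None
    blob_uri: Optional[str] = None


def to_literal(python_val: Any, directory: str) -> SketchLiteral:
    data = pickle.dumps(python_val)
    if len(data) < INLINE_THRESHOLD:
        # Small payload: keep the pickled bytes inline, no file is written.
        return SketchLiteral(binary=data)
    # Large payload: fall back to the file-based transport.
    fd, path = tempfile.mkstemp(suffix=".pkl", dir=directory)
    with os.fdopen(fd, "wb") as f:
        f.write(data)
    return SketchLiteral(blob_uri=path)


def to_python_value(lv: SketchLiteral) -> Any:
    if lv.blob_uri is not None:
        # Existing behaviour: read the pickle back from the file.
        with open(lv.blob_uri, "rb") as f:
            return pickle.load(f)
    if lv.binary is not None:
        # Proposed behaviour: unpickle directly from the inline bytes.
        return pickle.loads(lv.binary)
    raise ValueError("literal has neither a blob nor a binary value")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        small = to_literal({"x": 1}, d)
        large = to_literal(list(range(100_000)), d)
        print(small.blob_uri is None, large.binary is None)  # True True
        assert to_python_value(small) == {"x": 1}
        assert to_python_value(large) == list(range(100_000))
```

The read path checks for a blob first and only then for inline bytes, mirroring the order in the snippet above, so existing file-based literals keep working unchanged.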
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte