# ask-the-community
p
Hi all, just getting my hands dirty and trying to wrap my head around FlyteFile/FlyteDirectory. From the ShellTask example I can see the output paths being generated dynamically based on current_context() etc. My question is: what is the best practice for referencing pre-existing data in the object store (MinIO in the local sandbox in this case) in task inputs? From the data management docs I see "primitive data types and references to large objects fall under Metadata - Meta input or Meta output, and the actual large object is known as Raw data", but I'm not entirely sure how to apply that. Any help is greatly appreciated!
k
This is actually a great question
Please help improve the docs - but I will try to write an answer when I'm at the keyboard
p
Always happy to give back! My workflows are almost exclusively file-based so once I've got my bearings I could put together an example/tutorial.
k
Yes, so Flyte connects tasks using data flow, and the data is persisted at every step. But if all data were treated equally, the engine would have to load everything to pass data between tasks. That has two problems:
1. Security - the engine would have access to all data, which may not be desirable for an engine that processes workloads from different teams.
2. Throughput - the engine would become a bottleneck.
Another limitation is that caching and data reuse would need full copies. To make data storage efficient, secure, and fast, Flyte uses a concept similar to programming languages: stack-based parameters (pass by value) and heap parameters (pass by reference / pointers). So if you pass integers and other primitives, they are simply passed from task to task, limited to about 10MB. But if you pass files, file-like objects, dataframes, or directories, the data is automatically persisted to s3 the first time, and from then on a reference to that location is passed. You do need to ensure the data is not deleted out from under you. FlyteFile, FlyteDirectory, and StructuredDataset provide the abstraction for this persistence.
All these objects simply carry a reference to the remote file and, if needed, are either downloaded on demand or streamed.
Downloading needs space on the local filesystem (in the container); streaming can be faster, but your program has to work on the stream of bytes - though we make streaming look as if you are opening a file.
Streaming is new - it's only in 1.5: https://github.com/flyteorg/flytekit/pull/1512
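To make the download-vs-stream distinction concrete, here is a minimal sketch (the task name and chunk size are made up, and the open() streaming call assumes flytekit >= 1.5):

from flytekit import task
from flytekit.types.file import FlyteFile

@task
def byte_count(f: FlyteFile) -> int:
    # Option 1: download the whole object onto the container's local filesystem.
    local_path = f.download()
    with open(local_path, "rb") as fh:
        n_downloaded = len(fh.read())

    # Option 2 (flytekit >= 1.5): stream the object via fsspec instead of
    # copying it to local disk first - it still looks like opening a file.
    n_streamed = 0
    with f.open("rb") as fh:
        while True:
            chunk = fh.read(1024 * 1024)
            if not chunk:
                break
            n_streamed += len(chunk)

    return n_downloaded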
Cc @Samhita Alla
p
Thanks @Ketan (kumare3)! That separation makes sense. I'm just doing some trial and error now to understand these different paths, but I'm thinking that FlyteFile might accept a URI pointing at an object in MinIO storage as its constructor argument...
k
yup
it does
if you look, it's a string URI - it can be anything
what flytekit does is use a thing called a protocol - like
http://, s3://, gs://, file:///
etc
and with the 1.5 release, all protocols that fsspec supports are automatically supported
and Flyte will transparently handle that data
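As a quick sketch of what that looks like in practice (the bucket, key, and line_count task here are hypothetical; the s3:// URI could just as well be https://, gs://, or file://):

from flytekit import task, workflow
from flytekit.types.file import FlyteFile

@task
def line_count(f: FlyteFile) -> int:
    # flytekit resolves the URI through fsspec and fetches it when accessed
    with open(f.download()) as fh:
        return sum(1 for _ in fh)

@workflow
def count_wf() -> int:
    # any fsspec-understood protocol works here
    return line_count(f=FlyteFile("s3://my-s3-bucket/data/inputs/my_test.txt"))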
p
Yes, I gave that a few reads - that example fetches over https, whereas I'm trying to figure out how to pass it a path to the demo sandbox MinIO storage, but it sounds like it "just works"!
k
😄
ya if you use pyflyte run with a local file, it will automatically create a secure upload link, upload the file and run that
if you use the UI, you can actually upload a file
p
Just to wrap this up for anyone curious in the future: I was able to upload a file ahead of time to the demo sandbox MinIO storage via the UI and reference it in a ShellTask like so:
from flytekit import kwtypes, task, workflow
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

t1 = ShellTask(
    name="task_1",
    debug=True,
    script="""
    set -ex
    echo "Hey there! Let's run some bash scripts using Flyte's ShellTask."
    head {inputs.x}
    """,
    inputs=kwtypes(x=FlyteFile),
    output_locs=[OutputLocation(var="i", var_type=FlyteFile, location="{inputs.x}")],
)

@workflow
def shell_wf() -> FlyteFile:
    x = FlyteFile("<s3://my-s3-bucket/data/inputs/my_test.txt>")
    t1_out = t1(x=x)
    return t1_out


if __name__ == "__main__":
    print(f"Running wf() {wf()}")
I was also able to pass in local files at execution time via workflow arguments:
@workflow
def shell_wf(x: FlyteFile) -> FlyteFile:
    t1_out = t1(x=x)
    return t1_out
pyflyte run --remote cat_test.py shell_wf --x ./my_test.txt
a
Hello @Ketan (kumare3), I am trying to understand how Flyte handles large data objects and found this thread. I still have some questions. Let's say I need to transfer large pandas DataFrame objects from one task to another. Based on this thread, what I understood is that Flyte will store the raw data in s3 buckets and only pass metadata referencing the data paths between tasks. So my questions are:
• Does that mean Flyte will load the dataframes from the specified data paths in each task before using them?
• What if, instead of passing large pandas dataframes, I pass a Python class object with the dataframes as instance variables?
PS: I am relatively new to both Flyte and the Ops side of things like this. Thank you for your help.
k
You are right - if you use the dataframe, it will reload it. If you want to control that behavior, my recommendation would be to use StructuredDataset.
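For example, a minimal sketch (task names and columns are invented): a plain pd.DataFrame annotation is persisted and reloaded automatically, while StructuredDataset lets you control when and what gets materialized, e.g. reading only a subset of columns:

from typing import Annotated

import pandas as pd
from flytekit import kwtypes, task, workflow
from flytekit.types.structured import StructuredDataset

@task
def make_df() -> pd.DataFrame:
    # the returned dataframe is written to blob storage; only a reference is passed on
    return pd.DataFrame({"id": [1, 2, 3], "score": [0.1, 0.5, 0.9]})

@task
def count_rows(df: pd.DataFrame) -> int:
    # flytekit reloads the dataframe from the stored reference before this body runs
    return len(df)

@task
def ids_only(sd: Annotated[StructuredDataset, kwtypes(id=int)]) -> pd.DataFrame:
    # nothing is materialized until open(...).all() is called, and only the "id" column is read
    return sd.open(pd.DataFrame).all()

@workflow
def df_wf() -> int:
    df = make_df()
    ids_only(sd=df)
    return count_rows(df=df)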
a
I see. And if that's the case, then I will have to create a specific s3 bucket for Flyte when not using the sandbox environment, right?
k
No, you can use your own bucket. You can set the raw output data prefix and all data will end up there.
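If helpful, I believe recent flytekit versions expose this as a flag on pyflyte run (the bucket name below is a placeholder, and it can also be set per project/domain through flytectl):
pyflyte run --remote --raw-output-data-prefix s3://my-own-bucket/flyte-raw-data cat_test.py shell_wf --x ./my_test.txt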
a
Thanks!