microscopic-animal-17045
03/06/2025, 2:10 PM
jolly-florist-66728
03/06/2025, 2:10 PM
FlyteFile or FlyteDirectory types to handle files and directories in a way that abstracts the underlying storage system.
Would you like more detail on how to implement this approach?
Sources:
- https://github.com/flyteorg/flyte/issues/4965
- https://github.com/flyteorg/flytesnacks/blob/master/examples/data_types_and_io/data_types_and_io/folder.py
microscopic-animal-17045
03/06/2025, 2:13 PM
microscopic-animal-17045
03/06/2025, 2:14 PM
jolly-florist-66728
03/06/2025, 2:14 PM
FlyteFile and FlyteDirectory types to download, process, and upload files. Here's a basic example of how you can achieve this:
1. Download Files from S3: Use a task to download files from S3 into a FlyteDirectory.
2. Process Files: Perform operations on the files within the directory.
3. Upload Files to S3: Use another task to upload the modified files back to S3.
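Steps 1 and 3 both need a bucket name and an object key rather than a full s3:// URL when an S3 client such as boto3 is used. A minimal sketch of that split, using only the standard library; split_s3_url is a hypothetical helper name, not part of flytekit or boto3:

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key).

    Hypothetical helper for illustration; not part of flytekit or boto3.
    """
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// URL: {url!r}")
    # The bucket is the network location; the key is the path without its leading slash.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_url("s3://my-bucket/file1.txt")
print(bucket, key)
```

The resulting pair can be passed to whichever S3 client you use for the download and upload steps.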
Here's a code example:
import os
from typing import List
from urllib.parse import urlparse

import boto3  # assumption: boto3 is installed and AWS credentials are configured
import flytekit
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory


@task
def download_files_from_s3(s3_urls: List[str]) -> FlyteDirectory:
    # urllib cannot fetch s3:// URLs, so download with boto3 into the task's working directory
    local_dir = os.path.join(flytekit.current_context().working_directory, "downloads")
    os.makedirs(local_dir, exist_ok=True)
    s3 = boto3.client("s3")
    for url in s3_urls:
        parsed = urlparse(url)  # s3://bucket/key -> netloc="bucket", path="/key"
        key = parsed.path.lstrip("/")
        s3.download_file(parsed.netloc, key, os.path.join(local_dir, os.path.basename(key)))
    return FlyteDirectory(path=local_dir)


@task
def process_files(directory: FlyteDirectory) -> FlyteDirectory:
    # Make sure the directory contents are available locally before reading them
    local_dir = directory.download()
    # Example processing: read, modify, and save files
    for file_name in os.listdir(local_dir):
        full_path = os.path.join(local_dir, file_name)
        with open(full_path, "r") as f:
            data = f.read()
        modified_data = data.upper()  # Example modification
        with open(full_path, "w") as f:
            f.write(modified_data)
    return FlyteDirectory(path=local_dir)


@task
def upload_files_to_s3(directory: FlyteDirectory, s3_destination: str):
    local_dir = directory.download()
    dest = urlparse(s3_destination)  # e.g. s3://my-bucket/processed
    prefix = dest.path.strip("/")
    s3 = boto3.client("s3")
    # Upload each file to S3; swap in your preferred upload method if you do not use boto3
    for file_name in os.listdir(local_dir):
        s3.upload_file(os.path.join(local_dir, file_name), dest.netloc, f"{prefix}/{file_name}")


@workflow
def s3_file_workflow(s3_urls: List[str], s3_destination: str):
    local_dir = download_files_from_s3(s3_urls=s3_urls)
    processed_dir = process_files(directory=local_dir)
    upload_files_to_s3(directory=processed_dir, s3_destination=s3_destination)


# Example usage (placeholder bucket and file names)
if __name__ == "__main__":
    s3_urls = [
        "s3://my-bucket/file1.txt",
        "s3://my-bucket/file2.txt",
    ]
    s3_destination = "s3://my-bucket/processed"
    s3_file_workflow(s3_urls=s3_urls, s3_destination=s3_destination)
This example demonstrates downloading files from S3, processing them, and uploading the results back to S3. You can replace the processing logic with your own operations and use a library like boto3 for S3 interactions.
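One way to make the processing logic easy to swap is to keep it in a plain function that takes the transformation as a parameter, so it can be tried on a temporary directory without Flyte or S3. This is only a sketch under that assumption; process_directory and its transform parameter are hypothetical names, not flytekit APIs:

```python
import os
import tempfile
from typing import Callable


def process_directory(dir_path: str, transform: Callable[[str], str]) -> None:
    """Rewrite every regular file in dir_path with transform applied to its text."""
    for name in sorted(os.listdir(dir_path)):
        full_path = os.path.join(dir_path, name)
        if not os.path.isfile(full_path):
            continue  # skip subdirectories
        with open(full_path, "r") as f:
            data = f.read()
        with open(full_path, "w") as f:
            f.write(transform(data))


# Try it on a throwaway directory with the same upper-casing used in the example
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "file1.txt"), "w") as f:
        f.write("hello")
    process_directory(tmp, str.upper)
    with open(os.path.join(tmp, "file1.txt")) as f:
        result = f.read()

print(result)  # HELLO
```

Inside a task, the same function could be called on the local path of the directory, with str.upper replaced by your own transformation.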
Sources:
- FlyteDirectory
- FlyteFile