# ask-ai
m
@ancient-wolf-19325 provide me with a production-grade function for the task of downloading S3 URIs to a single FlyteDirectory on remote, returning the FlyteDirectory and file path back to the workflow. Make sure the directory path is created and exists during the entire course of the workflow.
j
To create a production-grade function for downloading S3 URIs to a `FlyteDirectory` and returning it, you can use Flyte's task and workflow capabilities. Here's a concise example:
```python
from flytekit import current_context, task, workflow
from flytekit.types.directory import FlyteDirectory
import boto3
import os
from pathlib import Path

@task
def download_s3_files_to_directory(s3_uris: list[str]) -> FlyteDirectory:
    # Create a local directory for storing downloaded files
    working_dir = current_context().working_directory
    local_dir = Path(working_dir) / "s3_files"
    local_dir.mkdir(parents=True, exist_ok=True)

    # Initialize S3 client
    s3_client = boto3.client('s3')

    # Download each file from S3
    for uri in s3_uris:
        bucket, key = uri.replace("s3://", "").split("/", 1)
        local_file_path = local_dir / os.path.basename(key)
        s3_client.download_file(bucket, key, str(local_file_path))

    return FlyteDirectory(path=str(local_dir))

@workflow
def s3_to_flytedirectory_workflow(s3_uris: list[str]) -> FlyteDirectory:
    return download_s3_files_to_directory(s3_uris=s3_uris)

# Example usage
if __name__ == "__main__":
    s3_uris = ["s3://my-bucket/file1.csv", "s3://my-bucket/file2.csv"]
    result_dir = s3_to_flytedirectory_workflow(s3_uris=s3_uris)
    print(f"Files downloaded to: {result_dir.path}")
```
This function uses the `boto3` library to download files from S3 and stores them in a `FlyteDirectory`. The directory is created and maintained throughout the workflow execution. Would you like more details on any specific part of this implementation?

Sources: FlyteDirectory, FlyteKit Task and Workflow
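Since the request was for production grade, it may also be worth hardening the S3 calls. Below is a sketch of the same task with botocore's built-in retry configuration and a post-download existence check; the retry values are illustrative assumptions, not requirements, and the `_hardened` suffix in the task name is just a variant label:

```python
from botocore.config import Config
from flytekit import current_context, task
from flytekit.types.directory import FlyteDirectory
import boto3
import os
from pathlib import Path

@task
def download_s3_files_to_directory_hardened(s3_uris: list[str]) -> FlyteDirectory:
    # Same layout as above: a dedicated folder inside the task's working dir
    working_dir = current_context().working_directory
    local_dir = Path(working_dir) / "s3_files"
    local_dir.mkdir(parents=True, exist_ok=True)

    # Illustrative retry settings; tune max_attempts for your environment
    s3_client = boto3.client(
        "s3", config=Config(retries={"max_attempts": 5, "mode": "standard"})
    )

    for uri in s3_uris:
        if not uri.startswith("s3://"):
            raise ValueError(f"Not an S3 URI: {uri}")
        bucket, key = uri.replace("s3://", "").split("/", 1)
        local_file_path = local_dir / os.path.basename(key)
        s3_client.download_file(bucket, key, str(local_file_path))
        if not local_file_path.exists():
            raise FileNotFoundError(f"Download failed for {uri}")

    return FlyteDirectory(path=str(local_dir))
```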
m
@ancient-wolf-19325 instead of working_dir = flytekit.current_context().working_directory, can we leverage the from_source and new class methods? Let's create a folder input_files and download everything there, so that it is not just part of some sandbox and can be accessed by other tasks too for modification.
j
It seems likely that you can use the `FlyteDirectory.from_source` and `FlyteDirectory.new` class methods to create a directory that is accessible across tasks. This approach allows you to specify a directory path explicitly, ensuring it is not confined to a temporary sandbox. Here's how you can modify the function to use these methods:
```python
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
import boto3
import os
from pathlib import Path

@task
def download_s3_files_to_directory(s3_uris: list[str]) -> FlyteDirectory:
    # Create a FlyteDirectory for storing downloaded files
    local_dir = FlyteDirectory.new("input_files")

    # Initialize S3 client
    s3_client = boto3.client('s3')

    # Download each file from S3
    for uri in s3_uris:
        bucket, key = uri.replace("s3://", "").split("/", 1)
        local_file_path = Path(local_dir.path) / os.path.basename(key)
        s3_client.download_file(bucket, key, str(local_file_path))

    return local_dir

@workflow
def s3_to_flytedirectory_workflow(s3_uris: list[str]) -> FlyteDirectory:
    return download_s3_files_to_directory(s3_uris=s3_uris)

# Example usage
if __name__ == "__main__":
    s3_uris = ["s3://my-bucket/file1.csv", "s3://my-bucket/file2.csv"]
    result_dir = s3_to_flytedirectory_workflow(s3_uris=s3_uris)
    print(f"Files downloaded to: {result_dir.path}")
```
This code uses `FlyteDirectory.new` to create a directory named "input_files" in the current Flyte working directory, ensuring it is accessible for other tasks. Would you like more details on this approach?

Sources: FlyteDirectory, FlyteKit Task and Workflow
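For completeness, since the question also mentioned `from_source`: that class method goes the other direction, wrapping an existing remote URI in a `FlyteDirectory` without copying anything up front. A minimal sketch, assuming a prefix like `s3://my-bucket/input_files` already exists (the URI is a placeholder):

```python
from flytekit import task
from flytekit.types.directory import FlyteDirectory

@task
def wrap_existing_remote_dir() -> FlyteDirectory:
    # No data is moved here; downstream tasks receive a reference and
    # download the contents only when they access them.
    return FlyteDirectory.from_source("s3://my-bucket/input_files")
```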
m
@ancient-wolf-19325 let's continue with the same example by calling another task2 which will modify both JSON files by appending the file path of one file to the other as a key:value pair, and calling task3 to upload this local_dir recursively back to the provided output_s3_uri path
j
To extend the example by adding a second task that modifies the JSON files and a third task to upload the modified directory back to an S3 path, you can follow this approach:
```python
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
import boto3
import os
import json
from pathlib import Path

@task
def download_s3_files_to_directory(s3_uris: list[str]) -> FlyteDirectory:
    # Create a FlyteDirectory for storing downloaded files
    local_dir = FlyteDirectory.new("input_files")

    # Initialize S3 client
    s3_client = boto3.client('s3')

    # Download each file from S3
    for uri in s3_uris:
        bucket, key = uri.replace("s3://", "").split("/", 1)
        local_file_path = Path(local_dir.path) / os.path.basename(key)
        s3_client.download_file(bucket, key, str(local_file_path))

    return local_dir

@task
def modify_json_files(directory: FlyteDirectory) -> FlyteDirectory:
    # List all JSON files in the directory
    json_files = list(Path(directory.path).glob("*.json"))
    
    if len(json_files) < 2:
        raise ValueError("At least two JSON files are required for modification.")

    # Read the first file and append the path of the second file
    with open(json_files[0], 'r+') as file1, open(json_files[1], 'r') as file2:
        data1 = json.load(file1)
        data2 = json.load(file2)
        data1['file_path'] = str(json_files[1])
        file1.seek(0)
        json.dump(data1, file1, indent=4)
        file1.truncate()

    return directory

@task
def upload_directory_to_s3(directory: FlyteDirectory, output_s3_uri: str):
    # Initialize S3 client
    s3_client = boto3.client('s3')

    # Parse the destination once, then upload each file under that prefix
    bucket, key_prefix = output_s3_uri.replace("s3://", "").split("/", 1)
    for file_path in Path(directory.path).glob("**/*"):
        if file_path.is_file():
            key = f"{key_prefix}/{file_path.relative_to(directory.path)}"
            s3_client.upload_file(str(file_path), bucket, key)

@workflow
def s3_to_flytedirectory_workflow(s3_uris: list[str], output_s3_uri: str) -> FlyteDirectory:
    downloaded_dir = download_s3_files_to_directory(s3_uris=s3_uris)
    modified_dir = modify_json_files(directory=downloaded_dir)
    upload_directory_to_s3(directory=modified_dir, output_s3_uri=output_s3_uri)
    return modified_dir

# Example usage
if __name__ == "__main__":
    s3_uris = ["s3://my-bucket/file1.json", "s3://my-bucket/file2.json"]
    output_s3_uri = "s3://my-bucket/modified_files"
    result_dir = s3_to_flytedirectory_workflow(s3_uris=s3_uris, output_s3_uri=output_s3_uri)
    print(f"Modified files are in: {result_dir.path}")
```
Explanation:
1. Task 1 (`download_s3_files_to_directory`): Downloads files from S3 to a local directory.
2. Task 2 (`modify_json_files`): Modifies the JSON files by appending the file path of one file to another as a key-value pair.
3. Task 3 (`upload_directory_to_s3`): Uploads the modified directory back to the specified S3 path.

This workflow ensures that the directory is created and maintained throughout the workflow execution, and the modifications are uploaded back to S3. If you need further customization or details, feel free to ask!
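As an aside, instead of hand-rolling the boto3 upload in task 3, flytekit can seemingly perform the recursive upload itself if you return a `FlyteDirectory` with `remote_directory` set. A minimal sketch of that variant, assuming the Flyte backend's credentials can write to the target bucket:

```python
from flytekit import task
from flytekit.types.directory import FlyteDirectory

@task
def publish_directory(directory: FlyteDirectory, output_s3_uri: str) -> FlyteDirectory:
    # When remote_directory is set, flytekit uploads the local contents to
    # that exact URI on output serialization instead of a random prefix.
    return FlyteDirectory(path=directory.path, remote_directory=output_s3_uri)
```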
By the way, I found these links that might be relevant to what you're looking for. Let me know if this helps: - [FlyteDirectory Documentation](https://docs.flyte.org/en/latest/user_guide/data_types_and_io/flytedirectory.html) - [Dataclass Documentation](https://docs.flyte.org/en/latest/user_guide/data_types_and_io/dataclass.html) - [Extending Flyte](https://docs.flyte.org/en/latest/user_guide/extending/index.html)
m
@ancient-wolf-19325
j
Could you please provide more details or clarify your question? This will help me assist you better.

Sources: https://github.com/flyteorg/flytekit/blob/master/flytekit/types/directory/types.py, https://github.com/flyteorg/flyte/blob/master/docs/user_guide/data_types_and_io/flytedirectory.md
m
@ancient-wolf-19325 Minor change: there are 3 JSON files and 1 PDB file. They all have different names but have a few fixed keywords in their names. For example: file1_abc.json, file2_efg.json, file3_hik.json, file4.pdb. In the modify JSON file method, I only want to modify file1_abc.json with the file path of file4.pdb
j
To modify only the specific JSON file (`file1_abc.json`) by appending the file path of the PDB file (`file4.pdb`), you can adjust the `modify_json_files` task to target the JSON file with a specific keyword in its name. Here's how you can do it:
```python
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
import boto3
import os
import json
from pathlib import Path

@task
def download_s3_files_to_directory(s3_uris: list[str]) -> FlyteDirectory:
    # Create a FlyteDirectory for storing downloaded files
    local_dir = FlyteDirectory.new("input_files")

    # Initialize S3 client
    s3_client = boto3.client('s3')

    # Download each file from S3
    for uri in s3_uris:
        bucket, key = uri.replace("s3://", "").split("/", 1)
        local_file_path = Path(local_dir.path) / os.path.basename(key)
        s3_client.download_file(bucket, key, str(local_file_path))

    return local_dir

@task
def modify_json_file(directory: FlyteDirectory) -> FlyteDirectory:
    # Identify the specific JSON and PDB files
    json_file = next(Path(directory.path).glob("*_abc.json"), None)
    pdb_file = next(Path(directory.path).glob("*.pdb"), None)

    if json_file and pdb_file:
        # Modify the JSON file by appending the PDB file path
        with open(json_file, 'r+') as file:
            data = json.load(file)
            data['pdb_file_path'] = str(pdb_file)
            file.seek(0)
            json.dump(data, file, indent=4)
            file.truncate()

    return directory

@task
def upload_directory_to_s3(directory: FlyteDirectory, output_s3_uri: str):
    # Initialize S3 client
    s3_client = boto3.client('s3')

    # Parse the destination once, then upload each file under that prefix
    bucket, key_prefix = output_s3_uri.replace("s3://", "").split("/", 1)
    for file_path in Path(directory.path).glob("**/*"):
        if file_path.is_file():
            key = f"{key_prefix}/{file_path.relative_to(directory.path)}"
            s3_client.upload_file(str(file_path), bucket, key)

@workflow
def s3_to_flytedirectory_workflow(s3_uris: list[str], output_s3_uri: str) -> FlyteDirectory:
    downloaded_dir = download_s3_files_to_directory(s3_uris=s3_uris)
    modified_dir = modify_json_file(directory=downloaded_dir)
    upload_directory_to_s3(directory=modified_dir, output_s3_uri=output_s3_uri)
    return modified_dir

# Example usage
if __name__ == "__main__":
    s3_uris = ["s3://my-bucket/file1_abc.json", "s3://my-bucket/file2_efg.json", "s3://my-bucket/file3_hik.json", "s3://my-bucket/file4.pdb"]
    output_s3_uri = "s3://my-bucket/modified_files"
    result_dir = s3_to_flytedirectory_workflow(s3_uris=s3_uris, output_s3_uri=output_s3_uri)
    print(f"Modified files are in: {result_dir.path}")
```
Explanation:
- Targeted File Selection: The `modify_json_file` task uses `Path.glob` to find the JSON file with `_abc.json` in its name and the PDB file.
- Modification: Only the identified JSON file is modified by appending the path of the PDB file.

This approach ensures that only the desired JSON file is modified. Let me know if you need further assistance!

Sources: https://github.com/flyteorg/flytekit/blob/master/tests/flytekit/unit/core/test_flyte_file.py, https://github.com/flyteorg/flyte/pull/5607

Note: I wasn't able to find highly relevant data sources, but above are a few potentially relevant links.
m
@ancient-wolf-19325 How can I view aggregated logs for the entire workflow if I add logs or print statements?
j
To view aggregated logs for an entire workflow in Flyte, you can utilize the Flyte Console, which provides a comprehensive view of task and workflow executions, including logs. Here's a brief overview of how you can access these logs:

1. Flyte Console: After executing a workflow, you can navigate to the Flyte Console, the web interface for managing Flyte entities. The console provides a detailed view of each workflow execution, including logs for each task. You can access it via the URL provided after running a workflow, typically in the format `http://<flyte-console-url>/console/projects/<project-name>/domains/<domain-name>/executions/<execution-id>`.
2. Task Logs: Within the Flyte Console, you can click on individual tasks to view their logs. The logs include any print statements or logging output you have added to your tasks, which is useful for debugging and understanding the execution flow.
3. Log Aggregation: If you have configured a log aggregation service (e.g., AWS CloudWatch, GCP Stackdriver), you can view aggregated logs across multiple executions. This requires setting up the appropriate logging configuration in Flyte to direct logs to your chosen log aggregator.

Would you like more details on configuring log aggregation or accessing specific logs in the Flyte Console?

Sources: Flyte Console Documentation, Configuring Logging Links in the UI
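For the "if I add logs" part: standard Python logging inside a task is enough to get messages into the task's container logs, which the console and any aggregator pick up. A minimal sketch (the `basicConfig` call is for local runs; on a cluster the platform's logging configuration applies):

```python
import logging

from flytekit import task

logging.basicConfig(level=logging.INFO)  # local runs; clusters configure this
logger = logging.getLogger(__name__)

@task
def noisy_task(x: int) -> int:
    # These records go to stdout/stderr, which Flyte surfaces per task
    logger.info("Starting noisy_task with x=%s", x)
    result = x * 2
    logger.info("Finished noisy_task, result=%s", result)
    return result
```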
m
@ancient-wolf-19325 modify_json_file returned a FileNotFound error even though downloaded_dir has the files downloaded to /tmp/flyte-_12jomy/sandbox/localflytekit/f2066.../input_files/file4.pdb and /tmp/flyte-12jomy/sandbox/localflytekit/f2066.../input_files/file1_abc.json. When I performed FlyteDirectory.listdir(directory) it shows me e.path has 4 objects with paths /tmp/flytesfmctvh5/local_flytekit/1691aa1
j
It seems likely that the `FileNotFoundError` is occurring because the `modify_json_file` task is not correctly identifying the file paths within the `FlyteDirectory`. This can happen if the directory structure or file paths are not as expected. Here are a few steps to troubleshoot and resolve the issue:

1. Verify File Paths: Ensure that the file paths are correctly constructed when accessing files within the `FlyteDirectory`. You can use `Path.glob` to list files and verify their existence.
2. Check Directory Structure: Use `FlyteDirectory.listdir()` to confirm the directory structure and ensure that the files are located where you expect them to be.
3. Debugging: Add print statements or logging to output the paths being accessed. This can help identify any discrepancies between the expected and actual file paths.

Here's an updated version of the `modify_json_file` task with additional debugging:
```python
@task
def modify_json_file(directory: FlyteDirectory) -> FlyteDirectory:
    # List all files in the directory for debugging
    all_files = list(Path(directory.path).glob("**/*"))
    print(f"All files in directory: {[str(f) for f in all_files]}")

    # Identify the specific JSON and PDB files
    json_file = next(Path(directory.path).glob("*_abc.json"), None)
    pdb_file = next(Path(directory.path).glob("*.pdb"), None)

    if json_file and pdb_file:
        print(f"Modifying JSON file: {json_file} with PDB file path: {pdb_file}")
        # Modify the JSON file by appending the PDB file path
        with open(json_file, 'r+') as file:
            data = json.load(file)
            data['pdb_file_path'] = str(pdb_file)
            file.seek(0)
            json.dump(data, file, indent=4)
            file.truncate()
    else:
        raise FileNotFoundError("Required files not found in the directory.")

    return directory
```
This code includes print statements to list all files in the directory and confirm the paths of the JSON and PDB files before attempting to modify them. Would you like more details on any specific part of this process?

Sources: https://github.com/flyteorg/flyte/issues/6005, https://github.com/flyteorg/flyte/issues/4700
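Another thing worth ruling out here: when a task receives a `FlyteDirectory`, its contents may not be materialized locally yet, so globbing the path can come up empty. Calling `download()` first forces materialization and returns the local path; a minimal sketch under that assumption:

```python
from pathlib import Path

from flytekit import task
from flytekit.types.directory import FlyteDirectory

@task
def inspect_directory(directory: FlyteDirectory) -> None:
    # Forces the remote contents to be pulled down; returns the local path
    local_path = directory.download()
    print(f"Downloaded to: {local_path}")
    for f in sorted(Path(local_path).glob("**/*")):
        print(f)
```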
m
@ancient-wolf-19325 Pathlib is useless, it only returns empty. I used `all_files = FlyteDirectory.listdir(directory)`; when I iterate and print `f.remote_source` I see my file paths:

```python
all_files = FlyteDirectory.listdir(directory)
for file in all_files:
    print(file.remote_source)  # prints s3://flyte-metadata/data/1w/.../file4.pdb
```

Since it's the remote source path it's still useless. I think from_source might give me the local path.
j
To work with files in a `FlyteDirectory` and access their local paths, you can use the `from_source` method to create a `FlyteDirectory` object from a remote source, which will handle downloading the files locally. Here's how you can modify your approach:

1. Use `from_source`: This method creates a `FlyteDirectory` object from a remote source, ensuring that the files are downloaded locally and accessible.
2. Access Local Paths: Once the files are downloaded, you can access their local paths using the `path` attribute of the `FlyteDirectory` or `FlyteFile` objects.

Here's an example of how you can achieve this:
```python
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
import json
from pathlib import Path

@task
def modify_json_file(directory: FlyteDirectory) -> FlyteDirectory:
    # Use FlyteDirectory.from_source to ensure files are downloaded locally
    local_dir = FlyteDirectory.from_source(directory.remote_source)

    # List all files in the directory
    all_files = FlyteDirectory.listdir(local_dir)
    print(f"All files in directory: {[str(f.path) for f in all_files]}")

    # Identify the specific JSON and PDB files
    json_file = next((f for f in all_files if "_abc.json" in f.path), None)
    pdb_file = next((f for f in all_files if f.path.endswith(".pdb")), None)

    if json_file and pdb_file:
        print(f"Modifying JSON file: {json_file.path} with PDB file path: {pdb_file.path}")
        # Modify the JSON file by appending the PDB file path
        with open(json_file.path, 'r+') as file:
            data = json.load(file)
            data['pdb_file_path'] = pdb_file.path
            file.seek(0)
            json.dump(data, file, indent=4)
            file.truncate()
    else:
        raise FileNotFoundError("Required files not found in the directory.")

    return local_dir

@workflow
def s3_to_flytedirectory_workflow(s3_uris: list[str], output_s3_uri: str) -> FlyteDirectory:
    downloaded_dir = FlyteDirectory.from_source(s3_uris[0])  # Assuming the first URI is the directory
    modified_dir = modify_json_file(directory=downloaded_dir)
    # Upload logic here...
    return modified_dir

# Example usage
if __name__ == "__main__":
    s3_uris = ["s3://my-bucket/input_files"]
    output_s3_uri = "s3://my-bucket/modified_files"
    result_dir = s3_to_flytedirectory_workflow(s3_uris=s3_uris, output_s3_uri=output_s3_uri)
    print(f"Modified files are in: {result_dir.path}")
```
Explanation:
- `FlyteDirectory.from_source`: Creates a `FlyteDirectory` object from a remote source, ensuring that files are downloaded locally.
- Accessing Local Paths: The `path` attribute of each file object provides the local path, which can be used for file operations.

This approach should help you work with local file paths effectively. Let me know if you need further assistance!

Sources: FlyteKit Task and Workflow
m
@ancient-wolf-19325 f.path is still not printing the complete file path with the filename. Is there any other attribute I should look into?
j
If `f.path` is not providing the complete file path with the filename, you might want to ensure that you are accessing the correct attribute of the `FlyteFile` or `FlyteDirectory` object. Here are a few things to check:

1. Ensure Correct Object Type: Make sure that `f` is indeed a `FlyteFile` object. The `path` attribute should give you the full local path of the file.
2. Check `os.fspath()`: You can use `os.fspath(f)` to get the full path of the file. This is a standard way to retrieve the file system path from an object that implements the `os.PathLike` interface.
3. Debugging: Add print statements to verify the type of `f` and its attributes. This can help ensure that you are working with the expected object.

Here's an example of how you can use `os.fspath()`:
```python
import os
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
import json

@task
def modify_json_file(directory: FlyteDirectory) -> FlyteDirectory:
    # List all files in the directory
    all_files = FlyteDirectory.listdir(directory)
    for f in all_files:
        print(f"File path: {os.fspath(f)}")  # Use os.fspath to get the full path

    # Identify the specific JSON and PDB files
    json_file = next((f for f in all_files if "_abc.json" in os.fspath(f)), None)
    pdb_file = next((f for f in all_files if os.fspath(f).endswith(".pdb")), None)

    if json_file and pdb_file:
        print(f"Modifying JSON file: {os.fspath(json_file)} with PDB file path: {os.fspath(pdb_file)}")
        # Modify the JSON file by appending the PDB file path
        with open(os.fspath(json_file), 'r+') as file:
            data = json.load(file)
            data['pdb_file_path'] = os.fspath(pdb_file)
            file.seek(0)
            json.dump(data, file, indent=4)
            file.truncate()
    else:
        raise FileNotFoundError("Required files not found in the directory.")

    return directory

# Example usage
if __name__ == "__main__":
    # Assuming you have a FlyteDirectory object
    directory = FlyteDirectory.from_source("s3://my-bucket/input_files")
    modify_json_file(directory=directory)
```
This code uses `os.fspath()` to ensure you get the full file path. Let me know if you need further assistance!

Sources: https://github.com/flyteorg/flyte/issues/2806, https://github.com/flyteorg/flyte/issues/6148