# ask-the-community
k
It should have raised an error, tuples are not supported
h
hm, what's the best way to deal with this? Or whats the best practice - JSON and pd DataFrames?
k
Return dataclass or dataframe
Pandas, polars all are supported
And you can return multiple values; just the tuple type on its own is not supported today
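For reference, a minimal sketch of the multi-output pattern (illustrative names; on flytekit 1.9.x a dataclass output typically also needs the dataclass_json decorator):
Copy code
import typing
from dataclasses import dataclass

import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import task


# Multiple outputs: use a NamedTuple instead of a bare tuple.
Outputs = typing.NamedTuple("Outputs", [("df", pd.DataFrame), ("row_count", int)])


@task
def load_data() -> Outputs:
    df = pd.DataFrame({"a": [1, 2, 3]})
    return Outputs(df=df, row_count=len(df))


# Structured data: a (JSON-serializable) dataclass also works as a task output.
@dataclass_json
@dataclass
class Summary:
    name: str
    row_count: int


@task
def summarize(df: pd.DataFrame) -> Summary:
    return Summary(name="demo", row_count=len(df))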
h
Ok will do - maybe a separate question in your experience is polars or dask dataframes preferable for processing larger data
k
Depends - we like polars. Distributed is usually not needed, and Flyte can give you a large node
Check the polars Flyte plugin, or modin/dask as well
But please install the plugins
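As a rough sketch of what that looks like with the polars plugin installed (assuming the flytekitplugins-polars package, which registers pl.DataFrame with Flyte's type engine):
Copy code
# Assumes `pip install flytekitplugins-polars` so that pl.DataFrame is a
# supported task input/output type.
import polars as pl
from flytekit import task, workflow


@task
def make_frame() -> pl.DataFrame:
    return pl.DataFrame({"id": [1, 2], "value": ["a", "b"]})


@task
def count_rows(df: pl.DataFrame) -> int:
    return df.height  # number of rows in a polars DataFrame


@workflow
def wf() -> int:
    return count_rows(df=make_frame())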
h
hm, getting this error with `pl.DataFrame`:
Copy code
╭─────────────────────────────────────── Traceback (most recent call last) ───────────────────────────────────────╮
│ in <module>:3                                                                                                   │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/task.py:280 in task      │
│                                                                                                                 │
│ ❱ 280 │   │   return wrapper(_task_function)                                                                    │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/task.py:260 in wrapper   │
│                                                                                                                 │
│ ❱ 260 │   │   task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))(                            │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/tracker.py:77 in         │
│ __call__                                                                                                        │
│                                                                                                                 │
│ ❱  77 │   │   o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)                                    │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/python_function_task.py: │
│ 121 in __init__                                                                                                 │
│                                                                                                                 │
│ ❱ 121 │   │   super().__init__(                                                                                 │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/python_auto_container.py │
│ :85 in __init__                                                                                                 │
│                                                                                                                 │
│ ❱  85 │   │   super().__init__(                                                                                 │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/base_task.py:432 in      │
│ __init__                                                                                                        │
│                                                                                                                 │
│ ❱ 432 │   │   │   interface=transform_interface_to_typed_interface(interface),                                  │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:247 in      │
│ transform_interface_to_typed_interface                                                                          │
│                                                                                                                 │
│ ❱ 247 │   inputs_map = transform_variable_map(interface.inputs, input_descriptions)                             │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:345 in      │
│ transform_variable_map                                                                                          │
│                                                                                                                 │
│ ❱ 345 │   │   │   res[k] = transform_type(v, descriptions.get(k, k))                                            │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:350 in      │
│ transform_type                                                                                                  │
│                                                                                                                 │
│ ❱ 350 │   return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=de                  │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/type_engine.py:857 in    │
│ to_literal_type                                                                                                 │
│                                                                                                                 │
│ ❱  857 │   │   res = transformer.get_literal_type(python_type)                                                  │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/types/file/file.py:322 in     │
│ get_literal_type                                                                                                │
│                                                                                                                 │
│ ❱ 322 │   │   return LiteralType(blob=self._blob_type(format=FlyteFilePathTransformer.get_form                  │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/types/file/file.py:306 in     │
│ get_format                                                                                                      │
│                                                                                                                 │
│ ❱ 306 │   │   return typing.cast(FlyteFile, t).extension()                                                      │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: type object 'Path' has no attribute 'extension'
I have not installed the plugins but am not sure if that is the cause of this.
I am just reading about how
pathlib
is not supported in Flyte, so I changed
file_path: Path
to
file: FlyteFile
or
files: List[FlyteFile]
But these are Promise objects - how do I iterate through Promise objects?
j
where are you iterating over those objects, is it in the
@workflow
h
@Jay Ganbat yes:
Copy code
@task
def parse_metadata_xml(file: FlyteFile) -> pl.DataFrame:
    # Parse the XML file
    tree = ET.parse(file)
    root = tree.getroot()

    # Access elements and attributes
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            result.update({key: val})
    return pl.from_dict(result)

@workflow
def ingest_raw_data(files: List[FlyteFile]) -> pl.DataFrame:
    data = []
    for file in files:
        if file.suffix == MetadataType.DIZBD.value:
            data.append(parse_metadata_dizbd(file))
        elif file.suffix == MetadataType.XML.value:
            data.append(parse_metadata_xml(file))
    return pl.concat(data, how="diagonal")
Although if there is a better practice to refactor this I would love to know how
j
yeah, you can't do logical operations in a workflow
I would suggest creating a dynamic task that does that
Copy code
@dynamic
def dynamic_ingest_raw_data():
    ...

@workflow
def ingest_raw_data():
    dynamic_ingest_raw_data()
so at registration time everything is a Promise and not realized with actual inputs
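A rough sketch of that shape: since each task call inside the @dynamic still returns a promise, the concat is pushed into its own task. parse_metadata_xml and parse_metadata_dizbd are the tasks from the snippet above, and the extension dispatch here is illustrative only.
Copy code
from typing import List

import polars as pl
from flytekit import dynamic, task, workflow
from flytekit.types.file import FlyteFile


@task
def concat_frames(frames: List[pl.DataFrame]) -> pl.DataFrame:
    # Inside a task the dataframes are materialized, so plain polars calls work.
    return pl.concat(frames, how="diagonal")


@dynamic
def dynamic_ingest_raw_data(files: List[FlyteFile]) -> pl.DataFrame:
    # Plain Python control flow is fine inside @dynamic, but each task call
    # still returns a promise, so collect the promises and concat in a task.
    frames = []
    for f in files:
        name = f.remote_source or str(f.path)  # illustrative dispatch on filename
        if name.endswith(".xml"):
            frames.append(parse_metadata_xml(file=f))       # task from earlier
        else:
            frames.append(parse_metadata_dizbd(file=f))      # task from earlier
    return concat_frames(frames=frames)


@workflow
def ingest_raw_data(files: List[FlyteFile]) -> pl.DataFrame:
    return dynamic_ingest_raw_data(files=files)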
h
Ah ok so the dynamic can take in the
List[FlyteFile]
? Or is
FlyteDirectory
better?
j
hmm, if you know what files you will be expecting then a list of FlyteFile is probably better; otherwise it might try to pull down everything in that folder if it's remote
h
Okay - just wondering - I can add a local path as
FlyteDirectory
as an input to some function - is there a way I can get that local path back?
Copy code
def ingest(data_dir: FlyteDirectory):
    for flytepath, filename in data_dir.crawl():
        ... # can I get `path/to/some/data` here somehow?

        
ingest("path/to/some/data")
j
I'm a bit confused - if it's a local dir then FlyteDirectory should work like a regular directory, right? So it should return the path
h
Odd, I don't have it open anymore but last I checked
flytepath
returned something like
/var/../..
- so technically
flytepath
or
data_dir.path
should return the path?
j
yeah it should
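As a small sketch of that, assuming a local run where the directory isn't remote:
Copy code
import os

from flytekit import task
from flytekit.types.directory import FlyteDirectory


@task
def ingest(data_dir: FlyteDirectory):
    # For a local run this is the directory path that was passed in.
    print(data_dir.path)
    # crawl() yields (base, relative_file) pairs that can be joined back together.
    for base, rel in data_dir.crawl():
        print(os.path.join(str(base), rel))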
h
Ok thanks Jay - Oh, how should one pass a list of files in
pyflyte run ..
, with argparse? Or is there a better way?
k
Cc @Peeter Piegaze
h
So I am currently giving the directory as an input:
Copy code
@dynamic
def parse_directory(
    directory: FlyteDirectory, metadata_types: List[str]
) -> pl.DataFrame:
    data = []
    for metadata_type in metadata_types:
        for var_path, file in directory.crawl():
            extension = Path(file).suffix[1:]
            if extension == metadata_type:
                df = parse_metadata_file(
                    file=Path(var_path) / file, metadata_type=metadata_type
                )
                data.append(df)
    return pl.concat(data, how="diagonal")
And running this
pyflyte run extract.py parse_directory --directory data/XML/bla --metadata_types '["xml"]'
gave this error:
Copy code
Currently only directories containing one file are supported, found [100] files found in data/XML/bla
Assuming passing a list of files is a workaround, e.g. ideally
pyflyte run extract.py parse_directory --files '[f for f in dir.glob(*)]' ...
but I am not sure how to do this
s
@Hud, you should be able to send FlyteDirectory to a workflow as a string, enclosed in quotes. Can you try that and let us know if you're still seeing the error?
h
@Samhita Alla adding quotes to
"data/XML/bla"
gave the same error.
s
Oh I don't think you can call dynamic workflows directly using
pyflyte run
. Can you add a workflow and call the dynamic workflow from within it? Please pass the name of the workflow to the
pyflyte run
command.
h
So I'm not sure if this is what you meant, but I have added a workflow that contains the dynamic functions
parse_metadata_files
and
parse_raw_objects
, since I figured workflows can't contain for loops:
Copy code
@workflow
def ingest_text_wf(
    data_dir: FlyteDirectory
) -> pl.DataFrame:
    # dynamic
    d = parse_metadata_files( 
        directory=data_dir
    )
    # dynamic
    d2 = parse_raw_objects(
        directory=data_dir
    )
    return d.join(d2, on="ID", how="outer")
this still gives me the same error with
pyflyte run
. But also, if I run this with an argument parser via
python extract.py --data_dir data/XML/bla
it results in `AttributeError: 'Promise' object has no attribute 'join'`
s
Which flytekit version have you installed?
h
1.9.1
s
Looks like a bug to me. @Eduardo Apolinario (eapolinario) / @Kevin Su, could you confirm?
y
@Hud this won't work, right? This is something we need to address more cleanly as it's come up multiple times, but the workflow only looks and kinda acts like python - it's not actually python. These are indeed promise objects that get passed around, so you won't be able to
join
on them
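One hedged way around that: do the join inside a task, so the inputs are materialized dataframes by the time polars sees them. parse_metadata_files and parse_raw_objects here are the dynamics from the snippet above.
Copy code
import polars as pl
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory


@task
def join_on_id(left: pl.DataFrame, right: pl.DataFrame) -> pl.DataFrame:
    # Inside a task the inputs are real dataframes, so .join works as usual.
    return left.join(right, on="ID", how="outer")


@workflow
def ingest_text_wf(data_dir: FlyteDirectory) -> pl.DataFrame:
    d = parse_metadata_files(directory=data_dir)    # dynamic from the snippet above
    d2 = parse_raw_objects(directory=data_dir)      # dynamic from the snippet above
    return join_on_id(left=d, right=d2)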
h
@Yee yes, I'm grappling with the concept slowly, which is why I started with dynamic instead of a workflow - that gave me the 1 directory 1 object limit error. How do I remove this limit?
y
can you use a list?
sorry just hopping on here, could you summarize the issue for me please?
h
@Yee So I have a folder with metadata and raw objects, and I'm just trying to parse them into a polars dataframe. It's straightforward to pass the string path, but I run into the file limit issue:
pyflyte run extract.py ingest_text_wf --directory data/XML/bla
I would consider passing a list of files, but I don't know exactly what the files are - I just know they're in the folder. How do I pass an array or list of files in a folder for
pyflyte run ..
@Yee just following up on this
y
I'm confused - if you don't know what the files are, how can you pass them in a list?
the list syntax should just be JSON…
["filea", "fileb"]
at least that's what I use for other types; it should also work with a list of files.
but if you don’t know the filenames, you can’t write that out right?
h
Yeah I don't know and am not going to manually key in each filename in that list - so it's more like
[f for f in data_dir.glob(*)]
Hopefully that makes sense. How do I pass that in
pyflyte run
y
not easily today.
can you just pass in the whole folder?
and do the glob in code?
h
@Yee Hm, well then we're back to the issue of the one file per directory limit. That said, I am not restricted to having a directory with these files - if you had just an XML file and a JPG, and the aim is to parse the XML and process the content as strings in a dataframe, what ingestion method works best with Flyte currently?
s
FlyteDirectory should actually work. And the error you're encountering looks like a bug to me. Could you file an issue for the same? As a workaround, you could send a blob storage path (s3 or gcs) or you could return the list of files or directory from a Flyte task and use the same in your subsequent task, which isn't optimized but could resolve the issue you're seeing.
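A rough sketch of that workaround, assuming the directory fits on local disk: list the files inside a task, and the returned List[FlyteFile] can then feed a @dynamic like the one sketched earlier in this thread.
Copy code
from pathlib import Path
from typing import List

from flytekit import task
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


@task
def list_xml_files(directory: FlyteDirectory) -> List[FlyteFile]:
    # Pull the directory down once inside a task, then return only the files
    # of interest; a downstream @dynamic can fan out over this list.
    local_dir = directory.download()
    return [FlyteFile(path=str(p)) for p in Path(local_dir).glob("*.xml")]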
k
@Hud to do this securely we use generated links. But you could use the AWS CLI to upload your data and then pass the s3 link
h
@Ketan (kumare3) thanks. Is there such a thing as on-premise s3? Could I e.g. use minio?
k
You can
That's what you are already using if you deployed the Flyte sandbox
Minio is s3
And the AWS CLI works well with minio
h
@Ketan (kumare3) so uploading my data to another minio server and passing that s3 link would essentially overcome this directory limit, correct?
k
Yup
The limit only applies to uploads from pyflyte run
But let us brainstorm some ideas
h
Ah, so it won't work with
pyflyte run
still? OK, let's come back to this later then
k
It will
Just pass the s3 directory's path
y
Local upload of a dir… I have a PR out for this, but it needs a fair bit of cleanup before merging
h
@Ketan (kumare3) so I uploaded the xml files into the s3 bucket and passed the s3 path:
pyflyte run --remote minio_file_uploader.py wf --indirpath s3://my-s3-bucket/input-data
The goal is to process the xml files into a single polars df and export that as a csv via --remote:
Copy code
@dynamic(container_image=polars)
def parse_metadata_files(
    indir: FlyteDirectory, metadata_ext: str
) -> List[FlyteFile]:
    metadata_files = []
    all_out = []
    for var_path, file in indir.crawl():
        fname, extension = Path(file).stem, Path(file).suffix[1:]

        if extension == metadata_ext:
            metadata_files.append((var_path, file))
    metadata_dfs = [
        parse_metadata_file(file=Path(var_path) / file, metadata_type=metadata_ext)
        for var_path, file in metadata_files
    ]
    metadata_df = pl.concat(metadata_dfs, how="diagonal")
    all_out.append(metadata_df.write_csv("out.csv"))

    return all_out

@workflow
def wf(indirpath: FlyteDirectory) -> List[FlyteFile]:  
    """Usage: `pyflyte run --remote file_ingester.py wf --s3_path <s3://my-s3-bucket/input-data> """
    outfiles = parse_metadata_files(indir=indirpath, metadata_ext="xml")
    return outfiles
This leads to this error:
Copy code
Traceback (most recent call last):

      File "/usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/flyteingest/workflows/minio_file_uploader.py", line 77, in parse_metadata_files
        metadata_dfs = [
      File "/root/flyteingest/workflows/minio_file_uploader.py", line 78, in <listcomp>
        parse_metadata_file(file=Path(var_path) / file, metadata_type=metadata_ext)
      File "/root/flyteingest/workflows/minio_file_uploader.py", line 62, in parse_metadata_file
        df = parser(file=file)
      File "/root/flyteingest/workflows/minio_file_uploader.py", line 39, in parse_xml
        tree = ET.parse(file)
      File "/usr/local/lib/python3.9/xml/etree/ElementTree.py", line 1222, in parse
        tree.parse(source, parser)
      File "/usr/local/lib/python3.9/xml/etree/ElementTree.py", line 569, in parse
        source = open(source, "rb")

Message:

    [Errno 2] No such file or directory: 's3:/my-s3-bucket/input-data/32d06a1995a18b0.xml'

User error.
parse_metadata_file
and
parse_xml
in this case are neither tasks, dynamics, nor workflows, and that xml file does exist in s3. Am I missing something?
k
@Hud why is one
/
missing
s3:/my-s3-bucket/input-data/32d06a1995a18b0.xml
-> note
s3://
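That missing slash is very likely pathlib at work: Path normalizes consecutive slashes, so joining onto an s3 URI with Path collapses the scheme's double slash. A quick illustration, with os.path.join as the usual workaround:
Copy code
import os
from pathlib import Path

print(Path("s3://my-s3-bucket/input-data") / "32d06a1995a18b0.xml")
# s3:/my-s3-bucket/input-data/32d06a1995a18b0.xml   <- double slash collapsed

print(os.path.join("s3://my-s3-bucket/input-data", "32d06a1995a18b0.xml"))
# s3://my-s3-bucket/input-data/32d06a1995a18b0.xml  <- scheme preserved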
h
@Ketan (kumare3) oh - good catch! I assume it's some issue with
file=Path(var_path) / file
but printed variables usually get obscured by the short error messages - is there a way to display the full traceback?
k
--verbose
h
@Ketan (kumare3) it was indeed a
Path
issue - but still getting the same error:
Copy code
Message:

    [Errno 2] No such file or directory: '<s3://my-s3-bucket/input-data/32d06a1995a18b0.xml>'
s
Since you're sending a file but not a directory,
indirpath
needs to be a
FlyteFile
. Also, I'm not very sure why minio is trying to upload the file but can you make sure the s3 credentials are exported in the terminal?
h
@Samhita Alla I am not sending a file - it was a directory:
pyflyte --verbose run --remote minio_file_uploader.py wf --indirpath s3://mm-sat/input-data
is it simply
export ACCESS_KEY=... export SECRET_KEY=...
?
s
Yes.
h
@Samhita Alla not sure if I did that right - since it's the same S3 from the Flyte sandbox
export ACCESS_KEY=minio export SECRET_KEY=miniostorage
- the same error occurs
This does sound trivial, but I am not sure where the error is. The following works:
Copy code
#this works
@dynamic(container_image=polars)
def parse_xml(file: FlyteFile) -> pl.DataFrame:
    # Parse the XML file
    tree = ET.parse(file)
    root = tree.getroot()

    # Access elements and attributes
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            if "id" in key.lower():
                val = val.lower()[2:]
            result.update({key: val})
    return pl.from_dict(result)

@workflow
def wf(
    file: FlyteFile,
) -> pl.DataFrame:
    outfile = parse_xml(file=file)
    return outfile
But, this doesn't work:
Copy code
def parse_xml(file: FlyteFile) -> pl.DataFrame:
    # Parse the XML file
    tree = ET.parse(file)
    root = tree.getroot()

    # Access elements and attributes
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            if "id" in key.lower():
                val = val.lower()[2:]
            result.update({key: val})
    return pl.from_dict(result)

@dynamic(container_image=polars)
def parse_metadata_files(indir: FlyteDirectory, metadata_type: str) -> pl.DataFrame:
    metadata_files = []
    for var_path, file in indir.crawl():
        fname, extension = Path(file).stem, Path(file).suffix[1:]
        if extension == metadata_type:
            metadata_files.append((var_path, file))
    metadata_dfs = [
        parse_xml(file=os.path.join(var_path, file))
        for var_path, file in metadata_files
    ]
    return pl.concat(metadata_dfs, how="diagonal")

@workflow
def wf(
    input_dir: FlyteDirectory,
) -> pl.DataFrame:
    return parse_metadata_files(indir=input_dir, metadata_type="xml")
However, trying to run this locally
wf(input_dir="s3://my-s3-bucket/input-data/")
I do get an
Unable to locate credentials
error - so maybe I am not exporting the S3 credentials properly?
s
I think you need to set
FLYTE_AWS_ENDPOINT
,
FLYTE_AWS_ACCESS_KEY_ID
and
FLYTE_AWS_SECRET_ACCESS_KEY
environment variables. The endpoint needs to be
http://localhost:30002/
.
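A minimal sketch of setting those for a local run from Python, using the sandbox MinIO values mentioned earlier in this thread (exporting them in the shell works just as well; adjust for your own deployment):
Copy code
import os

# Sandbox MinIO values from earlier in this thread; replace with your own.
os.environ["FLYTE_AWS_ENDPOINT"] = "http://localhost:30002"
os.environ["FLYTE_AWS_ACCESS_KEY_ID"] = "minio"
os.environ["FLYTE_AWS_SECRET_ACCESS_KEY"] = "miniostorage"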
h
@Samhita Alla ok the credentials are found - however, the same issue occurs locally. Though I do have the full traceback, see attached
s
I'm trying to repro.
@Hud, I exported the three environment variables in the terminal, and it works for me when I run the workflow locally.
h
@Samhita Alla hm, are you able to jump on a quick call to see what I am doing wrong?
s
Sure! Have you exported the env variables?
h
Yes
FLYTE_AWS_ENDPOINT
,
FLYTE_AWS_ACCESS_KEY_ID
and
FLYTE_AWS_SECRET_ACCESS_KEY
were set properly
s
Have you set the endpoint properly?
export FLYTE_AWS_ENDPOINT=http://localhost:30002
. I just exported the three env variables and ran the code locally
pyflyte run <python-file> <wf-name>
h
Yes and still the same issue - could we jump on a quick call?
s
Sure