#ask-the-community

Ketan (kumare3)

09/22/2023, 3:36 AM
It should have raised an error, tuples are not supported

Hud

09/22/2023, 12:08 PM
hm, what's the best way to deal with this? Or what's the best practice - JSON and pd DataFrames?

Ketan (kumare3)

09/22/2023, 1:30 PM
Return a dataclass or a dataframe
Pandas and polars are both supported
And you can return multiple values; it's just the tuple type on its own that isn't supported today

Hud

09/22/2023, 1:32 PM
Ok will do - maybe a separate question: in your experience, are polars or dask dataframes preferable for processing larger data?

Ketan (kumare3)

09/22/2023, 1:35 PM
Depends - we like polars. Distributed processing isn't needed, since Flyte can give you a large node
Also check out the polars Flyte plugin, or modin/dask
But please install the plugins

Hud

09/22/2023, 1:56 PM
hm, getting this error with `pl.DataFrame`:
```
╭─────────────────────────────────────── Traceback (most recent call last) ───────────────────────────────────────╮
│ in <module>:3                                                                                                   │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/task.py:280 in task      │
│                                                                                                                 │
│ ❱ 280 │   │   return wrapper(_task_function)                                                                    │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/task.py:260 in wrapper   │
│                                                                                                                 │
│ ❱ 260 │   │   task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))(                            │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/tracker.py:77 in         │
│ __call__                                                                                                        │
│                                                                                                                 │
│ ❱  77 │   │   o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)                                    │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/python_function_task.py: │
│ 121 in __init__                                                                                                 │
│                                                                                                                 │
│ ❱ 121 │   │   super().__init__(                                                                                 │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/python_auto_container.py │
│ :85 in __init__                                                                                                 │
│                                                                                                                 │
│ ❱  85 │   │   super().__init__(                                                                                 │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/base_task.py:432 in      │
│ __init__                                                                                                        │
│                                                                                                                 │
│ ❱ 432 │   │   │   interface=transform_interface_to_typed_interface(interface),                                  │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:247 in      │
│ transform_interface_to_typed_interface                                                                          │
│                                                                                                                 │
│ ❱ 247 │   inputs_map = transform_variable_map(interface.inputs, input_descriptions)                             │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:345 in      │
│ transform_variable_map                                                                                          │
│                                                                                                                 │
│ ❱ 345 │   │   │   res[k] = transform_type(v, descriptions.get(k, k))                                            │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:350 in      │
│ transform_type                                                                                                  │
│                                                                                                                 │
│ ❱ 350 │   return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=de                  │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/type_engine.py:857 in    │
│ to_literal_type                                                                                                 │
│                                                                                                                 │
│ ❱  857 │   │   res = transformer.get_literal_type(python_type)                                                  │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/types/file/file.py:322 in     │
│ get_literal_type                                                                                                │
│                                                                                                                 │
│ ❱ 322 │   │   return LiteralType(blob=self._blob_type(format=FlyteFilePathTransformer.get_form                  │
│                                                                                                                 │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/types/file/file.py:306 in     │
│ get_format                                                                                                      │
│                                                                                                                 │
│ ❱ 306 │   │   return typing.cast(FlyteFile, t).extension()                                                      │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: type object 'Path' has no attribute 'extension'
```
Have not installed the plugins, but I'm not sure if that is the cause of this.
I am just reading about how `pathlib` is not supported in Flyte, so I changed `file_path: Path` to `file: FlyteFile` or `files: List[FlyteFile]`.
But these are Promise objects - how do I iterate through Promise objects?

Jay Ganbat

09/22/2023, 4:14 PM
where are you iterating over those objects? Is it in the `@workflow`?

Hud

09/22/2023, 4:18 PM
@Jay Ganbat yes:
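```python
import xml.etree.ElementTree as ET
from typing import List

import polars as pl
from flytekit import task, workflow
from flytekit.types.file import FlyteFile

# MetadataType and parse_metadata_dizbd are defined elsewhere in the project (not shown)


@task
def parse_metadata_xml(file: FlyteFile) -> pl.DataFrame:
    # Parse the XML file
    tree = ET.parse(file)
    root = tree.getroot()

    # Collect each grandchild element as a tag -> text entry
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            result.update({key: val})
    return pl.from_dict(result)

@workflow
def ingest_raw_data(files: List[FlyteFile]) -> pl.DataFrame:
    data = []
    # `files` is a Promise inside a workflow, so this loop cannot work here
    for file in files:
        if file.suffix == MetadataType.DIZBD.value:
            data.append(parse_metadata_dizbd(file))
        elif file.suffix == MetadataType.XML.value:
            data.append(parse_metadata_xml(file))
    return pl.concat(data, how="diagonal")
```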
Although, if there is a better practice for refactoring this, I would love to know how
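One refactor that helps regardless of how the workflow is wired: keep the XML flattening in a plain function with no Flyte dependency, so it can be unit-tested locally and then called from inside a `@task`. A minimal stdlib sketch that mirrors the nested loop in `parse_metadata_xml` above; `flatten_metadata` and the sample XML are hypothetical.

```python
import xml.etree.ElementTree as ET
from typing import Dict, Optional


def flatten_metadata(xml_text: str) -> Dict[str, Optional[str]]:
    """Collect every grandchild of the root element into a tag -> text dict.

    Mirrors the nested loop in parse_metadata_xml: a later tag with the
    same name overwrites an earlier one.
    """
    root = ET.fromstring(xml_text)
    result: Dict[str, Optional[str]] = {}
    for child in root:
        for subchild in child:
            result[subchild.tag] = subchild.text
    return result


sample = (
    "<metadata>"
    "<header><ID>abc</ID><Mission>S2</Mission></header>"
    "<product><Level>L1C</Level></product>"
    "</metadata>"
)
print(flatten_metadata(sample))  # {'ID': 'abc', 'Mission': 'S2', 'Level': 'L1C'}
```

Inside the task, something like `pl.from_dict({k: [v] for k, v in row.items()})` would then turn the dict into a one-row polars frame; wrapping each value in a one-element list makes that one-row shape explicit.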

Jay Ganbat

09/22/2023, 4:19 PM
yeah, you can't do logical operations (loops, `if` checks) in a workflow
I would suggest creating a dynamic task that does that
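```python
from typing import List

from flytekit import dynamic, workflow
from flytekit.types.file import FlyteFile

@dynamic
def dynamic_ingest_raw_data(files: List[FlyteFile]):
    # Inputs are materialized here, so loops and if-checks on them work
    ...

@workflow
def ingest_raw_data(files: List[FlyteFile]):
    # Only wire the Promise through; no Python logic on it
    dynamic_ingest_raw_data(files=files)
```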
so at registration time, everything is a Promise and isn't yet realized with actual inputs
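To make the registration-time behaviour concrete, here is a toy model in plain Python. It is not flytekit's actual `Promise` class or compiler; `Placeholder`, `compile_workflow`, `wiring_only` and `loops_over_input` are all hypothetical names. The workflow body is executed once with placeholders standing in for the inputs, so passing an input along to another call works, while looping over it fails.

```python
from typing import Any, Callable, List, Tuple


class Placeholder:
    """Toy stand-in for a value that only exists once the workflow executes."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __iter__(self):
        # There is no concrete list yet, so there is nothing to loop over.
        raise TypeError(f"cannot iterate over placeholder {self.name!r}")


def compile_workflow(body: Callable[..., Any], *input_names: str) -> List[Tuple[str, Any]]:
    """Run `body` once with placeholders and return the calls it recorded."""
    calls: List[Tuple[str, Any]] = []

    def node(label: str, value: Any) -> Placeholder:
        # Record a graph node instead of doing any real work.
        calls.append((label, value))
        return Placeholder(f"{label}.output")

    body(node, *(Placeholder(n) for n in input_names))
    return calls


def wiring_only(node, files):
    node("dynamic_ingest", files)  # fine: the placeholder is only passed along


def loops_over_input(node, files):
    for f in files:  # fails: `files` has no concrete value at this point
        node("parse", f)


print(compile_workflow(wiring_only, "files")[0][0])  # dynamic_ingest
```

Calling `compile_workflow(loops_over_input, "files")` raises `TypeError`, the toy analogue of the Promise errors in this thread; in the real system, moving the loop into a `@dynamic` task avoids it because that task's body runs only once real inputs exist.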

Hud

09/22/2023, 4:22 PM
Ah ok, so the dynamic can take in the `List[FlyteFile]`? Or is `FlyteDirectory` better?

Jay Ganbat

09/22/2023, 5:13 PM
hmm, if you know which files you will be expecting, then a list of `FlyteFile` is probably better; otherwise it might try to pull down everything in that folder if it's remote

Hud

09/22/2023, 5:59 PM
Okay - just wondering: I can pass a local path as a `FlyteDirectory` input to some function; is there a way I can get that local path back?
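```python
from flytekit.types.directory import FlyteDirectory


def ingest(data_dir: FlyteDirectory):
    # crawl() yields (base path, path relative to that base) pairs
    for flytepath, filename in data_dir.crawl():
        ...  # can I get `path/to/some/data` here somehow?


ingest("path/to/some/data")
```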

Jay Ganbat

09/22/2023, 7:40 PM
I'm a bit confused: if it's a local dir, then `FlyteDirectory` should work like a regular directory, right? So it should return the path

Hud

09/22/2023, 7:43 PM
Odd, I don't have it open anymore, but last I checked `flytepath` returned something like `/var/../..` - so technically `flytepath` or `data_dir.path` should return the path?

Jay Ganbat

09/22/2023, 7:48 PM
yeah it should

Hud

09/22/2023, 7:49 PM
Ok thanks Jay - Oh, how should one pass a list of files in `pyflyte run ..`? With argparse, or is there a better way?

Ketan (kumare3)

09/23/2023, 2:06 PM
Cc @Peeter Piegaze

Hud

09/23/2023, 5:08 PM
So I am currently giving the directory as an input:
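```python
from pathlib import Path
from typing import List

import polars as pl
from flytekit import dynamic
from flytekit.types.directory import FlyteDirectory

# parse_metadata_file is defined elsewhere in the project (not shown)


@dynamic
def parse_directory(
    directory: FlyteDirectory, metadata_types: List[str]
) -> pl.DataFrame:
    data = []
    for metadata_type in metadata_types:
        for var_path, file in directory.crawl():
            extension = Path(file).suffix[1:]  # "a.xml" -> "xml"
            if extension == metadata_type:
                df = parse_metadata_file(
                    file=Path(var_path) / file, metadata_type=metadata_type
                )
                data.append(df)
    return pl.concat(data, how="diagonal")
```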
And running this: `pyflyte run extract.py parse_directory --directory data/XML/bla --metadata_types '["xml"]'` gave this error:
```
Currently only directories containing one file are supported, found [100] files found in data/XML/bla
```
Assuming passing a list of files is a workaround, ideally I'd run something like `pyflyte run extract.py parse_directory --files '[f for f in dir.glob(*)]' ...`, but I am not sure how to do this
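`parse_directory` above crawls the whole directory once per metadata type. A minimal sketch of a single pass that buckets the `(base, relative_path)` pairs by extension, assuming `crawl()` yields pairs shaped the way the snippets in this thread use them; `group_by_extension` is a hypothetical helper. It joins with `posixpath.join` instead of `Path` so that a base such as `s3://...` keeps both slashes.

```python
import posixpath
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def group_by_extension(
    entries: Iterable[Tuple[str, str]], wanted: Iterable[str]
) -> Dict[str, List[str]]:
    """Bucket (base, relative_path) pairs by file extension in one pass."""
    wanted_set = {ext.lower() for ext in wanted}
    groups: Dict[str, List[str]] = defaultdict(list)
    for base, rel in entries:
        # splitext keeps the leading dot, so drop it to compare with "xml" etc.
        ext = posixpath.splitext(rel)[1][1:].lower()
        if ext in wanted_set:
            # Plain string join: does not collapse the "//" in "s3://"
            groups[ext].append(posixpath.join(base, rel))
    return dict(groups)


entries = [
    ("s3://my-s3-bucket/input-data", "a.xml"),
    ("s3://my-s3-bucket/input-data", "b.XML"),
    ("s3://my-s3-bucket/input-data", "img/c.jpg"),
    ("s3://my-s3-bucket/input-data", "notes.txt"),
]
print(group_by_extension(entries, ["xml", "jpg"]))
```

Matching extensions case-insensitively is a design choice of this sketch; the original comparison is exact, so `b.XML` would be skipped there.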

Samhita Alla

09/25/2023, 4:18 AM
@Hud, you should be able to send FlyteDirectory to a workflow as a string, enclosed in quotes. Can you try that and let us know if you're still seeing the error?

Hud

09/25/2023, 11:48 AM
@Samhita Alla adding quotes to `"data/XML/bla"` gave the same error.

Samhita Alla

09/25/2023, 12:59 PM
Oh, I don't think you can call dynamic workflows directly using `pyflyte run`. Can you add a workflow and call the dynamic workflow from within it? Please pass the name of the workflow to the `pyflyte run` command.

Hud

09/25/2023, 1:12 PM
So, not sure if this is what you meant, but I have added a workflow that contains the dynamic functions `parse_metadata_files` and `parse_raw_objects`, since I figured workflows can't contain for loops:
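```python
import polars as pl
from flytekit import workflow
from flytekit.types.directory import FlyteDirectory

# parse_metadata_files and parse_raw_objects are @dynamic tasks defined elsewhere (not shown)


@workflow
def ingest_text_wf(
    data_dir: FlyteDirectory
) -> pl.DataFrame:
    # dynamic
    d = parse_metadata_files(
        directory=data_dir
    )
    # dynamic
    d2 = parse_raw_objects(
        directory=data_dir
    )
    return d.join(d2, on="ID", how="outer")
```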
This still gives me the same error with `pyflyte run`. But also, if I run this with an argument parser via `python extract.py --data_dir data/XML/bla`, it results in `AttributeError: 'Promise' object has no attribute 'join'`

Samhita Alla

09/25/2023, 1:13 PM
Which flytekit version have you installed?

Hud

09/25/2023, 1:13 PM
1.9.1

Samhita Alla

09/25/2023, 1:26 PM
Looks like a bug to me. @Eduardo Apolinario (eapolinario) / @Kevin Su, could you confirm?

Yee

09/25/2023, 5:00 PM
@Hud this won’t work, right? This is something we need to address more cleanly, as it’s come up multiple times, but the workflow only looks and kinda acts like Python - it’s not actually Python. These are indeed Promise objects that get passed around, so you won’t be able to `join` on them

Hud

09/25/2023, 5:06 PM
@Yee yes, I'm slowly grappling with the concept, which is why I started with a dynamic instead of a workflow - that gave me the one-object-per-directory limit error. How do I remove this limit?

Yee

09/25/2023, 9:41 PM
can you use a list?
sorry just hopping on here, could you summarize the issue for me please?

Hud

09/25/2023, 9:58 PM
@Yee So I have a folder with metadata and raw objects, and I'm just trying to parse them into a polars dataframe. It's straightforward to pass the string path, but I run into the file limit issue: `pyflyte run extract.py ingest_text_wf --directory data/XML/bla`
I would consider passing a list of files, but I don't know exactly what the files are; I just know they're in the folder. How do I pass an array or list of the files in a folder to `pyflyte run ..`?
@Yee just following up on this

Yee

09/27/2023, 4:04 PM
i’m confused. if you don’t know what the files are, how can you pass them in a list?
the list syntax should just be JSON… `["filea", "fileb"]`
at least that’s what I use for other types; it should also work with a list of files.
but if you don’t know the filenames, you can’t write that out, right?
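Since the list argument is just JSON, one possible workaround (a sketch, not checked against `pyflyte run`) is to let a small script produce that JSON from a glob instead of typing filenames by hand. `files_as_json` and the script name `list_files.py` are hypothetical.

```python
import json
import sys
from pathlib import Path
from typing import List


def files_as_json(directory: str, pattern: str = "*") -> str:
    """Return the regular files in `directory` matching `pattern` as a JSON list."""
    paths: List[str] = sorted(
        str(p) for p in Path(directory).glob(pattern) if p.is_file()
    )
    return json.dumps(paths)


if __name__ == "__main__" and len(sys.argv) > 1:
    # e.g. python list_files.py data/XML/bla "*.xml"
    print(files_as_json(sys.argv[1], *sys.argv[2:3]))
```

The output could then be spliced in with shell command substitution, e.g. `pyflyte run extract.py wf --files "$(python list_files.py data/XML/bla '*.xml')"`, assuming a workflow `wf` with an input `files: List[FlyteFile]` (both hypothetical). Whether `pyflyte run` then uploads every local file in such a list has not been verified here.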

Hud

09/27/2023, 4:08 PM
Yeah, I don't know them and am not going to manually key in each filename in that list - so it's more like `[f for f in data_dir.glob(*)]`
Hopefully that makes sense. How do I pass that in `pyflyte run`?

Yee

09/27/2023, 4:14 PM
not easily today.
can you just pass in the whole folder?
and do the glob in code?

Hud

09/28/2023, 12:40 AM
@Yee Hm, well then we're back to the issue of the one-file-per-directory limit. That said, I am not restricted to having a directory with these files: if you had just an XML file and a JPG, and the aim is to parse the XML and process the content as strings in a dataframe, which ingestion method currently works best with Flyte?

Samhita Alla

09/28/2023, 6:32 AM
`FlyteDirectory` should actually work, and the error you're encountering looks like a bug to me. Could you file an issue for it? As a workaround, you could send a blob storage path (S3 or GCS), or you could return the list of files or the directory from a Flyte task and use it in your subsequent task, which isn't optimized but could resolve the issue you're seeing.

Ketan (kumare3)

09/28/2023, 2:47 PM
@Hud to do this securely we use generated links. But you could use the AWS CLI to upload your data and then pass the S3 link

Hud

09/28/2023, 2:48 PM
@Ketan (kumare3) thanks. Is there such a thing as on-premise S3? Could I, e.g., use MinIO?

Ketan (kumare3)

09/28/2023, 2:48 PM
You can
That’s what you are already using if you deployed the Flyte sandbox
MinIO is S3-compatible
And the AWS CLI works with MinIO

Hud

09/28/2023, 2:52 PM
@Ketan (kumare3) uploading my data to another MinIO server and passing that S3 link would essentially overcome this directory limit, correct?

Ketan (kumare3)

09/28/2023, 2:59 PM
Yup
The limit only applies to uploads from `pyflyte run`
But let us brainstorm some ideas

Hud

09/28/2023, 3:00 PM
Ah, so it won't work with `pyflyte run` still? Ok, let's come back to this later then

Ketan (kumare3)

09/28/2023, 3:50 PM
It will
Just pass the S3 directory's path

Yee

09/28/2023, 4:22 PM
local upload of a dir… I have a PR out for this, but it needs a fair bit of cleanup before merging

Hud

10/02/2023, 8:36 PM
@Ketan (kumare3) so I uploaded the XML files into the S3 bucket and passed the S3 path: `pyflyte run --remote minio_file_uploader.py wf --indirpath s3://my-s3-bucket/input-data`
The goal is to process the XML files into a single polars df and export it as a CSV via `--remote`
```python
from pathlib import Path
from typing import List

import polars as pl
from flytekit import dynamic, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

# `polars` (the container image) and parse_metadata_file are defined elsewhere
# in the project (not shown)


@dynamic(container_image=polars)
def parse_metadata_files(
    indir: FlyteDirectory, metadata_ext: str
) -> List[FlyteFile]:
    metadata_files = []
    all_out = []
    for var_path, file in indir.crawl():
        extension = Path(file).suffix[1:]

        if extension == metadata_ext:
            metadata_files.append((var_path, file))
    metadata_dfs = [
        parse_metadata_file(file=Path(var_path) / file, metadata_type=metadata_ext)
        for var_path, file in metadata_files
    ]
    metadata_df = pl.concat(metadata_dfs, how="diagonal")
    # write_csv(path) writes to disk and returns None, so wrap the written path
    metadata_df.write_csv("out.csv")
    all_out.append(FlyteFile("out.csv"))

    return all_out

@workflow
def wf(indirpath: FlyteDirectory) -> List[FlyteFile]:
    """Usage: `pyflyte run --remote minio_file_uploader.py wf --indirpath s3://my-s3-bucket/input-data`"""
    outfiles = parse_metadata_files(indir=indirpath, metadata_ext="xml")
    return outfiles
```
This leads to this error:
```
Traceback (most recent call last):

      File "/usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/flyteingest/workflows/minio_file_uploader.py", line 77, in parse_metadata_files
        metadata_dfs = [
      File "/root/flyteingest/workflows/minio_file_uploader.py", line 78, in <listcomp>
        parse_metadata_file(file=Path(var_path) / file, metadata_type=metadata_ext)
      File "/root/flyteingest/workflows/minio_file_uploader.py", line 62, in parse_metadata_file
        df = parser(file=file)
      File "/root/flyteingest/workflows/minio_file_uploader.py", line 39, in parse_xml
        tree = ET.parse(file)
      File "/usr/local/lib/python3.9/xml/etree/ElementTree.py", line 1222, in parse
        tree.parse(source, parser)
      File "/usr/local/lib/python3.9/xml/etree/ElementTree.py", line 569, in parse
        source = open(source, "rb")

Message:

    [Errno 2] No such file or directory: 's3:/my-s3-bucket/input-data/32d06a1995a18b0.xml'

User error.
```
`parse_metadata_file` and `parse_xml` in this case are neither tasks, dynamics, nor workflows, and that XML file does exist in S3. Am I missing something?

Ketan (kumare3)

10/02/2023, 10:52 PM
@Hud why is one `/` missing in `s3:/my-s3-bucket/input-data/32d06a1995a18b0.xml`? Note it should be `s3://`

Hud

10/03/2023, 12:35 AM
@Ketan (kumare3) oh - good catch! I assume it's some issue with `file=Path(var_path) / file`, but printing the variables usually gets obscured by the short error messages. Is there a way to display the full traceback?
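The missing slash can be reproduced with the standard library alone: `pathlib` normalises repeated slashes, and the `//` in a URI scheme is no exception. A short sketch using the bucket and file names from the error above; `PurePosixPath` is used so the result is the same on any OS (on Linux, `Path` behaves identically). Plain string joining, here `posixpath.join`, leaves the scheme intact.

```python
import posixpath
from pathlib import PurePosixPath

base = "s3://my-s3-bucket/input-data"
name = "32d06a1995a18b0.xml"

# pathlib collapses "//" to "/", which corrupts the "s3://" scheme separator
broken = str(PurePosixPath(base) / name)

# posixpath.join is string concatenation with a single "/" separator
fixed = posixpath.join(base, name)

print(broken)  # s3:/my-s3-bucket/input-data/32d06a1995a18b0.xml
print(fixed)   # s3://my-s3-bucket/input-data/32d06a1995a18b0.xml
```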

Ketan (kumare3)

10/03/2023, 12:40 AM
`--verbose`

Hud

10/03/2023, 2:42 AM
@Ketan (kumare3) it was indeed a `Path` issue - but I'm still getting the same error:
```
Message:

    [Errno 2] No such file or directory: 's3://my-s3-bucket/input-data/32d06a1995a18b0.xml'
```

Samhita Alla

10/03/2023, 4:55 AM
Since you're sending a file but not a directory, `indirpath` needs to be a `FlyteFile`. Also, I'm not very sure why MinIO is trying to upload the file, but can you make sure the S3 credentials are exported in the terminal?

Hud

10/03/2023, 11:07 AM
@Samhita Alla I'm not sending a file - it was a directory: `pyflyte --verbose run --remote minio_file_uploader.py wf --indirpath s3://mm-sat/input-data`
Is it simply `export ACCESS_KEY=... export SECRET_KEY=...`?

Samhita Alla

10/03/2023, 1:07 PM
Yes.

Hud

10/04/2023, 12:57 AM
@Samhita Alla not sure if I did that right - since it's the same S3 as the Flyte sandbox, I ran `export ACCESS_KEY=minio export SECRET_KEY=miniostorage`, and the same error occurs.
This does sound trivial, but I am not sure where the error is. The following works:
```python
import xml.etree.ElementTree as ET

import polars as pl
from flytekit import dynamic, workflow
from flytekit.types.file import FlyteFile

# `polars` (the container image) is defined elsewhere in the project (not shown)


# this works
@dynamic(container_image=polars)
def parse_xml(file: FlyteFile) -> pl.DataFrame:
    # Parse the XML file
    tree = ET.parse(file)
    root = tree.getroot()

    # Access elements and attributes
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            if "id" in key.lower():
                val = val.lower()[2:]
            result.update({key: val})
    return pl.from_dict(result)

@workflow
def wf(
    file: FlyteFile,
) -> pl.DataFrame:
    outfile = parse_xml(file=file)
    return outfile
```
But this doesn't work:
```python
import os
import xml.etree.ElementTree as ET
from pathlib import Path

import polars as pl
from flytekit import dynamic, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

# `polars` (the container image) is defined elsewhere in the project (not shown)


def parse_xml(file: FlyteFile) -> pl.DataFrame:
    # Plain function (no decorator): it receives the joined path string, not a FlyteFile
    tree = ET.parse(file)
    root = tree.getroot()

    # Access elements and attributes
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            if "id" in key.lower():
                val = val.lower()[2:]
            result.update({key: val})
    return pl.from_dict(result)

@dynamic(container_image=polars)
def parse_metadata_files(indir: FlyteDirectory, metadata_type: str) -> pl.DataFrame:
    metadata_files = []
    for var_path, file in indir.crawl():
        fname, extension = Path(file).stem, Path(file).suffix[1:]
        if extension == metadata_type:
            metadata_files.append((var_path, file))
    metadata_dfs = [
        parse_xml(file=os.path.join(var_path, file))
        for var_path, file in metadata_files
    ]
    return pl.concat(metadata_dfs, how="diagonal")

@workflow
def wf(
    input_dir: FlyteDirectory,
) -> pl.DataFrame:
    return parse_metadata_files(indir=input_dir, metadata_type="xml")
```
However, when trying to run this locally with `wf(input_dir="s3://my-s3-bucket/input-data/")`, I do get an `Unable to locate credentials` error - so maybe I am not exporting the S3 credentials properly?
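One plausible reading of the remaining `No such file or directory` error, offered as a sketch rather than a confirmed diagnosis: in the version that fails, `parse_xml` is a plain function, so it receives the joined URI as a string, and `ET.parse` hands strings straight to `open()`, which only understands local paths. In the version that works, `parse_xml` is a Flyte entity taking a `FlyteFile`, which Flyte downloads before the body runs. The stdlib-only snippet shows both halves; `parse_root_tags` is a hypothetical helper, and the actual download step (for example, passing each URI to a decorated task that takes a `FlyteFile`) is left out.

```python
import io
import xml.etree.ElementTree as ET
from typing import BinaryIO, List, Union


def parse_root_tags(source: Union[str, BinaryIO]) -> List[str]:
    """Return the tags of the root element's direct children.

    `source` must be a *local* path or an open binary file object:
    ET.parse passes strings to open(), which knows nothing about s3://.
    """
    return [child.tag for child in ET.parse(source).getroot()]


# A remote URI is treated as a (non-existent) local filename
try:
    parse_root_tags("s3://my-s3-bucket/input-data/32d06a1995a18b0.xml")
except FileNotFoundError as err:
    print("fails:", err)

# Bytes that have already been fetched parse fine from memory
payload = b"<metadata><header/><product/></metadata>"
print(parse_root_tags(io.BytesIO(payload)))  # ['header', 'product']
```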

Samhita Alla

10/04/2023, 5:25 AM
I think you need to set the `FLYTE_AWS_ENDPOINT`, `FLYTE_AWS_ACCESS_KEY_ID` and `FLYTE_AWS_SECRET_ACCESS_KEY` environment variables. The endpoint needs to be `http://localhost:30002/`.
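Note that these are different names from the `ACCESS_KEY`/`SECRET_KEY` exports tried earlier. A small pre-flight check, sketched under the assumption that the three variable names above are the ones that matter, can catch a missing variable or a malformed endpoint before running the workflow; `check_flyte_s3_env` is a hypothetical helper, not part of flytekit.

```python
import os
from typing import List, Mapping, Optional
from urllib.parse import urlparse

REQUIRED = ("FLYTE_AWS_ENDPOINT", "FLYTE_AWS_ACCESS_KEY_ID", "FLYTE_AWS_SECRET_ACCESS_KEY")


def check_flyte_s3_env(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return a list of problems with the S3 settings; an empty list means OK."""
    env = os.environ if env is None else env
    problems = [f"{name} is not set" for name in REQUIRED if not env.get(name)]
    endpoint = env.get("FLYTE_AWS_ENDPOINT", "")
    if endpoint:
        parsed = urlparse(endpoint)
        # Expect a full URL such as http://localhost:30002 (scheme and host:port)
        if parsed.scheme not in ("http", "https") or not parsed.netloc:
            problems.append(f"FLYTE_AWS_ENDPOINT looks malformed: {endpoint!r}")
    return problems


if __name__ == "__main__":
    for line in check_flyte_s3_env() or ["all three variables look set"]:
        print(line)
```

Besides missing variables, the endpoint check also flags a value pasted with stray angle brackets, such as `<http://localhost:30002>`.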

Hud

10/04/2023, 12:13 PM
@Samhita Alla ok, the credentials are found - however, the same issue occurs locally. I do have the full traceback, though; see attached

Samhita Alla

10/04/2023, 1:23 PM
I'm trying to repro.
@Hud, I exported the three environment variables in the terminal, and it works for me when I run the workflow locally.

Hud

10/05/2023, 11:38 AM
@Samhita Alla hm, are you able to jump on a quick call to see what I am doing wrong?

Samhita Alla

10/05/2023, 12:15 PM
Sure! Have you exported the env variables?

Hud

10/05/2023, 12:17 PM
Yes, `FLYTE_AWS_ENDPOINT`, `FLYTE_AWS_ACCESS_KEY_ID` and `FLYTE_AWS_SECRET_ACCESS_KEY` were set properly

Samhita Alla

10/05/2023, 12:18 PM
Have you set the endpoint properly? `export FLYTE_AWS_ENDPOINT=http://localhost:30002`. I just exported the three env variables and ran the code locally with `pyflyte run <python-file> <wf-name>`

Hud

10/05/2023, 12:23 PM
Yes and still the same issue - could we jump on a quick call?

Samhita Alla

10/05/2023, 12:23 PM
Sure