Ketan (kumare3)
Hud
09/22/2023, 12:08 PMKetan (kumare3)
Hud
09/22/2023, 1:32 PMKetan (kumare3)
Hud
09/22/2023, 1:56 PM╭─────────────────────────────────────── Traceback (most recent call last) ───────────────────────────────────────╮
│ in <module>:3 │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/task.py:280 in task │
│ │
│ ❱ 280 │ │ return wrapper(_task_function) │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/task.py:260 in wrapper │
│ │
│ ❱ 260 │ │ task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))( │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/tracker.py:77 in │
│ __call__ │
│ │
│ ❱ 77 │ │ o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs) │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/python_function_task.py: │
│ 121 in __init__ │
│ │
│ ❱ 121 │ │ super().__init__( │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/python_auto_container.py │
│ :85 in __init__ │
│ │
│ ❱ 85 │ │ super().__init__( │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/base_task.py:432 in │
│ __init__ │
│ │
│ ❱ 432 │ │ │ interface=transform_interface_to_typed_interface(interface), │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:247 in │
│ transform_interface_to_typed_interface │
│ │
│ ❱ 247 │ inputs_map = transform_variable_map(interface.inputs, input_descriptions) │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:345 in │
│ transform_variable_map │
│ │
│ ❱ 345 │ │ │ res[k] = transform_type(v, descriptions.get(k, k)) │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:350 in │
│ transform_type │
│ │
│ ❱ 350 │ return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=de │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/type_engine.py:857 in │
│ to_literal_type │
│ │
│ ❱ 857 │ │ res = transformer.get_literal_type(python_type) │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/types/file/file.py:322 in │
│ get_literal_type │
│ │
│ ❱ 322 │ │ return LiteralType(blob=self._blob_type(format=FlyteFilePathTransformer.get_form │
│ │
│ /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/types/file/file.py:306 in │
│ get_format │
│ │
│ ❱ 306 │ │ return typing.cast(FlyteFile, t).extension() │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: type object 'Path' has no attribute 'extension'
I have not installed the plugins, but I am not sure whether that is the cause of this.
pathlib
is not supported in Flyte, so I changed file_path: Path
to file: FlyteFile
or files: List[FlyteFile]
But these are promise objects-
How do I iterate through Promise objects?Jay Ganbat
09/22/2023, 4:14 PM@workflow
Hud
09/22/2023, 4:18 PM@task
def parse_metadata_xml(file: FlyteFile) -> pl.DataFrame:
# Parse the XML file
tree = ET.parse(file)
root = tree.getroot()
# Access elements and attributes
result = {}
for child in root:
for subchild in child:
key = subchild.tag
val = subchild.text
result.update({key: val})
return pl.from_dict(result)
@workflow
def ingest_raw_data(files: List[FlyteFile]) -> pl.DataFrame:
data = []
for file in files:
if file.suffix == MetadataType.DIZBD.value:
data.append(parse_metadata_dizbd(file))
elif file.suffix == MetadataType.XML.value:
data.append(parse_metadata_xml(file))
return pl.concat(data, how="diagonal")
Although, if there is a better practice for refactoring this, I would love to know how.
Jay Ganbat
09/22/2023, 4:19 PM@dynamic
def dynamic_ingest_raw_data()
...
@workflow
def ingest_raw_data()
dynamic_ingest_raw_data()
Hud
09/22/2023, 4:22 PMList[FlyteFile]
? Or is it FlyteDirectory
betterJay Ganbat
09/22/2023, 5:13 PMHud
09/22/2023, 5:59 PMFlyteDirectory
as an input to some function, is there a way I can get that local path back?
def ingest(data_dir: FlyteDirectory):
for flytepath, filename in data_dir.crawl():
... # can I get `path/to/some/data` here somehow?
ingest("path/to/some/data")
Jay Ganbat
09/22/2023, 7:40 PMHud
09/22/2023, 7:43 PMflytepath
returned something /var/../..
- so technically flytepath
or data_dir.path
should return the path?Jay Ganbat
09/22/2023, 7:48 PMHud
09/22/2023, 7:49 PMpyflyte run ..
, with argparse? or is there a better wayKetan (kumare3)
Hud
09/23/2023, 5:08 PM@dynamic
def parse_directory(
directory: FlyteDirectory, metadata_types: List[str]
) -> pl.DataFrame:
data = []
for metadata_type in metadata_types:
for var_path, file in directory.crawl():
extension = Path(file).suffix[1:]
if extension == metadata_type:
df = parse_metadata_file(
file=Path(var_path) / file, metadata_type=metadata_type
)
data.append(df)
return pl.concat(data, how="diagonal")
And running this pyflyte run extract.py parse_directory --directory data/XML/bla --metadata_types '["xml"]'
gave this error:
Currently only directories containing one file are supported, found [100] files found in data/XML/bla
Assuming passing a list of files is a workaround, e.g. ideally pyflyte run extract.py parse_directory --files '[f for f in dir.glob(*)]' ...
but I am not sure how to do thisSamhita Alla
Hud
09/25/2023, 11:48 AM"data/XML/bla"
gave the same error.Samhita Alla
pyflyte run
. Can you add a workflow and call the dynamic workflow from within it? Please pass the name of the workflow to the pyflyte run
command.Hud
09/25/2023, 1:12 PMparse_metadata_files
and parse_raw_objects
, since I figured workflows can't contain for loops:
@workflow
def ingest_text_wf(
data_dir: FlyteDirectory
) -> pl.DataFrame:
# dynamic
d = parse_metadata_files(
directory=data_dir
)
# dynamic
d2 = parse_raw_objects(
directory=data_dir
)
return d.join(d2, on="ID", how="outer")
this still gives me the same error with pyflyte run
. But also, if I run this with argument parser with python extract.py --data_dir data/XML/bla
it results in `AttributeError: 'Promise' object has no attribute 'join'`
Samhita Alla
Hud
09/25/2023, 1:13 PMSamhita Alla
Yee
join
on themHud
09/25/2023, 5:06 PMYee
Hud
09/25/2023, 9:58 PMpyflyte run extract.py ingest_text_wf --directory data/XML/bla
I would consider passing a list of files, but I don't know exactly what the files are; I just know they're in the folder. How do I pass an array or list of the files in a folder to pyflyte run ..?
Yee
["filea", "fileb"]
Hud
09/27/2023, 4:08 PM[f for f in data_dir.glob(*)]
Hopefully that makes sense. How do I pass that in pyflyte run
Yee
Hud
09/28/2023, 12:40 AMSamhita Alla
Ketan (kumare3)
Hud
09/28/2023, 2:48 PMKetan (kumare3)
Hud
09/28/2023, 2:52 PMKetan (kumare3)
Hud
09/28/2023, 3:00 PMpyflyte run
still? ok let's come back to this later thenKetan (kumare3)
Yee
Hud
10/02/2023, 8:36 PMpyflyte run --remote minio_file_uploader.py wf --indirpath <s3://my-s3-bucket/input-data>
The goal is to process the XML files into a single polars DataFrame and export it as a CSV via --remote.
@dynamic(container_image=polars)
def parse_metadata_files(
indir: FlyteDirectory, metadata_ext: str
) -> List[FlyteFile]:
metadata_files = []
all_out = []
for var_path, file in indir.crawl():
fname, extension = Path(file).stem, Path(file).suffix[1:]
if extension == metadata_ext:
metadata_files.append((var_path, file))
metadata_dfs = [
parse_metadata_file(file=Path(var_path) / file, metadata_type=metadata_ext)
for var_path, file in metadata_files
]
metadata_df = pl.concat(metadata_dfs, how="diagonal")
all_out.append(metadata_df.write_csv("out.csv"))
return all_out
@workflow
def wf(indirpath: FlyteDirectory) -> List[FlyteFile]:
"""Usage: `pyflyte run --remote file_ingester.py wf --s3_path <s3://my-s3-bucket/input-data> """
outfiles = parse_metadata_files(indir=indirpath, metadata_ext="xml")
return outfiles
This leads to this error:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
return wrapped(*args, **kwargs)
File "/root/flyteingest/workflows/minio_file_uploader.py", line 77, in parse_metadata_files
metadata_dfs = [
File "/root/flyteingest/workflows/minio_file_uploader.py", line 78, in <listcomp>
parse_metadata_file(file=Path(var_path) / file, metadata_type=metadata_ext)
File "/root/flyteingest/workflows/minio_file_uploader.py", line 62, in parse_metadata_file
df = parser(file=file)
File "/root/flyteingest/workflows/minio_file_uploader.py", line 39, in parse_xml
tree = ET.parse(file)
File "/usr/local/lib/python3.9/xml/etree/ElementTree.py", line 1222, in parse
tree.parse(source, parser)
File "/usr/local/lib/python3.9/xml/etree/ElementTree.py", line 569, in parse
source = open(source, "rb")
Message:
[Errno 2] No such file or directory: 's3:/my-s3-bucket/input-data/32d06a1995a18b0.xml'
User error.
parse_metadata_file
and parse_xml
in this case are neither tasks, dynamic workflows, nor workflows, and that XML file does exist in S3. Am I missing something?
Ketan (kumare3)
/
missing s3:/my-s3-bucket/input-data/32d06a1995a18b0.xml
-> note s3://
Hud
10/03/2023, 12:35 AMfile=Path(var_path) / file
but the output from printing the variables usually gets obscured by the short error messages - is there a way to display the full traceback?
Ketan (kumare3)
Hud
10/03/2023, 2:42 AMPath
issue - but still getting the same error:
Message:
[Errno 2] No such file or directory: '<s3://my-s3-bucket/input-data/32d06a1995a18b0.xml>'
Samhita Alla
indirpath
needs to be a FlyteFile
. Also, I'm not very sure why minio is trying to upload the file but can you make sure the s3 credentials are exported in the terminal?Hud
10/03/2023, 11:07 AMpyflyte --verbose run --remote minio_file_uploader.py wf --indirpath <s3://mm-sat/input-data>
is it simply export ACCESS_KEY=... export SECRET_KEY=...
?Samhita Alla
Hud
10/04/2023, 12:57 AMexport ACCESS_KEY=minio export SECRET_KEY=miniostorage
- the same error occurs.
# this works
@dynamic(container_image=polars)
def parse_xml(file: FlyteFile) -> pl.DataFrame:
# Parse the XML file
tree = ET.parse(file)
root = tree.getroot()
# Access elements and attributes
result = {}
for child in root:
for subchild in child:
key = subchild.tag
val = subchild.text
if "id" in key.lower():
val = val.lower()[2:]
result.update({key: val})
return pl.from_dict(result)
@workflow
def wf(
file: FlyteFile,
) -> pl.DataFrame:
outfile = parse_xml(file=file)
return outfile
But, this doesn't work:
def parse_xml(file: FlyteFile) -> pl.DataFrame:
# Parse the XML file
tree = ET.parse(file)
root = tree.getroot()
# Access elements and attributes
result = {}
for child in root:
for subchild in child:
key = subchild.tag
val = subchild.text
if "id" in key.lower():
val = val.lower()[2:]
result.update({key: val})
return pl.from_dict(result)
@dynamic(container_image=polars)
def parse_metadata_files(indir: FlyteDirectory, metadata_type: str) -> pl.DataFrame:
metadata_files = []
for var_path, file in indir.crawl():
fname, extension = Path(file).stem, Path(file).suffix[1:]
if extension == metadata_type:
metadata_files.append((var_path, file))
metadata_dfs = [
parse_xml(file=os.path.join(var_path, file))
for var_path, file in metadata_files
]
return pl.concat(metadata_dfs, how="diagonal")
@workflow
def wf(
input_dir: FlyteDirectory,
) -> pl.DataFrame:
return parse_metadata_files(indir=input_dir, metadata_type="xml")
However, trying to run this locally wf(input_dir="<s3://my-s3-bucket/input-data/>")
I do get an Unable to locate credentials
error - so maybe I am not exporting the S3 credentials properly?Samhita Alla
FLYTE_AWS_ENDPOINT
, FLYTE_AWS_ACCESS_KEY_ID
and FLYTE_AWS_SECRET_ACCESS_KEY
environment variables. The endpoint needs to be <http://localhost:30002/>
.Hud
10/04/2023, 12:13 PMSamhita Alla
Hud
10/05/2023, 11:38 AMSamhita Alla
Hud
10/05/2023, 12:17 PMFLYTE_AWS_ENDPOINT
, FLYTE_AWS_ACCESS_KEY_ID
and FLYTE_AWS_SECRET_ACCESS_KEY
were set properlySamhita Alla
export FLYTE_AWS_ENDPOINT=<http://localhost:30002>
. I just exported the three env variables and ran the code locally pyflyte run <python-file> <wf-name>
Hud
10/05/2023, 12:23 PMSamhita Alla