cold-train-21872
09/22/2023, 1:56 PM
Traceback (most recent call last):
  in <module>:3
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/task.py:280 in task
    return wrapper(_task_function)
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/task.py:260 in wrapper
    task_instance = TaskPlugins.find_pythontask_plugin(type(task_config))(
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/tracker.py:77 in __call__
    o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/python_function_task.py:121 in __init__
    super().__init__(
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/python_auto_container.py:85 in __init__
    super().__init__(
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/base_task.py:432 in __init__
    interface=transform_interface_to_typed_interface(interface),
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:247 in transform_interface_to_typed_interface
    inputs_map = transform_variable_map(interface.inputs, input_descriptions)
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:345 in transform_variable_map
    res[k] = transform_type(v, descriptions.get(k, k))
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/interface.py:350 in transform_type
    return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=de...
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/core/type_engine.py:857 in to_literal_type
    res = transformer.get_literal_type(python_type)
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/types/file/file.py:322 in get_literal_type
    return LiteralType(blob=self._blob_type(format=FlyteFilePathTransformer.get_form...
  /Users/hud/projects/mm-sat/flyteingest/.venv/lib/python3.9/site-packages/flytekit/types/file/file.py:306 in get_format
    return typing.cast(FlyteFile, t).extension()
AttributeError: type object 'Path' has no attribute 'extension'
Have not installed the plugins, but not sure if that is the cause of this.
cold-train-21872
09/22/2023, 3:10 PM
pathlib is not supported in Flyte, so I changed file_path: Path to file: FlyteFile or files: List[FlyteFile]. But these are Promise objects - how do I iterate through Promise objects?
magnificent-teacher-86590
09/22/2023, 4:14 PM
@workflow
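As noted a couple of messages up, pathlib.Path isn't a supported annotation for task inputs, and FlyteFile is the usual drop-in replacement. A minimal sketch of that change (the task name here is hypothetical); once the task actually runs, FlyteFile behaves like an os.PathLike, so it can be handed straight to parsers:

from flytekit import task
from flytekit.types.file import FlyteFile

@task
def read_raw(file: FlyteFile) -> str:
    # Accessing the FlyteFile as a path materializes it locally,
    # so open() / ET.parse() accept it directly.
    with open(file) as f:
        return f.read()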
cold-train-21872
09/22/2023, 4:18 PM
@task
def parse_metadata_xml(file: FlyteFile) -> pl.DataFrame:
    # Parse the XML file
    tree = ET.parse(file)
    root = tree.getroot()
    # Access elements and attributes
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            result.update({key: val})
    return pl.from_dict(result)

@workflow
def ingest_raw_data(files: List[FlyteFile]) -> pl.DataFrame:
    data = []
    for file in files:
        if file.suffix == MetadataType.DIZBD.value:
            data.append(parse_metadata_dizbd(file))
        elif file.suffix == MetadataType.XML.value:
            data.append(parse_metadata_xml(file))
    return pl.concat(data, how="diagonal")
Although, if there is a better practice to refactor this, I would love to know how.
magnificent-teacher-86590
09/22/2023, 4:19 PMmagnificent-teacher-86590
09/22/2023, 4:20 PM
@dynamic
def dynamic_ingest_raw_data():
    ...

@workflow
def ingest_raw_data():
    dynamic_ingest_raw_data()
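Fleshed out, that suggestion might look roughly like the sketch below (hypothetical task names, and it assumes the polars type support the rest of the thread relies on). Inside a @workflow the inputs are Promises, so the for-loop has to move into a @dynamic workflow, where the list is materialized at runtime; the final concat also happens inside a task, because outputs collected inside the @dynamic are still Promises:

from typing import List
import polars as pl
from flytekit import task, dynamic, workflow
from flytekit.types.file import FlyteFile

@task
def parse_one(file: FlyteFile) -> pl.DataFrame:
    # Placeholder parser; real code would dispatch on the file extension.
    return pl.DataFrame({"path": [file.path]})

@task
def concat_frames(frames: List[pl.DataFrame]) -> pl.DataFrame:
    return pl.concat(frames, how="diagonal")

@dynamic
def ingest_all(files: List[FlyteFile]) -> pl.DataFrame:
    # files is a real Python list here, so ordinary iteration works.
    frames = [parse_one(file=f) for f in files]
    # frames is a list of Promises; hand it to a downstream task to combine.
    return concat_frames(frames=frames)

@workflow
def ingest_raw_data(files: List[FlyteFile]) -> pl.DataFrame:
    return ingest_all(files=files)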
magnificent-teacher-86590
09/22/2023, 4:21 PMcold-train-21872
09/22/2023, 4:22 PM
List[FlyteFile]? Or is FlyteDirectory better?
magnificent-teacher-86590
09/22/2023, 5:13 PMcold-train-21872
09/22/2023, 5:59 PM
If I pass a FlyteDirectory as an input to some function, is there a way I can get that local path back?

def ingest(data_dir: FlyteDirectory):
    for flytepath, filename in data_dir.crawl():
        ...  # can I get `path/to/some/data` here somehow?

ingest("path/to/some/data")
magnificent-teacher-86590
09/22/2023, 7:40 PMcold-train-21872
09/22/2023, 7:43 PM
flytepath returned something like /var/../.. - so technically flytepath or data_dir.path should return the path?
magnificent-teacher-86590
09/22/2023, 7:48 PMcold-train-21872
09/22/2023, 7:49 PM
pyflyte run .., with argparse? Or is there a better way?
freezing-airport-6809
cold-train-21872
09/23/2023, 5:08 PM
@dynamic
def parse_directory(
    directory: FlyteDirectory, metadata_types: List[str]
) -> pl.DataFrame:
    data = []
    for metadata_type in metadata_types:
        for var_path, file in directory.crawl():
            extension = Path(file).suffix[1:]
            if extension == metadata_type:
                df = parse_metadata_file(
                    file=Path(var_path) / file, metadata_type=metadata_type
                )
                data.append(df)
    return pl.concat(data, how="diagonal")

And running this with pyflyte run extract.py parse_directory --directory data/XML/bla --metadata_types '["xml"]' gave this error:
Currently only directories containing one file are supported, found [100] files found in data/XML/bla
Assuming passing a list of files is a workaround, e.g. ideally pyflyte run extract.py parse_directory --files '[f for f in dir.glob(*)]' ... but I am not sure how to do this.
tall-lock-23197
cold-train-21872
09/25/2023, 11:48 AM
"data/XML/bla" gave the same error.
tall-lock-23197
pyflyte run. Can you add a workflow and call the dynamic workflow from within it? Please pass the name of the workflow to the pyflyte run command.
cold-train-21872
09/25/2023, 1:12 PM
parse_metadata_files and parse_raw_objects, since I figured workflows can't contain for loops:

@workflow
def ingest_text_wf(
    data_dir: FlyteDirectory
) -> pl.DataFrame:
    # dynamic
    d = parse_metadata_files(
        directory=data_dir
    )
    # dynamic
    d2 = parse_raw_objects(
        directory=data_dir
    )
    return d.join(d2, on="ID", how="outer")

This still gives me the same error with pyflyte run. But also, if I run it with the argument parser, python extract.py --data_dir data/XML/bla, it results in `AttributeError: 'Promise' object has no attribute 'join'`
tall-lock-23197
cold-train-21872
09/25/2023, 1:13 PMtall-lock-23197
thankful-minister-83577
join on them
cold-train-21872
09/25/2023, 5:06 PMthankful-minister-83577
thankful-minister-83577
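The join error lines up with how workflows compile: inside a @workflow, parse_metadata_files(...) returns a Promise rather than a DataFrame, so DataFrame methods like join can only run inside a task. A hedged sketch of that restructuring (join_frames is a hypothetical task name; parse_metadata_files and parse_raw_objects are the dynamic workflows from the message above):

import polars as pl
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory

@task
def join_frames(d: pl.DataFrame, d2: pl.DataFrame) -> pl.DataFrame:
    # Ordinary Python runs here at execution time, so .join is fine.
    return d.join(d2, on="ID", how="outer")

@workflow
def ingest_text_wf(data_dir: FlyteDirectory) -> pl.DataFrame:
    # These calls return Promises; pass them on to another node
    # instead of calling DataFrame methods on them.
    d = parse_metadata_files(directory=data_dir)
    d2 = parse_raw_objects(directory=data_dir)
    return join_frames(d=d, d2=d2)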
cold-train-21872
09/25/2023, 9:58 PM
pyflyte run extract.py ingest_text_wf --directory data/XML/bla
Would consider passing a list of files, but I don't know exactly what the files are, I just know they're in the folder - how do I pass an array or list of files in a folder for pyflyte run ..?
cold-train-21872
09/27/2023, 12:30 PMthankful-minister-83577
thankful-minister-83577
["filea, "fileb"]
thankful-minister-83577
thankful-minister-83577
cold-train-21872
09/27/2023, 4:08 PM
[f for f in data_dir.glob("*")]
Hopefully that makes sense. How do I pass that in pyflyte run?
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
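Piecing together the format shown above, the list most likely has to be spelled out as a JSON array of paths on the command line rather than as a Python expression - a sketch, reusing the ingest_raw_data workflow from earlier in the thread and hypothetical file names:

pyflyte run extract.py ingest_raw_data --files '["data/XML/bla/a.xml", "data/XML/bla/b.xml"]'

The glob expansion itself would have to happen before the command is built (in the shell or a small helper script), since pyflyte run expects a concrete JSON list for list-typed inputs.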
cold-train-21872
09/28/2023, 12:40 AMtall-lock-23197
freezing-airport-6809
freezing-airport-6809
cold-train-21872
09/28/2023, 2:48 PMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
cold-train-21872
09/28/2023, 2:52 PMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
cold-train-21872
09/28/2023, 3:00 PM
pyflyte run still? Ok, let's come back to this later then.
freezing-airport-6809
freezing-airport-6809
thankful-minister-83577
cold-train-21872
10/02/2023, 8:36 PM
pyflyte run --remote minio_file_uploader.py wf --indirpath s3://my-s3-bucket/input-data
The goal is to process the xml files into a single polars df and export that csv via --remote:

@dynamic(container_image=polars)
def parse_metadata_files(
    indir: FlyteDirectory, metadata_ext: str
) -> List[FlyteFile]:
    metadata_files = []
    all_out = []
    for var_path, file in indir.crawl():
        fname, extension = Path(file).stem, Path(file).suffix[1:]
        if extension == metadata_ext:
            metadata_files.append((var_path, file))
    metadata_dfs = [
        parse_metadata_file(file=Path(var_path) / file, metadata_type=metadata_ext)
        for var_path, file in metadata_files
    ]
    metadata_df = pl.concat(metadata_dfs, how="diagonal")
    all_out.append(metadata_df.write_csv("out.csv"))
    return all_out

@workflow
def wf(indirpath: FlyteDirectory) -> List[FlyteFile]:
    """Usage: `pyflyte run --remote file_ingester.py wf --s3_path s3://my-s3-bucket/input-data`"""
    outfiles = parse_metadata_files(indir=indirpath, metadata_ext="xml")
    return outfiles

This leads to this error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
    return wrapped(*args, **kwargs)
  File "/root/flyteingest/workflows/minio_file_uploader.py", line 77, in parse_metadata_files
    metadata_dfs = [
  File "/root/flyteingest/workflows/minio_file_uploader.py", line 78, in <listcomp>
    parse_metadata_file(file=Path(var_path) / file, metadata_type=metadata_ext)
  File "/root/flyteingest/workflows/minio_file_uploader.py", line 62, in parse_metadata_file
    df = parser(file=file)
  File "/root/flyteingest/workflows/minio_file_uploader.py", line 39, in parse_xml
    tree = ET.parse(file)
  File "/usr/local/lib/python3.9/xml/etree/ElementTree.py", line 1222, in parse
    tree.parse(source, parser)
  File "/usr/local/lib/python3.9/xml/etree/ElementTree.py", line 569, in parse
    source = open(source, "rb")
Message:
    [Errno 2] No such file or directory: 's3:/my-s3-bucket/input-data/32d06a1995a18b0.xml'
User error.

parse_metadata_file and parse_xml in this case are neither task, dynamic, nor workflow, and that xml file does exist in s3. Am I missing something?
freezing-airport-6809
A "/" is missing: s3:/my-s3-bucket/input-data/32d06a1995a18b0.xml -> note it should be s3://
cold-train-21872
10/03/2023, 12:35 AM
file=Path(var_path) / file - but printing the variables usually gets obscured by the short error messages; is there a way to display the full traceback?
freezing-airport-6809
cold-train-21872
10/03/2023, 2:42 AM
Fixed the Path issue - but still getting the same error:
Message:
    [Errno 2] No such file or directory: 's3://my-s3-bucket/input-data/32d06a1995a18b0.xml'
tall-lock-23197
indirpath needs to be a FlyteFile. Also, I'm not very sure why minio is trying to upload the file, but can you make sure the s3 credentials are exported in the terminal?
cold-train-21872
10/03/2023, 11:07 AM
pyflyte --verbose run --remote minio_file_uploader.py wf --indirpath s3://mm-sat/input-data
Is it simply export ACCESS_KEY=... export SECRET_KEY=...?
tall-lock-23197
cold-train-21872
10/04/2023, 12:57 AM
export ACCESS_KEY=minio export SECRET_KEY=miniostorage - the same error occurs.
cold-train-21872
10/04/2023, 2:01 AM
# this works
@dynamic(container_image=polars)
def parse_xml(file: FlyteFile) -> pl.DataFrame:
    # Parse the XML file
    tree = ET.parse(file)
    root = tree.getroot()
    # Access elements and attributes
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            if "id" in key.lower():
                val = val.lower()[2:]
            result.update({key: val})
    return pl.from_dict(result)

@workflow
def wf(
    file: FlyteFile,
) -> pl.DataFrame:
    outfile = parse_xml(file=file)
    return outfile

But, this doesn't work:

def parse_xml(file: FlyteFile) -> pl.DataFrame:
    # Parse the XML file
    tree = ET.parse(file)
    root = tree.getroot()
    # Access elements and attributes
    result = {}
    for child in root:
        for subchild in child:
            key = subchild.tag
            val = subchild.text
            if "id" in key.lower():
                val = val.lower()[2:]
            result.update({key: val})
    return pl.from_dict(result)

@dynamic(container_image=polars)
def parse_metadata_files(indir: FlyteDirectory, metadata_type: str) -> pl.DataFrame:
    metadata_files = []
    for var_path, file in indir.crawl():
        fname, extension = Path(file).stem, Path(file).suffix[1:]
        if extension == metadata_type:
            metadata_files.append((var_path, file))
    metadata_dfs = [
        parse_xml(file=os.path.join(var_path, file))
        for var_path, file in metadata_files
    ]
    return pl.concat(metadata_dfs, how="diagonal")

@workflow
def wf(
    input_dir: FlyteDirectory,
) -> pl.DataFrame:
    return parse_metadata_files(indir=input_dir, metadata_type="xml")

However, trying to run this locally with wf(input_dir="s3://my-s3-bucket/input-data/") I do get an Unable to locate credentials error - so maybe I am not exporting the S3 credentials properly?
tall-lock-23197
Set the FLYTE_AWS_ENDPOINT, FLYTE_AWS_ACCESS_KEY_ID and FLYTE_AWS_SECRET_ACCESS_KEY environment variables. The endpoint needs to be http://localhost:30002/.
cold-train-21872
10/04/2023, 12:13 PMtall-lock-23197
tall-lock-23197
cold-train-21872
10/05/2023, 11:38 AMtall-lock-23197
cold-train-21872
10/05/2023, 12:17 PM
FLYTE_AWS_ENDPOINT, FLYTE_AWS_ACCESS_KEY_ID and FLYTE_AWS_SECRET_ACCESS_KEY were set properly.
tall-lock-23197
export FLYTE_AWS_ENDPOINT=http://localhost:30002. I just exported the three env variables and ran the code locally: pyflyte run <python-file> <wf-name>
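Putting that together with the MinIO values quoted earlier in the thread, the local setup would look roughly like this - a sketch only; the key and secret are the sandbox defaults the user mentioned, and the file/workflow/input names are the ones from the snippets above, so adjust for the actual deployment:

export FLYTE_AWS_ENDPOINT=http://localhost:30002
export FLYTE_AWS_ACCESS_KEY_ID=minio
export FLYTE_AWS_SECRET_ACCESS_KEY=miniostorage
pyflyte run minio_file_uploader.py wf --input_dir s3://my-s3-bucket/input-data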
cold-train-21872
10/05/2023, 12:23 PMtall-lock-23197