Pryce
04/07/2023, 5:34 AMfrom typing import List
from flytekit import kwtypes, workflow, dynamic
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.file import FlyteFile
# ``shorten`` shell task: writes the output of ``head`` on the ``infile``
# input to ``outfile.txt`` and exposes that file as the FlyteFile output
# ``i``. ``set -ex`` aborts on the first failing command and traces each one.
s1 = ShellTask(
    name="shorten",
    debug=True,
    inputs=kwtypes(infile=FlyteFile),
    output_locs=[
        OutputLocation(var="i", var_type=FlyteFile, location="outfile.txt"),
    ],
    script="\nset -ex\nhead {inputs.infile} > {outputs.i}\n",
)
@dynamic
def shorten_files(files_in: List[FlyteFile]) -> str:
    """Launch one ``shorten`` shell task (``s1``) per file in ``files_in``.

    The shortened output files are not collected; the task always returns
    the status string ``"DONE"``.
    """
    for in_file in files_in:
        # Inside the dynamic task each input FlyteFile's local ``path`` is a
        # temp location that had not been downloaded to, so re-binding the
        # file to ``s1`` made flytekit try to upload that missing local path
        # ("Failed to Bind variable infile ... No such file or directory").
        # When the file came from remote storage, forward its original URI
        # instead; a remote path is bound as-is without an upload.
        # NOTE(review): relies on FlyteFile.remote_source being populated for
        # task inputs in your flytekit version — confirm.
        remote_uri = getattr(in_file, "remote_source", None)
        s1(infile=FlyteFile(remote_uri) if remote_uri else in_file)
    return "DONE"
@workflow
def wf(files_in: List[FlyteFile]) -> str:
    """Entry-point workflow: shorten every file in ``files_in``.

    Delegates to the ``shorten_files`` dynamic task and returns its status
    string.
    """
    status = shorten_files(files_in=files_in)
    return status
if __name__ == "__main__":
    # ``wf`` requires ``files_in``, so calling ``wf()`` with no arguments
    # fails. Take the input file paths from the command line instead, e.g.
    #   python dynamic_shell.py xaa xab
    import sys

    files = [FlyteFile(path) for path in sys.argv[1:]]
    print(f"Running wf() {wf(files_in=files)}")
...invoked with:
pyflyte run --remote dynamic_shell.py wf --files_in '["xaa", "xab"]'
Any help would be greatly appreciated!
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 203, in user_entry_point
return wrapped(*args, **kwargs)
File "/root/cat_sam.py", line 21, in shorten_files
s1(infile=files_in[i])
File "/usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py", line 299, in __call__
return flyte_entity_call_handler(self, *args, **kwargs) # type: ignore
File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 1065, in flyte_entity_call_handler
return create_and_link_node(ctx, entity=entity, **kwargs)
File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 985, in create_and_link_node
raise AssertionError(f"Failed to Bind variable {k} for function {entity.name}.") from e
Message:
Failed to Bind variable infile for function shorten.
User error.
So it seems there's some issue with passing FlyteFiles to tasks dynamically, perhaps?
Ketan (kumare3)
David Muraco
04/07/2023, 2:42 PM
Failed to put data from /var/folders/dn/g28_vt8518dcgmj85qjt2yr00000gq/T/flyte-kvefddf2/raw/278a7fe38181b54b9fbfcb9a0b788243/xaa to s3://my-s3-bucket/data/nn/f06f7de19b1384f11983-n0-0/77f44f27f51a4686d1369a43712e9d79/xaa (recursive=False).

Original exception: [Errno 2] No such file or directory: '/var/folders/dn/g28_vt8518dcgmj85qjt2yr00000gq/T/flyte-kvefddf2/raw/278a7fe38181b54b9fbfcb9a0b788243/xaa'
Is it looking for the file in the job's execution path instead of by its relative file name?
Pryce
04/07/2023, 6:17 PM
{"asctime": "2023-04-07 05:29:07,844", "name": "flytekit", "levelname": "ERROR", "message": "Exception when executing Failed to Bind variable infile for function shorten.", "exc_info": "Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/flytekit/core/data_persistence.py", line 472, in put_data
DataPersistencePlugins.find_plugin(remote_path)(data_config=self.data_config).put(
File "/usr/local/lib/python3.10/site-packages/flytekitplugins/fsspec/persist.py", line 129, in put
return fs.put(from_path, to_path, recursive=recursive)
File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 114, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 99, in sync
raise return_result
File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 54, in _runner
result[0] = await coro
File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 503, in _put
return await _run_coros_in_chunks(
File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 249, in _run_coros_in_chunks
await asyncio.gather(*chunk, return_exceptions=return_exceptions),
File "/usr/local/lib/python3.10/asyncio/tasks.py", line 408, in wait_for
return await fut
File "/usr/local/lib/python3.10/site-packages/s3fs/core.py", line 1091, in _put_file
size = os.path.getsize(lpath)
File "/usr/local/lib/python3.10/genericpath.py", line 50, in getsize
return os.stat(filename).st_size
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/flyte-dxy5yx9l/raw/161c980be086f6693f212f3f7f2e2e6f/xaa'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 975, in create_and_link_node
binding_from_python_std(
File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 663, in binding_from_python_std
binding_data = binding_data_from_python_std(ctx, expected_literal_type, t_value, t_value_type)
File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 652, in binding_data_from_python_std
scalar = TypeEngine.to_literal(ctx, t_value, t_value_type or type(t_value), expected_literal_type).scalar
File "/usr/local/lib/python3.10/site-packages/flytekit/core/type_engine.py", line 820, in to_literal
lv = transformer.to_literal(ctx, python_val, python_type, expected)
File "/usr/local/lib/python3.10/site-packages/flytekit/types/file/file.py", line 330, in to_literal
ctx.file_access.put_data(source_path, remote_path, is_multipart=False)
File "/usr/local/lib/python3.10/site-packages/flytekit/core/data_persistence.py", line 476, in put_data
raise FlyteAssertion(
flytekit.exceptions.user.FlyteAssertion: Failed to put data from /tmp/flyte-dxy5yx9l/raw/161c980be086f6693f212f3f7f2e2e6f/xaa to <s3://my-s3-bucket/data/3v/f8099287f537c486da13-n0-0/26d802008024641c702ba75569262a56/xaa> (recursive=False).
Original exception: [Errno 2] No such file or directory: '/tmp/flyte-dxy5yx9l/raw/161c980be086f6693f212f3f7f2e2e6f/xaa'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 203, in user_entry_point
return wrapped(*args, **kwargs)
File "/root/cat_sam.py", line 21, in shorten_files
s1(infile=files_in[i])
File "/usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py", line 299, in __call__
return flyte_entity_call_handler(self, *args, **kwargs) # type: ignore
File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 1065, in flyte_entity_call_handler
return create_and_link_node(ctx, entity=entity, **kwargs)
File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 985, in create_and_link_node
raise AssertionError(f"Failed to Bind variable {k} for function {entity.name}.") from e
AssertionError: Failed to Bind variable infile for function shorten.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py", line 524, in dispatch_execute
native_outputs = self.execute(**native_inputs)
File "/usr/local/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 166, in execute
return self.dynamic_execute(self._task_function, **kwargs)
File "/usr/local/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 301, in dynamic_execute
return self.compile_into_workflow(ctx, task_function, **kwargs)
File "/usr/local/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 196, in compile_into_workflow
cast(PythonFunctionWorkflow, self._wf).compile(**kwargs)
File "/usr/local/lib/python3.10/site-packages/flytekit/core/workflow.py", line 650, in compile
workflow_outputs = exception_scopes.user_entry_point(self._workflow_function)(**input_kwargs)
File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 214, in user_entry_point
raise FlyteScopedUserException(*_exc_info())
flytekit.exceptions.scopes.FlyteScopedUserException: Failed to Bind variable infile for function shorten."}
`create_entities()`
from the shell task example when the task is dynamic, maybe? `pyflyte run`
isn't supported; maybe the members of that list are just being parsed as strings? I'm still working through all this; I was just curious whether others had encountered something similar.
Kevin Su
04/07/2023, 10:25 PM
Pryce
04/07/2023, 11:07 PM
Kevin Su
04/07/2023, 11:12 PM
Pryce
04/07/2023, 11:16 PM
Kevin Su
04/08/2023, 8:33 PM
Pryce
04/08/2023, 8:37 PM
`current_context().working_directory`
so it's not exactly the same.
Concretely, this works without a problem in a task/workflow: `x = FlyteFile("s3://my-s3-bucket/arbitrary-data/test.txt")`,
whereas `os.listdir(FlyteDirectory("s3://my-s3-bucket/arbitrary-data/"))`
results in "No such file or directory".
I'm guessing this is somewhat expected behavior; I'm just curious whether this is a use case that should be supported or whether I'm just doing things wrong? 😅
Samhita Alla
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
@task
def t1() -> FlyteDirectory:
    """Return a FlyteDirectory pointing at ``s3://my-s3-bucket/arbitrary``.

    flytekit derives a task's outputs from its return annotation, so the
    ``-> FlyteDirectory`` annotation is what makes the directory a task
    output. The URI is written bare: a string starting with ``<`` is not an
    ``s3://`` URI.
    """
    return FlyteDirectory("s3://my-s3-bucket/arbitrary")
@workflow
def wf():
    """Run ``t1`` once; this workflow declares no outputs."""
    node_output = t1()  # intentionally unused
Pryce
04/10/2023, 7:23 PMimport os
from typing import List
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
@task
def t1() -> List[FlyteFile]:
    """Return the entries of the ``s3://my-s3-bucket/arbitrary`` prefix as files.

    NOTE: a FlyteDirectory constructed from a remote URI inside a task is not
    downloaded, so ``os.listdir`` on it fails with "No such file or
    directory". To list it, pass the directory in as a task input annotated
    with ``FlyteDirectory``; flytekit then transfers it at the task boundary.
    """
    remote_dir = FlyteDirectory("s3://my-s3-bucket/arbitrary")
    base = os.fspath(remote_dir)
    # Join each entry onto the directory: bare names returned by os.listdir
    # would otherwise be resolved relative to the working directory.
    return [os.path.join(base, name) for name in os.listdir(base)]
@workflow
def wf():
    """Run ``t1`` once; this workflow declares no outputs."""
    node_output = t1()  # intentionally unused
My naive suspicion is that `os.listdir` gets called locally (in the execution container) and the DAG doesn't convert paths for directories the way it does for full prefixes, which is why it works for `context().working_dir`-derived
paths (because they're already of the form `my-s3-bucket/data/7z/f28031d37446d44c38ec-n1-0-dn3-0`)
but not for arbitrary ones. I might be way off base, though... Thanks for looking into it; I have a workaround, so I'm not hung up on this particular quirk. Love the blog posts, btw!
Samhita Alla
> Love the blog posts btw!

Thanks! A `FlyteDirectory` needs to be sent as an input to a task; it gets uploaded/downloaded at the task boundary. Can you send the directory as an input to `t1`
and annotate it with `FlyteDirectory`? `os.listdir`
should work then. You might have to pass the credentials for it to work. You can set the following environment variables for the demo cluster:
import os

# Object-store credentials for the demo cluster's MinIO. Values must be bare
# strings: an endpoint wrapped in "<...>" is not a usable URL.
# NOTE(review): flytekit's own S3 settings are read from FLYTE_AWS_ENDPOINT,
# FLYTE_AWS_ACCESS_KEY_ID and FLYTE_AWS_SECRET_ACCESS_KEY; AWS_ENDPOINT may
# not be picked up by your flytekit/s3fs version — confirm.
os.environ["AWS_ENDPOINT"] = "http://localhost:30002/"
os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "miniostorage"
When I tried, it threw an access-denied error. Once the permissions are set correctly, it should work!
Pryce
04/12/2023, 8:44 PMimport os
from typing import List
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
@task
def t1(indir: FlyteDirectory) -> List[FlyteFile]:
    """Return the regular files directly inside ``indir``.

    ``indir`` is a task input, so flytekit transfers it at the task boundary
    and ``os.listdir`` can read it locally.

    Args:
        indir: Directory to scan (not recursive).

    Returns:
        Paths of the regular files in ``indir``, in sorted name order for a
        deterministic result; each is returned as a FlyteFile output.
    """
    # Resolve the local path once rather than on every join.
    base = os.fspath(indir)
    candidates = (os.path.join(base, name) for name in sorted(os.listdir(base)))
    # Sub-directories cannot be returned as FlyteFiles; skip them.
    return [path for path in candidates if os.path.isfile(path)]
@workflow
def wf():
    """Run ``t1`` on the ``s3://my-s3-bucket/arbitrary`` prefix."""
    # The URI must not be wrapped in "<...>": "<s3://..." is not an s3:// URI.
    t1(indir=FlyteDirectory("s3://my-s3-bucket/arbitrary"))
Samhita Alla
Pryce
04/13/2023, 9:53 PM
`x = FlyteFile(path='s3://arbitrary-path/file.txt')`
won't raise an exception, even though `open(x, 'r')`
within that same task will raise a file-not-found
error. Likewise, instantiating `y = FlyteDirectory(path='s3://arbitrary-path')`
will also work fine until I run `os.listdir`. I was never doing anything with those FlyteFiles within the same context, but I assumed that since their init didn't raise an exception, their path must be valid within that context, and therefore listing out a FlyteDirectory should work fine.