Hey gang, curious if anyone has experience running...
# ask-the-community
p
Hey gang, curious if anyone has experience running dynamic shell tasks? Been banging my head against a "Failed to Bind variable" error for a couple hours now.. I realize that dynamic tasks are quite strict around what you can and can't do owing to the promises / run-time compilation but I just can't figure out what I'm doing wrong:
Copy code
from typing import List
from flytekit import kwtypes, workflow, dynamic
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.file import FlyteFile

s1 = ShellTask(
    name="shorten",
    debug=True,
    script="""
    set -ex
    head {inputs.infile} > {outputs.i}
    """,
    inputs=kwtypes(infile=FlyteFile),
    output_locs=[OutputLocation(var="i", var_type=FlyteFile, location="outfile.txt")],
)

@dynamic
def shorten_files(files_in: List[FlyteFile]) -> str:

    for i in range(len(files_in)):
        s1(infile=files_in[i])

    return "DONE"

@workflow
def wf(files_in: List[FlyteFile]) -> str:

    return shorten_files(files_in=files_in)


if __name__ == "__main__":
    print(f"Running wf() {wf()}")
.. invoked with:
pyflyte run --remote dynamic_shell.py wf --files_in '["xaa", "xab"]'
Any help would be greatly appreciated!
Seems to be an issue with passing around FlyteFiles as doing so with a list of strings and outputting to a FlyteFile is working. The salient error from before was:
Copy code
Traceback (most recent call last):

      File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 203, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/cat_sam.py", line 21, in shorten_files
        s1(infile=files_in[i])
      File "/usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py", line 299, in __call__
        return flyte_entity_call_handler(self, *args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 1065, in flyte_entity_call_handler
        return create_and_link_node(ctx, entity=entity, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 985, in create_and_link_node
        raise AssertionError(f"Failed to Bind variable {k} for function {entity.name}.") from e

Message:

    Failed to Bind variable infile for function shorten.

User error.
So it seems there's some issue with passing FlyteFiles to tasks dynamically perhaps?
k
That is odd, she’ll tasks are no different
AFK, but someone will take a look
d
in the kubernetes log, do you get something like this:
Failed to put data from /var/folders/dn/g28_vt8518dcgmj85qjt2yr00000gq/T/flyte-kvefddf2/raw/278a7fe38181b54b9fbfcb9a0b788243/xaa to <s3://my-s3-bucket/data/nn/f06f7de19b1384f11983-n0-0/77f44f27f51a4686d1369a43712e9d79/xaa> (recursive=False).\n\nOriginal exception: [Errno 2] No such file or directory: '/var/folders/dn/g28_vt8518dcgmj85qjt2yr00000gq/T/flyte-kvefddf2/raw/278a7fe38181b54b9fbfcb9a0b788243/xaa'\n\n
is it looking for a file in the job execution path instead of the relative file name?
p
Yep that's buried in the main flytekit ERROR log:
Copy code
{"asctime": "2023-04-07 05:29:07,844", "name": "flytekit", "levelname": "ERROR", "message": "Exception when executing Failed to Bind variable infile for function shorten.", "exc_info": "Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/data_persistence.py", line 472, in put_data
    DataPersistencePlugins.find_plugin(remote_path)(data_config=self.data_config).put(
  File "/usr/local/lib/python3.10/site-packages/flytekitplugins/fsspec/persist.py", line 129, in put
    return fs.put(from_path, to_path, recursive=recursive)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 114, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 99, in sync
    raise return_result
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 54, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 503, in _put
    return await _run_coros_in_chunks(
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 249, in _run_coros_in_chunks
    await asyncio.gather(*chunk, return_exceptions=return_exceptions),
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 408, in wait_for
    return await fut
  File "/usr/local/lib/python3.10/site-packages/s3fs/core.py", line 1091, in _put_file
    size = os.path.getsize(lpath)
  File "/usr/local/lib/python3.10/genericpath.py", line 50, in getsize
    return os.stat(filename).st_size
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/flyte-dxy5yx9l/raw/161c980be086f6693f212f3f7f2e2e6f/xaa'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 975, in create_and_link_node
    binding_from_python_std(
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 663, in binding_from_python_std
    binding_data = binding_data_from_python_std(ctx, expected_literal_type, t_value, t_value_type)
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 652, in binding_data_from_python_std
    scalar = TypeEngine.to_literal(ctx, t_value, t_value_type or type(t_value), expected_literal_type).scalar
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/type_engine.py", line 820, in to_literal
    lv = transformer.to_literal(ctx, python_val, python_type, expected)
  File "/usr/local/lib/python3.10/site-packages/flytekit/types/file/file.py", line 330, in to_literal
    ctx.file_access.put_data(source_path, remote_path, is_multipart=False)
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/data_persistence.py", line 476, in put_data
    raise FlyteAssertion(
flytekit.exceptions.user.FlyteAssertion: Failed to put data from /tmp/flyte-dxy5yx9l/raw/161c980be086f6693f212f3f7f2e2e6f/xaa to <s3://my-s3-bucket/data/3v/f8099287f537c486da13-n0-0/26d802008024641c702ba75569262a56/xaa> (recursive=False).

Original exception: [Errno 2] No such file or directory: '/tmp/flyte-dxy5yx9l/raw/161c980be086f6693f212f3f7f2e2e6f/xaa'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 203, in user_entry_point
    return wrapped(*args, **kwargs)
  File "/root/cat_sam.py", line 21, in shorten_files
    s1(infile=files_in[i])
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py", line 299, in __call__
    return flyte_entity_call_handler(self, *args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 1065, in flyte_entity_call_handler
    return create_and_link_node(ctx, entity=entity, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/promise.py", line 985, in create_and_link_node
    raise AssertionError(f"Failed to Bind variable {k} for function {entity.name}.") from e
AssertionError: Failed to Bind variable infile for function shorten.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py", line 524, in dispatch_execute
    native_outputs = self.execute(**native_inputs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 166, in execute
    return self.dynamic_execute(self._task_function, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 301, in dynamic_execute
    return self.compile_into_workflow(ctx, task_function, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 196, in compile_into_workflow
    cast(PythonFunctionWorkflow, self._wf).compile(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/core/workflow.py", line 650, in compile
    workflow_outputs = exception_scopes.user_entry_point(self._workflow_function)(**input_kwargs)
  File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 214, in user_entry_point
    raise FlyteScopedUserException(*_exc_info())
flytekit.exceptions.scopes.FlyteScopedUserException: Failed to Bind variable infile for function shorten."}
Do I need to run something like
create_entities()
from the shell task example when the task is dynamic maybe?
I also considered that maybe passing in a list of files to
pyflyte run
isn't supported, maybe the members of that list are just being parsed as strings? I'm still working through all this I was just curious if I others had encountered something similar
k
I just found the issue. flytekit doesn’t copy the file to s3 bucket, so dynamic task can’t find the file. I’m fixing it.
p
Awesome - thank you! I've had no trouble passing single files from the CLI args through a dynamic task, FWIW
k
Yes, single file can work, but list of files don’t. It’s a bug
p
Can I provide anything else for the bug ticket?
k
p
That was quick! Thanks for following up - I'll keep an eye on it.
As a workaround I tried using FlyteDirectory with an arbitrary path that already exists in the local object store, but I keep getting "No such files or directory" when trying to access it via os.listdir in tasks or the workflow definition itself. I've been mainly relying on the working with folders docs and the shell task docs, but they instantiate FlyteDirectory with
current_context().working_directory
so it's not exactly the same. Concretely, this works no problem in a task/wf:
x = FlyteFile("<s3://my-s3-bucket/arbitrary-data/test.txt>")
Whereas
os.listdir(FlyteDirectory("<s3://_my-s3-bucket/arbitrary-data/>"))_
will result in "No such files or directory". I'm guessing this is somewhat expected behavior, just curious if this is a use-case that should be supported or I'm just doing things wrong? 😅
s
It should work, @Pryce. I created a folder named "arbitrary" in minio and when I trigger this workflow, the execution's successful.
Copy code
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory


@task
def t1():
    return FlyteDirectory("<s3://my-s3-bucket/arbitrary>")


@workflow
def wf():
    t1()
p
Hey @Samhita Alla! I gave that a go and indeed it succeeds (even without the prefix existing beforehand)! However, when I run os.listdir as suggested in the "Working with Folders" docs then I get the "No such file or directory" error. In this example I had made the "arbitrary" prefix and put a couple objects in there.
Copy code
import os
from typing import List
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


@task
def t1() -> List[FlyteFile]:
    files = []
    for f in os.listdir(FlyteDirectory("<s3://my-s3-bucket/arbitrary>")):
        files.append(f)
    return files


@workflow
def wf():
    t1()
My naive suspicion is that os.listdir gets called locally (to the exec container) and the dag doesn't convert paths for dirs like it does for full prefixes. Which is why it works for
context().working_dir
derived paths (because they're already of the form
my-s3-bucket/data/7z/f28031d37446d44c38ec-n1-0-dn3-0
) but not arbitrary ones. Might be way off base though... Thanks for looking into it, I have a workaround so I'm not hung up on this particular quirk. Love the blog posts btw!
s
Love the blog posts btw!
Thanks! A
FlyteDirectory
needs to be sent as an input to a task; it gets uploaded/downloaded at the task boundary. Can you send directory as an input to
t1
and annotate it with
FlyteDirectory
?
os.listdir
should work then. You probably might have to pass the credentials for it to work. You can set the following env variables for demo cluster.
Copy code
os.environ["AWS_ENDPOINT"] = "<http://localhost:30002/>"
os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "miniostorage"
When I tried, it was throwing an access denied error. When you set the permissions correctly, it has to work!
p
Aha! I got it working, the trick was passing it in at the task boundary (can't believe I didn't stumble on that specific configuration in my testing). I did not need credentials in the end. Here is the working code if you're curious:
Copy code
import os
from typing import List
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


@task
def t1(indir: FlyteDirectory) -> List[FlyteFile]:
    files = []
    for f in os.listdir(indir):
        files.append(os.path.join(indir, f))
    return files


@workflow
def wf():
    t1(indir=FlyteDirectory("<s3://my-s3-bucket/arbitrary>"))
The trick really is to pay close attention to the context that those variables are being instantiated in. If it's within a task then that context won't have any knowledge of directories whereas it seems to be okay with files, so dirs have to be done at the boundary. Thanks for walking me through it, this has been a big help!
s
It shouldn't be okay with files either. Not sure why that's working for you!
p
I had to look back across my commits to understand this but you're definitely right. I can instantiate a FlyteDir or FlyteFile with an arbitrary path at any point within a task without issue, but I can't do anything with it in that task's context. So doing
x = FlyteFile(path='<s3://arbitrary-path/file.txt>')
won't raise an exception even though
open(x, 'r')
within that same task will raise a
file not found
error. Likewise, instantiating
y = FlyteDirectory(path='<s3://arbitrary-path>')
will also work fine until I run
os.listdir
. I was never doing anything with those FlyteFiles within the same context, but I assumed that since their init didn't raise an exception that their path must be valid within that context, and therefore the listing out a FlyteDir should work fine.
I'm just un-learning my local filesystem intuitions/assumptions that are evidently pretty baked in! 😅
153 Views