# ask-the-community
p
Hi, I'm trying to write a workflow that takes a video as input and simply writes it. The input can either be in the uri variable, which is a downloadable link to the video, or in data, which is a base64 string. It is known that if the input is a uri, then the value of data is going to be 1. Thus I check whether the value of data is 1, and if it is, I call the download task, which downloads the video from uri and saves the content in data. This data is then passed to the write_video task. I'm new to Flyte, and this code is what I have so far.
from typing import List
from flytekit import task, workflow, conditional
import base64
import requests
import cv2

@task
def _is_base64(data:bytes) -> bool:
    try:
        return base64.b64encode(base64.b64decode(data)) == data
    except Exception:
        return False

@task
def download(uri:str) -> bytes:
    #code to download from url
    return data

@task
def write_video(data:bytes) -> None:
    if _is_base64(data=data):
        data = base64.b64decode(data)
    with open("input.mp4", "wb") as out_file:
        out_file.write(data)

@workflow
def wf(data:str, uri:str, job_id:str) -> None:
    write_video(data=conditional("input").if_(data == "1").then(download(uri=uri)).else_().then(data))
However, I am running into errors and would really appreciate some help.
One clear issue is the way I am handling the bytes type. What is the correct way of doing that?
s
Hi @Pulkit Mishra! `bytes` isn’t a native Flyte type, and hence it gets pickled automatically. That should work. But the recommended approach is to either send the bytes as a string or handle the bytes conversion using a custom TypeTransformer. If the bytes string is large, you can also store it in a file and return that as a FlyteFile.
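As a rough illustration of the "send bytes as string" option, here is a minimal sketch in plain Python. The helper names `bytes_to_text` and `text_to_bytes` are hypothetical and not part of flytekit; the assumption is that task inputs and outputs would be declared as `str` and converted at the task boundary.

```python
import base64
import binascii


def bytes_to_text(raw: bytes) -> str:
    """Encode raw bytes as ASCII base64 text so the value can travel as a str."""
    return base64.b64encode(raw).decode("ascii")


def text_to_bytes(text: str) -> bytes:
    """Decode base64 text back into the original bytes."""
    try:
        # validate=True rejects characters outside the base64 alphabet
        # instead of silently discarding them.
        return base64.b64decode(text, validate=True)
    except binascii.Error as exc:
        raise ValueError("input is not valid base64") from exc


# Round trip: the decoded value matches the original payload.
payload = b"\x00\x01video"
encoded = bytes_to_text(payload)
assert text_to_bytes(encoded) == payload
```

Strict validation is a design choice here: it surfaces malformed input as an error rather than writing a corrupted file later.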
p
Thanks for the response, @Samhita Alla. I understand, and will change bytes to FlyteFile. But if that is not the reason for the failure, can you help me understand what is? This is the stacktrace I am getting:
Traceback (most recent call last):
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 940, in create_and_link_node
    binding_from_python_std(
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 660, in binding_from_python_std
    binding_data = binding_data_from_python_std(ctx, expected_literal_type, t_value, t_value_type)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 599, in binding_data_from_python_std
    raise AssertionError(
AssertionError: Cannot pass output from task n0 that produces no outputs to a downstream task

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/envs/venv-app/bin/pyflyte", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1651, in invoke
    cmd_name, cmd, args = self.resolve_command(ctx, args)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1698, in resolve_command
    cmd = self.get_command(ctx, cmd_name)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/run.py", line 607, in get_command
    entity = load_naive_entity(module, exe_entity, project_root)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/run.py", line 447, in load_naive_entity
    importlib.import_module(module_name)
  File "/opt/conda/envs/venv-app/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/src/app/sports.py", line 57, in <module>
    def wf(data:str, uri:str, job_id:str, callback_url:str, result_path:str) -> None:
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/workflow.py", line 739, in workflow
    return wrapper(_workflow_function)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/workflow.py", line 734, in wrapper
    workflow_instance.compile()
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/workflow.py", line 614, in compile
    workflow_outputs = exception_scopes.user_entry_point(self._workflow_function)(**input_kwargs)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 198, in user_entry_point
    return wrapped(*args, **kwargs)
  File "/usr/src/app/sports.py", line 58, in wf
    write_video(data=conditional("input").if_(data == "1").then(download(uri=uri)).else_().then(data))
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/base_task.py", line 283, in __call__
    return flyte_entity_call_handler(self, *args, **kwargs)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 1029, in flyte_entity_call_handler
    return create_and_link_node(ctx, entity=entity, **kwargs)
  File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 950, in create_and_link_node
    raise AssertionError(f"Failed to Bind variable {k} for function {entity.name}.") from e
AssertionError: Failed to Bind variable data for function sports.write_video.
s
You’ll need to include a task in the `else-then` construct. Could you return `data` from a task if that value needs to be included in `else-then`?
p
So I need to create a dummy task that takes in data and simply returns it? Is there a better way of doing this maybe?
s
Why do you want to return `data` in `else`? You’re sending the `conditional` output to the `write_video` task, so you need to return something to it. When you include `data` in `else`, it doesn’t return any output. Got it?
p
This is how I am handling it for now. Other than changing the bytes handling to a TypeTransformer, is there something else you would suggest changing, @Samhita Alla?
@task
def decode(data:bytes) -> bytes:
    try:
        if base64.b64encode(base64.b64decode(data)) == data:
            return base64.b64decode(data)
        else:
            return data
    except Exception:
        return data

@task
def download(uri:str) -> bytes:
    data, err = #code to download video from uri
    return data

@task
def return_data(data:bytes) -> bytes:
    return data

@task
def write_video(data:bytes) -> None:
    with open("input.mp4", "wb") as out_file:
        out_file.write(data)

write_video(data=decode(data=conditional("input").if_(data == "1").then(download(uri=uri)).else_().then(return_data(data=data))))
s
I’m wondering why `return_data` accepts `data` of type `bytes` when `data` is of type `str` as per the workflow input?
p
Yeah, my bad. I only checked the case where the input is a uri.