Pulkit Mishra
12/25/2022, 3:54 PMfrom typing import List
from flytekit import task, workflow, conditional
import base64
import requests
import cv2
@task
def _is_base64(data:bytes) -> bool:
try:
return base64.b64encode(base64.b64decode(data)) == data
except Exception:
return False
@task
def download(uri:str) -> bytes:
#code to download from url
return data
@task
def write_video(data:bytes) -> None:
if _is_base64(data=data):
data = base64.b64decode(data)
with open("input.mp4", "wb") as out_file:
out_file.write(data)
@workflow
def wf(data:str, uri:str, job_id:str) -> None:
write_video(data=conditional("input").if_(data == "1").then(download(uri=uri)).else_().then(data))
however I am running into errors and would really appreciate some help.Samhita Alla
12/26/2022, 3:48 AMbytes
isn’t a native Flyte type, and hence, it gets pickled automatically. That should work. But the recommended way of usage is to either send bytes as string or handle bytes conversion using a custom TypeTransformer. If the bytes string is large, you can also store that in a file and return that as a FlyteFile.Pulkit Mishra
12/26/2022, 3:53 AMTraceback (most recent call last):
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 940, in create_and_link_node
binding_from_python_std(
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 660, in binding_from_python_std
binding_data = binding_data_from_python_std(ctx, expected_literal_type, t_value, t_value_type)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 599, in binding_data_from_python_std
raise AssertionError(
AssertionError: Cannot pass output from task n0 that produces no outputs to a downstream task
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/conda/envs/venv-app/bin/pyflyte", line 8, in <module>
sys.exit(main())
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
return self.main(*args, **kwargs)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1055, in main
rv = self.invoke(ctx)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1657, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1657, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1651, in invoke
cmd_name, cmd, args = self.resolve_command(ctx, args)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/click/core.py", line 1698, in resolve_command
cmd = self.get_command(ctx, cmd_name)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/run.py", line 607, in get_command
entity = load_naive_entity(module, exe_entity, project_root)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/run.py", line 447, in load_naive_entity
importlib.import_module(module_name)
File "/opt/conda/envs/venv-app/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/src/app/sports.py", line 57, in <module>
def wf(data:str, uri:str, job_id:str, callback_url:str, result_path:str) -> None:
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/workflow.py", line 739, in workflow
return wrapper(_workflow_function)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/workflow.py", line 734, in wrapper
workflow_instance.compile()
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/workflow.py", line 614, in compile
workflow_outputs = exception_scopes.user_entry_point(self._workflow_function)(**input_kwargs)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 198, in user_entry_point
return wrapped(*args, **kwargs)
File "/usr/src/app/sports.py", line 58, in wf
write_video(data=conditional("input").if_(data == "1").then(download(uri=uri)).else_().then(data))
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/base_task.py", line 283, in __call__
return flyte_entity_call_handler(self, *args, **kwargs)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 1029, in flyte_entity_call_handler
return create_and_link_node(ctx, entity=entity, **kwargs)
File "/opt/conda/envs/venv-app/lib/python3.8/site-packages/flytekit/core/promise.py", line 950, in create_and_link_node
raise AssertionError(f"Failed to Bind variable {k} for function {entity.name}.") from e
AssertionError: Failed to Bind variable data for function sports.write_video.
Samhita Alla
12/26/2022, 3:59 AMelse-then
construct. Could you return data
from a task if that value needs to be included in else-then
?Pulkit Mishra
12/26/2022, 4:05 AMSamhita Alla
12/26/2022, 4:57 AMdata
in else
? You’re sending conditional
output to write_video
task, so you need to return something to it. When you include data
in else
, it doesn’t return any output. Got it?Pulkit Mishra
12/26/2022, 5:56 AM@task
def decode(data:bytes) -> bytes:
try:
if base64.b64encode(base64.b64decode(data)) == data:
return base64.b64decode(data)
else:
return data
except Exception:
return data
@task
def download(uri:str) -> bytes:
data, err = #code to download video from uri
return data
@task
def return_data(data:bytes) -> bytes:
return data
@task
def write_video(data:bytes) -> None:
with open("input.mp4", "wb") as out_file:
out_file.write(data)
write_video(data=decode(data=conditional("input").if_(data == "1").then(download(uri=uri)).else_().then(return_data(data=data))))
Samhita Alla
12/26/2022, 6:00 AMreturn_data
accepts data
of type bytes
when data
is of type str
as per the workflow input?Pulkit Mishra
12/26/2022, 8:04 AM