bored-beard-89967
05/13/2024, 1:54 PM
from flytekit import dynamic, task, workflow

@task
def test_task(a: int, b: int | None) -> int:
    return a + b if b is not None else a

@workflow
def test_workflow(a: int, b: int | None) -> int:
    # Imagine this is a launch plan from another project
    return test_task(a=a, b=b)

@dynamic
def test_workflow_dynamic() -> list[int]:
    rs = []
    for a in range(3):
        # b will never be used in this dynamic workflow
        r = test_workflow(a=a, b=None)
        rs.append(r)
    return rs

@workflow
def multiple_workflow_runs() -> list[int]:
    return test_workflow_dynamic()
Gives us
AssertionError: Failed to Bind variable b for function flyte.xyz.abc.test_workflow.
Yes, it might make sense to avoid optional arguments altogether and build a new workflow per combination of non-None args, but let’s assume that is a lot of work and I don’t own that code. In general, I don’t see much explicit support for optional args; things just happen to work for us.
freezing-airport-6809
bored-beard-89967
05/13/2024, 2:15 PM
bored-beard-89967
05/13/2024, 2:16 PM
freezing-airport-6809
bored-beard-89967
05/13/2024, 2:23 PM
test_workflow with a launchplan. I suspect it will not work because that is what caused me to investigate this in my actual setup.
bored-beard-89967
05/13/2024, 2:27 PM
from flytekit import dynamic, task, workflow
from flytekit.core.launch_plan import reference_launch_plan

@task
def test_task(a: int, b: int | None) -> int:
    return a + b if b is not None else a

# let this be a stub from another package for example
@workflow
def test_workflow(a: int, b: int | None) -> int:
    return test_task(a=a, b=b)

# reference launch plan for test_workflow above
@reference_launch_plan(
    project="my_project",
    domain="development",
    name="flyte.abc.def.test_workflow",
    version="abcdef",
)
def test_workflow_lp(a: int, b: int | None = None) -> int: ...

@dynamic
def test_workflow_dynamic() -> list[int]:
    rs = []
    for a in range(3):
        r = test_workflow_lp(a=a, b=None)
        rs.append(r)
    return rs

@workflow
def multiple_workflow_runs() -> list[int]:
    return test_workflow_dynamic()
@freezing-airport-6809 is this what you were expecting to work?
bored-beard-89967
05/13/2024, 2:28 PM
Workflow[redwood:development:flyte.redwood.featurestore.multiple_workflow_runs] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 3
Error 0: Code: MismatchingTypes, Node Id: dn0, Description: Variable [b] (type [union_type:{variants:{simple:NONE structure:{tag:"none"}}}]) doesn't match expected type [blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"int | None"}}}].
Error 1: Code: MismatchingTypes, Node Id: dn1, Description: Variable [b] (type [union_type:{variants:{simple:NONE structure:{tag:"none"}}}]) doesn't match expected type [blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"int | None"}}}].
Error 2: Code: MismatchingTypes, Node Id: dn2, Description: Variable [b] (type [union_type:{variants:{simple:NONE structure:{tag:"none"}}}]) doesn't match expected type [blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"int | None"}}}].
1: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 3
Error 0: Code: MismatchingTypes, Node Id: dn0, Description: Variable [b] (type [union_type:{variants:{simple:NONE structure:{tag:"none"}}}]) doesn't match expected type [blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"int | None"}}}].
Error 1: Code: MismatchingTypes, Node Id: dn1, Description: Variable [b] (type [union_type:{variants:{simple:NONE structure:{tag:"none"}}}]) doesn't match expected type [blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"int | None"}}}].
Error 2: Code: MismatchingTypes, Node Id: dn2, Description: Variable [b] (type [union_type:{variants:{simple:NONE structure:{tag:"none"}}}]) doesn't match expected type [blob:{format:"PythonPickle"} metadata:{fields:{key:"python_class_name" value:{string_value:"int | None"}}}].
bored-beard-89967
05/13/2024, 4:07 PM
from flytekit import dynamic, task, workflow
from flytekit.core.launch_plan import reference_launch_plan

@task
def test_task(a: int, b: int | None = None) -> int:
    return a + b if b is not None else a

# let this be a stub from another package for example
@workflow
def test_workflow(a: int, b: int | None = None) -> int:
    return test_task(a=a, b=b)

@reference_launch_plan(
    project="redwood",
    domain="development",
    name="flyte.redwood.featurestore.test_workflow",
    version="multiscale_featurestore-fast-2024-05-13_16-02-37",
)
def test_workflow_lp(a: int, b: int | None = None) -> int: ...

@dynamic
def test_workflow_dynamic() -> list[int]:
    rs = []
    for a in range(3):
        # b is omitted so the default from the underlying workflow applies
        r = test_workflow_lp(a=a)
        rs.append(r)
    return rs

@workflow
def multiple_workflow_runs() -> list[int]:
    return test_workflow_dynamic()
This version sets a default at the workflow level before building the launchplan, whereas the previous example had no default there.
bored-beard-89967
05/13/2024, 4:10 PM
int | float | None. We only need float | None, which fixed things for us.
So, in summary,
• defaults must be set in the underlying workflow--it is not enough to set defaults in a reference launch plan
• int | float | None was giving us issues when resolving that single input.
bored-beard-89967
05/13/2024, 4:10 PM