cuddly-jelly-27016
12/11/2024, 4:54 PM
from flytekit import dynamic, task, workflow

@task
def task0():
    return None

@workflow
def workflow0():
    return task0()

@dynamic(node_dependency_hints=[workflow0])
def dynamic0():
    return workflow0()

@workflow
def workflow1():
    return dynamic0()
bazel run example -- remote register dynamic0
fails with
RPC Failed, with Status: StatusCode.NOT_FOUND
Details: missing entity of type TASK with identifier project:"flyteexamples" domain:"development" name:"<redacted>.workflow.workflow0" version:"L6mEsnIJmUVT84b059kVJA"
However,
bazel run example -- remote register workflow1
works as expected.
Expected behavior
bazel run example -- remote register dynamic0
should register dynamic0 successfully.
Additional context to reproduce
The cause
The registration code assumes that the last entity registered is the top-level one:
# concurrent register
cp_task_entity_map = OrderedDict(filter(lambda x: isinstance(x[1], task_models.TaskSpec), m.items()))
tasks = []
loop = asyncio.get_event_loop()
for entity, cp_entity in cp_task_entity_map.items():
    tasks.append(
        loop.run_in_executor(
            None,
            functools.partial(self.raw_register, cp_entity, serialization_settings, version, og_entity=entity),
        )
    )

identifiers_or_exceptions = []
identifiers_or_exceptions.extend(await asyncio.gather(*tasks, return_exceptions=True))
# Check to make sure any exceptions are just registration skipped exceptions
for ie in identifiers_or_exceptions:
    if isinstance(ie, RegistrationSkipped):
        logger.info(f"Skipping registration... {ie}")
        continue
    if isinstance(ie, Exception):
        raise ie
# serial register
cp_other_entities = OrderedDict(filter(lambda x: not isinstance(x[1], task_models.TaskSpec), m.items()))
for entity, cp_entity in cp_other_entities.items():
    identifiers_or_exceptions.append(
        self.raw_register(cp_entity, serialization_settings, version, og_entity=entity)
    )
return identifiers_or_exceptions[-1]
https://github.com/flyteorg/flytekit/blob/8fdd0c68f10b9f08cd65aa74ef90f0d9af564dd2/flytekit/remote/remote.py#L890
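However, this assumption no longer holds when registering a task that has a workflow in its node_dependency_hints: tasks are registered first and everything else afterwards, so the hinted workflow is registered last and its identifier is returned instead of the task's. I guess we need to switch to retrieving the top-level registered entity by ID instead of assuming it is the last one.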
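To make the ordering problem concrete, here is a minimal toy sketch that does not use flytekit at all. It only mimics the "tasks first, everything else afterwards" split from the excerpt above. The names fake_register and register_all, the string identifiers, and the insertion order of the entity maps (dependencies first, requested entity last) are all assumptions made for illustration, not flytekit's actual API or behavior.

```python
from collections import OrderedDict

# Toy stand-in for the serialized entity map `m` in the excerpt above.
# Keys are entity names, values are their kind. The insertion order
# (dependencies first, requested entity last) is an assumption.
DYNAMIC0_ENTITIES = OrderedDict(
    [("task0", "task"), ("workflow0", "workflow"), ("dynamic0", "task")]
)
WORKFLOW1_ENTITIES = OrderedDict(
    [
        ("task0", "task"),
        ("workflow0", "workflow"),
        ("dynamic0", "task"),
        ("workflow1", "workflow"),
    ]
)


def fake_register(name, kind):
    """Hypothetical stand-in for raw_register: returns a fake identifier."""
    return f"{kind}:{name}"


def register_all(entities, top_level):
    """Register tasks first, then everything else, like the excerpt above.

    Returns (last_registered_id, top_level_id) so the two can be compared.
    """
    ids_by_entity = OrderedDict()
    # First pass: tasks only (done concurrently in the real code).
    for name, kind in entities.items():
        if kind == "task":
            ids_by_entity[name] = fake_register(name, kind)
    # Second pass: all non-task entities, serially.
    for name, kind in entities.items():
        if kind != "task":
            ids_by_entity[name] = fake_register(name, kind)
    last_registered = list(ids_by_entity.values())[-1]
    # Looking the requested entity up by key does not depend on ordering.
    return last_registered, ids_by_entity[top_level]


if __name__ == "__main__":
    print(register_all(DYNAMIC0_ENTITIES, "dynamic0"))
    print(register_all(WORKFLOW1_ENTITIES, "workflow1"))
```

Under these assumptions, the dynamic0 case yields workflow0's identifier as the last registered one while the keyed lookup yields dynamic0's, which would match the NOT_FOUND error naming workflow0 as a TASK. For workflow1 both values agree, which would explain why that command works.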
Screenshots
No response
Are you sure this issue hasn't been raised already?
• Yes
Have you read the Code of Conduct?
• Yes
flyteorg/flyte