Babis Kiosidis
12/20/2021, 2:08 PM
Yee
Babis Kiosidis
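12/21/2021, 11:12 AM
from flytekit import reference_workflow, task, workflow

@reference_workflow(
    project="some-project",
    domain="some-domain",
    name="some-name",
    version="some-version",
)
def ref_run(a: int) -> int:  # output type assumed so that `out` can feed another task
    ...

@task
def some_other_task(x: int) -> int:  # placeholder task
    return x

@workflow
def some_workflow():
    out = ref_run(a=1)
    some_other_task(x=out)  # `in` is a reserved word in Python, so the input is named `x`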
Is the following the equivalent in the new API?
from flytekit import workflow
from flytekit.remote import FlyteRemote

remote = FlyteRemote.from_config(
    default_project="some-project",
    default_domain="some-domain",
)
flyte_workflow = remote.fetch_workflow(name="some-name", version="some-version")

@workflow
def remote_workflow_example():
    out = flyte_workflow(a=1)
    some_other_task(x=out)

remote.execute(flyte_workflow, inputs={"a": 1}, wait=True)  # the workflow declares a: int
Does this mean that flyte_workflow will execute in the remote project, or will it execute in the project of the calling code, just with a remote reference?
remote = FlyteRemote.from_config()
flyte_launch_plan = remote.fetch_launch_plan(
    project="some-project",
    domain="some-domain",
    name="some-launch_plan",
)

@workflow
def remote_workflow_example(n: int):
    # This body runs when the workflow is compiled (here, during `pyflyte serialize`),
    # so the call below sends a real CreateExecution RPC at serialization time.
    remote.execute(flyte_launch_plan, inputs={}, wait=True)
15:11:33.194Z
  File "/root/.venv/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/root/.venv/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/root/.venv/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/root/.venv/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/root/.venv/lib/python3.8/site-packages/click/decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/root/.venv/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/serialize.py", line 377, in workflows
    serialize_all(
  File "/root/.venv/lib/python3.8/site-packages/flytekit/common/exceptions/scopes.py", line 162, in system_entry_point
    raise ex.value
  File "/root/.venv/lib/python3.8/site-packages/flytekit/common/exceptions/scopes.py", line 203, in user_entry_point
    return wrapped(*args, **kwargs)
  File "/root/flyte_load/load_test.py", line 96, in remote_workflow_example
    remote.execute(flyte_launch_plan, inputs={}, wait=True)
  File "/usr/local/lib/python3.8/functools.py", line 912, in _method
    return method.__get__(obj, cls)(*args, **kwargs)
  File "/root/.venv/lib/python3.8/site-packages/flytekit/remote/remote.py", line 815, in _
    return self._execute(
  File "/root/.venv/lib/python3.8/site-packages/flytekit/remote/remote.py", line 726, in _execute
    exec_id = self.client.create_execution(
  File "/root/.venv/lib/python3.8/site-packages/flytekit/clients/friendly.py", line 547, in create_execution
    super(SynchronousFlyteClient, self)
  File "/root/.venv/lib/python3.8/site-packages/flytekit/clients/raw.py", line 131, in handler
    return fn(*args, **kwargs)
  File "/root/.venv/lib/python3.8/site-packages/flytekit/clients/raw.py", line 527, in create_execution
    return self._stub.CreateExecution(create_execution_request, metadata=self._metadata)
  File "/root/.venv/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/root/.venv/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "{"created":"@1640092293.109857916","description":"Error received from peer ipv4:10.173.33.105:8443","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"","grpc_status":2}"
>
# TODO: re-consider how this works. Currently, this will only execute the flyte entity referenced by
# flyte_id in the same project and domain. However, it is possible to execute it in a different project
# and domain, which is specified in the first two arguments of client.create_execution. This is useful
# in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a
# different project "B". For now, this method doesn't support this use case.
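The TODO separates the identifier of the entity from the project and domain in which the execution is created. The sketch below contrasts the current default with the requested override. The names `EntityId` and `resolve_execution_scope` are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EntityId:
    project: str
    domain: str
    name: str
    version: str


@dataclass(frozen=True)
class ExecutionScope:
    project: str
    domain: str


def resolve_execution_scope(
    entity: EntityId,
    project: Optional[str] = None,
    domain: Optional[str] = None,
) -> ExecutionScope:
    """Choose the project/domain in which a new execution is created.

    With no overrides this mirrors the behaviour the TODO describes: the
    execution lands in the entity's own project and domain. The optional
    arguments model the requested use case (entity from "A", run in "B").
    """
    return ExecutionScope(
        project=project if project is not None else entity.project,
        domain=domain if domain is not None else entity.domain,
    )


entity = EntityId("A", "some-domain", "some-name", "some-version")
print(resolve_execution_scope(entity))
# ExecutionScope(project='A', domain='some-domain')
print(resolve_execution_scope(entity, project="B"))
# ExecutionScope(project='B', domain='some-domain')
```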
Yee
reference_task / reference_workflow is a way of declaring that interface; more precisely, it's a way of forcing the user to specify the interface. This does two things for us: 1) it gives us the Python types. Some type erasure happens when we go from Python to Flyte, because there is no one-to-one mapping between types. 2) it means we don't need to make a network call to Admin to get the interface.
When you define a task with @task, what you get is:
object -> Task -> PythonTask -> PythonAutoContainerTask -> PythonFunctionTask
When you fetch a task, what you get is:
object -> Flyte IDL's generated core.TaskTemplate -> TaskTemplate model wrapper class -> FlyteTask
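Point 1 above (type erasure) comes down to the Python-to-Flyte type mapping being many-to-one. A fetched interface therefore cannot tell you which Python type was originally declared. The toy illustration below uses hypothetical names: `Point`, `TOY_TYPE_TABLE`, and the tag strings are not flytekit's actual type-engine table.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Point:
    x: int
    y: int


# Illustrative many-to-one table from Python types to coarse wire-level tags.
# Loosely in the spirit of Flyte literal types; NOT flytekit's real mapping.
TOY_TYPE_TABLE: Dict[type, str] = {
    int: "INTEGER",
    float: "FLOAT",
    str: "STRING",
    dict: "STRUCT",
    Point: "STRUCT",
}


def to_wire(py_type: type) -> str:
    """Python type -> wire tag (always well defined)."""
    return TOY_TYPE_TABLE[py_type]


def python_candidates(tag: str) -> List[str]:
    """Wire tag -> every Python type that could have produced it."""
    return [t.__name__ for t, wire in TOY_TYPE_TABLE.items() if wire == tag]


print(to_wire(Point))                # STRUCT
print(python_candidates("STRUCT"))   # ['dict', 'Point']: the original type is lost
print(python_candidates("INTEGER"))  # ['int']
```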
Babis Kiosidis
12/22/2021, 7:30 AM
We have also built our infrastructure around this use case: using an entity from one project but executing it in another.
Regarding the example you mentioned:
User owns project bar
Fetch workflow from project foo
Run workflow in project bar
Where do you see the execution?
It almost sounds like a riddle hehe.
I believe that everything should happen in project bar, because bar is the project that the user controls.
In GKE we configured Workload Identity per namespace, which means access control happens per namespace (which makes sense).
If users start executing things outside their namespace, then it's difficult to enforce access control.
We rely on users executing remote tasks in their own namespace, so that their own Google service account is used; that account must have sufficient access to execute these remote tasks.
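The argument here is that identity follows the namespace the pods run in, not the project the entity was registered in. The sketch below assumes Flyte's default `{{ project }}-{{ domain }}` namespace template. The service-account emails, the `development` domain, and the helper names are made up for illustration.

```python
from typing import Dict


def namespace_for(project: str, domain: str) -> str:
    # Assumes Flyte's default namespace template "{{ project }}-{{ domain }}".
    return f"{project}-{domain}"


# Hypothetical per-namespace Workload Identity bindings
# (Kubernetes namespace -> Google service account used by pods there).
WORKLOAD_IDENTITY: Dict[str, str] = {
    "foo-development": "foo-runner@example-project.iam.gserviceaccount.com",
    "bar-development": "bar-runner@example-project.iam.gserviceaccount.com",
}


def service_account_for(entity_project: str, exec_project: str, domain: str) -> str:
    """Identity an execution runs as.

    `entity_project` (where the workflow was registered) is deliberately
    unused: only the namespace the execution lands in decides the identity.
    """
    return WORKLOAD_IDENTITY[namespace_for(exec_project, domain)]


# User owns "bar", fetches a workflow registered in "foo", runs it in "bar".
print(service_account_for("foo", "bar", "development"))
# bar-runner@example-project.iam.gserviceaccount.com
```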
When it comes to the UI, that's a detail that can be solved either way: everything that executes in a project should be visible in that project, and if a task I built is used remotely by others, I should also have that information somehow.
The remote entities, however, are not callable; they don't implement __call__ because we don't have the Python interface.
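This is why the user-supplied interface matters. A fetched entity carries only Flyte's (erased) interface, while a reference decorator gets the Python signature from annotations, which is what makes a local call possible. The sketch below is a toy: `ToyReferenceTask` and `toy_reference_task` are hypothetical and are not flytekit's implementation.

```python
import inspect
from typing import Any, Callable, Dict, Optional, Tuple, get_type_hints


class ToyReferenceTask:
    """Toy reference entity; not flytekit's ReferenceTask.

    The interface comes from the user's annotations (no call to Admin), and
    because the Python signature is known, the object can be called locally.
    """

    def __init__(self, fn: Callable[..., Any], entity_id: Tuple[str, str, str, str]) -> None:
        self.id = entity_id
        hints = get_type_hints(fn)
        self.output: Optional[type] = hints.pop("return", None)
        self.inputs: Dict[str, type] = hints
        self._signature = inspect.signature(fn)

    def __call__(self, **kwargs: Any) -> Dict[str, Any]:
        self._signature.bind(**kwargs)  # TypeError on missing or unknown inputs
        for name, value in kwargs.items():
            if not isinstance(value, self.inputs[name]):
                raise TypeError(f"{name} expects {self.inputs[name].__name__}")
        # A real engine would add a node pointing at self.id; here we just describe it.
        return {"entity": self.id, "inputs": kwargs}


def toy_reference_task(project: str, domain: str, name: str, version: str):
    def decorator(fn: Callable[..., Any]) -> ToyReferenceTask:
        return ToyReferenceTask(fn, (project, domain, name, version))
    return decorator


@toy_reference_task("some-project", "some-domain", "some-name", "some-version")
def ref_run(a: int) -> int:
    ...  # never executed; only the signature and annotations are used


print(ref_run.inputs, ref_run.output)  # {'a': <class 'int'>} <class 'int'>
print(ref_run(a=1)["entity"])  # ('some-project', 'some-domain', 'some-name', 'some-version')
```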
Sonja Ericsson
02/10/2022, 1:53 PM
Yee
@reference_task
…
Sonja Ericsson
02/10/2022, 5:04 PM
Yee
Babis Kiosidis
02/10/2022, 5:08 PM
@reference_task does not work without a version, right?
@reference_task using fetch_latest to populate the version, and ship them as a lightweight library.
Yee
Babis Kiosidis
02/10/2022, 5:11 PM
Yee
Babis Kiosidis
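02/10/2022, 5:12 PM
# Underscore-prefixed names (_SdkTask, _Identifier, _Callable, _cast, _ReferenceTask,
# _datetime) are import aliases not shown here. In this first file, `_reference_task`
# presumably aliases the local `reference_task` wrapper below (it takes one Identifier).
@_reference_task(internal_fetch_latest("LookupDailyPartition").id)
def lookup_daily_partition(
    endpoint: str, partition: _datetime.datetime
) -> LookupOutputs:
    ...  # pragma: no cover
----
def internal_fetch_latest(name: str) -> _SdkTask:
    # PROJECT and DOMAIN are module-level constants defined elsewhere.
    return _SdkTask.fetch_latest(
        PROJECT,
        DOMAIN,
        name,
    )
----
def reference_task(
    id: _Identifier,
) -> _Callable[[F], _ReferenceTask]:
    # Here `_reference_task` is flytekit's decorator; unpack the fetched Identifier into it.
    return _cast(
        _Callable[[F], _ReferenceTask],
        _reference_task(
            project=id.project,
            domain=id.domain,
            name=id.name,
            version=id.version,
        ),
    )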
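Because `internal_fetch_latest(...)` is evaluated inside the decorator expression, the version is resolved once, when the module is imported, and stays pinned for the life of the process. The sketch below models that behavior with a hypothetical in-memory registry in place of Admin. `REGISTRY`, `fetch_latest_version`, and `pinned_reference` are illustrative names.

```python
from typing import Callable, Dict, List, TypeVar

F = TypeVar("F", bound=Callable[..., object])

# Hypothetical in-memory registry standing in for Admin:
# task name -> versions in registration order.
REGISTRY: Dict[str, List[str]] = {"LookupDailyPartition": ["v1", "v2"]}


def fetch_latest_version(name: str) -> str:
    """Stand-in for a fetch_latest lookup: the most recently registered version."""
    return REGISTRY[name][-1]


def pinned_reference(name: str) -> Callable[[F], F]:
    version = fetch_latest_version(name)  # resolved once, when the decorator line runs

    def decorator(fn: F) -> F:
        fn.reference_id = (name, version)  # type: ignore[attr-defined]
        return fn

    return decorator


@pinned_reference("LookupDailyPartition")
def lookup_daily_partition(endpoint: str) -> str:
    ...


REGISTRY["LookupDailyPartition"].append("v3")  # a newer version registered later
print(lookup_daily_partition.reference_id)  # ('LookupDailyPartition', 'v2')
```

For the lightweight-library approach, this means consumers pick up a newly registered version only when the module is imported again (for example, in a new process).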
Yee
Babis Kiosidis
02/11/2022, 8:45 AM
Yee
Babis Kiosidis
02/11/2022, 4:59 PM
Yee