# flytekit
b
Hey, I was looking to extend flytekit-java's remote execution functionality to add an SdkRemoteWorkflow that works similarly to SdkRemoteTask. But I realised that remote workflows are not supported in flytekit (Python): https://github.com/flyteorg/flytekit/blob/673e076a6a5cd33be281d962e3da8204d7dca6cd/flytekit/common/translator.py#L144-L147 So is the only way to execute remote workflows through LaunchPlans? Is this expected to stay this way?
Does it still work to reference a remote workflow when creating a launch plan?
y
hey @Babis Kiosidis sorry for the delay - just fyi, on the python side, those classes are all deprecated.
since those classes were all written with the legacy api in mind.
that being said the spirit of what they do remains and has been implemented in the new FlyteRemote based classes.
so in general, the execution construct in flyte pertains only to launch plans, not to workflows.
however, there is syntactic sugar in the Flyte Remote experience that makes that somewhat transparent to users.
specifically this function, which will retrieve the default launch plan for a given workflow and call execute on it
is this what you were referring to?
if not could you elaborate a bit on the user experience you’re looking for?
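A minimal sketch of the idea described above, assuming a toy in-memory control plane rather than flytekit's real classes: only launch plans are executed, and "executing a workflow" is sugar for looking up its default launch plan (which in Flyte shares the workflow's name) and executing that. `ToyAdmin`, `LaunchPlan`, and all method names here are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass(frozen=True)
class LaunchPlan:
    name: str
    workflow_name: str
    version: str


class ToyAdmin:
    """Hypothetical stand-in for the control plane; not flytekit's API."""

    def __init__(self) -> None:
        self._launch_plans: Dict[Tuple[str, str], LaunchPlan] = {}
        self.executions: List[Tuple[LaunchPlan, Dict[str, Any]]] = []

    def register_workflow(self, name: str, version: str) -> None:
        # Registering a workflow also creates a default launch plan
        # with the same name as the workflow.
        self._launch_plans[(name, version)] = LaunchPlan(name, name, version)

    def fetch_launch_plan(self, name: str, version: str) -> LaunchPlan:
        return self._launch_plans[(name, version)]

    def execute_launch_plan(self, lp: LaunchPlan, inputs: Dict[str, Any]) -> str:
        # The only real execution path: launch plans.
        self.executions.append((lp, dict(inputs)))
        return f"exec-{len(self.executions)}"

    def execute_workflow(self, name: str, version: str, inputs: Dict[str, Any]) -> str:
        # Syntactic sugar: resolve the default launch plan, then execute it.
        default_lp = self.fetch_launch_plan(name, version)
        return self.execute_launch_plan(default_lp, inputs)
```

From the caller's side it looks like a workflow was executed, while the recorded execution always points at a launch plan.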
b
aha i see! yes i think that answers my question, thanks. Now that you mentioned FlyteRemote! We used to do something like this in the past:
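from flytekit import reference_workflow, workflow

@reference_workflow(
    project="some-project",
    domain="some-domain",
    name="some-name",
    version="some-version",
)
def ref_run(a: int):
    ...

@workflow
def some_workflow():
    out = ref_run(a=1)
    # `in` is a reserved word in Python, so a placeholder parameter name is used here
    some_other_task(value=out)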
Is the following the new api equivalent?
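from flytekit import workflow
from flytekit.remote import FlyteRemote

remote = FlyteRemote.from_config(
    default_project="some-project",
    default_domain="some-domain",
)
flyte_workflow = remote.fetch_workflow(name="some-name", version="some-version")

@workflow
def remote_workflow_example():
    out = flyte_workflow(a=1)
    # `in` is a reserved word in Python, so a placeholder parameter name is used here
    some_other_task(value=out)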
I saw some examples with
remote.execute(flyte_workflow, inputs={}, wait=True)
Does this mean that the flyte_workflow will execute on a remote project, or will it execute in the code project but with a remote reference?
I am trying an example like below, but it fails during the register step, not sure if it's my code, or something else that I misconfigured:
remote = FlyteRemote.from_config()
flyte_launch_plan = remote.fetch_launch_plan(
    project="some-project",
    domain="some-domain",
    name="some-launch_plan",
)

@workflow
def remote_workflow_example(n: int):
    remote.execute(flyte_launch_plan, inputs={}, wait=True)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
15:11:33.194Z     return _process_result(sub_ctx.command.invoke(sub_ctx))
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
15:11:33.194Z     return _process_result(sub_ctx.command.invoke(sub_ctx))
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
15:11:33.194Z     return ctx.invoke(self.callback, **ctx.params)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/click/core.py", line 610, in invoke
15:11:33.194Z     return callback(*args, **kwargs)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/click/decorators.py", line 21, in new_func
15:11:33.194Z     return f(get_current_context(), *args, **kwargs)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/flytekit/clis/sdk_in_container/serialize.py", line 377, in workflows
15:11:33.194Z     serialize_all(
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/flytekit/common/exceptions/scopes.py", line 162, in system_entry_point
15:11:33.194Z     raise ex.value
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/flytekit/common/exceptions/scopes.py", line 203, in user_entry_point
15:11:33.194Z     return wrapped(*args, **kwargs)
15:11:33.194Z   File "/root/flyte_load/load_test.py", line 96, in remote_workflow_example
15:11:33.194Z     remote.execute(flyte_launch_plan, inputs={}, wait=True)
15:11:33.194Z   File "/usr/local/lib/python3.8/functools.py", line 912, in _method
15:11:33.194Z     return method.__get__(obj, cls)(*args, **kwargs)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/flytekit/remote/remote.py", line 815, in _
15:11:33.194Z     return self._execute(
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/flytekit/remote/remote.py", line 726, in _execute
15:11:33.194Z     exec_id = self.client.create_execution(
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/flytekit/clients/friendly.py", line 547, in create_execution
15:11:33.194Z     super(SynchronousFlyteClient, self)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/flytekit/clients/raw.py", line 131, in handler
15:11:33.194Z     return fn(*args, **kwargs)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/flytekit/clients/raw.py", line 527, in create_execution
15:11:33.194Z     return self._stub.CreateExecution(create_execution_request, metadata=self._metadata)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
15:11:33.194Z     return _end_unary_response_blocking(state, call, False, None)
15:11:33.194Z   File "/root/.venv/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
15:11:33.194Z     raise _InactiveRpcError(state)
15:11:33.194Z grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
15:11:33.194Z 	status = StatusCode.UNKNOWN
15:11:33.194Z 	details = ""
15:11:33.194Z 	debug_error_string = "{"created":"@1640092293.109857916","description":"Error received from peer ipv4:10.173.33.105:8443","file":"src/core/lib/surface/call.cc","file_line":1074,"grpc_message":"","grpc_status":2}"
15:11:33.194Z >
I am a bit confused because I would expect that these would create WorkflowNodes that can be executed remotely, but I am not sure if that's what's happening
I see also this comment, which we can discuss afterwards!
# TODO: re-consider how this works. Currently, this will only execute the flyte entity referenced by
# flyte_id in the same project and domain. However, it is possible to execute it in a different project
# and domain, which is specified in the first two arguments of client.create_execution. This is useful
# in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a
# different project "B". For now, this method doesn't support this use case.
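The distinction that comment draws can be sketched as follows: the entity's identifier carries one project/domain, while the execution request carries its own. This is a toy model with hypothetical names (`EntityId`, `ExecutionRequest`, `build_request`), not flytekit's client code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EntityId:
    project: str
    domain: str
    name: str
    version: str


@dataclass(frozen=True)
class ExecutionRequest:
    project: str  # where the execution is created
    domain: str
    entity: EntityId  # what is being executed


def build_request(
    entity: EntityId,
    project: Optional[str] = None,
    domain: Optional[str] = None,
) -> ExecutionRequest:
    # Default to the entity's own project/domain (the behaviour the comment
    # describes), but allow the caller to run it somewhere else.
    return ExecutionRequest(
        project=project or entity.project,
        domain=domain or entity.domain,
        entity=entity,
    )
```

With this shape, "use an entity from project A but execute in project B" is just a request whose `project` differs from `entity.project`.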
y
let me try to clarify. tasks/workflow/launchplans… they’re all executable entities on flyte. when you register them for the first time, yes you need to know specifics about them (for the task you need the image that contains the actual code, for a wf you need the structure of the wf). however, once you register it… if you want to use them in new wfs, you don’t really need to know the body of it anymore. all you need to know is the interface.
a reference_task/wf is a way of declaring that interface. or rather, it’s a way of forcing the user to specify the interface. this does two things for us: 1) it gives us the python types - there is some type erasure that happens when we go from python to flyte, since there is not a one-to-one mapping between types. 2) it makes it so that we don’t need to make a network call to Admin to get the interface.
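To illustrate the type erasure point, here is a small sketch assuming a deliberately simplified, made-up mapping from Python types to Flyte-style type names. The mapping table and `PartitionInfo` are hypothetical; the only thing it is meant to show is that several Python types can collapse into one stored type, so the original Python type cannot be recovered from the stored one.

```python
import datetime
from dataclasses import dataclass
from typing import List


@dataclass
class PartitionInfo:  # hypothetical user-defined type
    path: str


# Hypothetical, simplified mapping; the real type engine is richer than this.
TOY_PYTHON_TO_FLYTE = {
    int: "INTEGER",
    str: "STRING",
    datetime.datetime: "DATETIME",
    dict: "STRUCT",
    PartitionInfo: "STRUCT",
}


def to_flyte_type(py_type: type) -> str:
    """Forward direction: always unambiguous."""
    return TOY_PYTHON_TO_FLYTE[py_type]


def python_candidates(flyte_type: str) -> List[str]:
    """Reverse direction: may return more than one candidate."""
    return sorted(
        t.__name__ for t, f in TOY_PYTHON_TO_FLYTE.items() if f == flyte_type
    )
```

Going forward is a plain lookup, while going backward from `"STRUCT"` yields two candidates, which is why the user has to restate the Python signature.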
when you use flyte remote, and fetch something, the object you get is a different object. For instance, locally, the inheritance structure looks something like
object -> Task -> PythonTask -> PythonAutoContainerTask -> PythonFunctionTask
When you fetch a task, what you get is
object -> Flyte IDL's generated core.TaskTemplate -> TaskTemplate model wrapper class -> FlyteTask
same for launch plans/workflows.
the local stuff is all callable, they all implement __call__. This is why you can use a workflow in another workflow. The remote stuff however is not callable, they do not implement it because we don’t have the python interface.
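A sketch of that difference, using two hypothetical stand-in classes rather than flytekit's own: the local entity wraps a Python function and implements `__call__`, while the fetched entity only carries an identifier.

```python
from typing import Any, Callable


class LocalTask:
    """Hypothetical local entity: has the Python function, so it is callable."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self._fn = fn

    def __call__(self, **kwargs: Any) -> Any:
        return self._fn(**kwargs)


class FetchedTask:
    """Hypothetical fetched entity: only an identifier, no __call__."""

    def __init__(self, project: str, domain: str, name: str, version: str) -> None:
        self.project = project
        self.domain = domain
        self.name = name
        self.version = version


def double(a: int) -> int:
    return a * 2


local = LocalTask(double)
fetched = FetchedTask("some-project", "some-domain", "some-name", "some-version")

print(callable(local))    # True
print(callable(fetched))  # False; fetched(a=1) would raise TypeError
```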
eventually i want to make it so that when fetching a task/wf/lp, users can assign a python interface… with that in place, we can construct a reference task/wf/lp in the background and redirect __call__s to that reference entity instead.
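One way that idea could look, as a sketch only: the user supplies a Python function as the interface, a reference entity is built from the fetched identifier plus that signature, and `__call__` is redirected to it. All class names are hypothetical and the "node" returned is just a dictionary, not a real workflow node.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class FetchedTask:
    """Hypothetical fetched entity: identifier only, no Python types."""
    project: str
    domain: str
    name: str
    version: str


class ReferenceEntity:
    """Hypothetical reference: identifier plus a user-declared signature."""

    def __init__(self, fetched: FetchedTask, signature: inspect.Signature) -> None:
        self._fetched = fetched
        self._signature = signature

    def __call__(self, **kwargs: Any) -> Dict[str, Any]:
        # bind() raises TypeError if the inputs do not match the interface.
        bound = self._signature.bind(**kwargs)
        f = self._fetched
        return {
            "reference": (f.project, f.domain, f.name, f.version),
            "inputs": dict(bound.arguments),
        }


class CallableFetchedTask:
    """Wraps a fetched entity and redirects calls to a reference entity."""

    def __init__(self, fetched: FetchedTask, interface: Callable[..., Any]) -> None:
        self._reference = ReferenceEntity(fetched, inspect.signature(interface))

    def __call__(self, **kwargs: Any) -> Dict[str, Any]:
        return self._reference(**kwargs)
```

The interface function body is never run; only its signature is used, which mirrors how a reference declaration with `...` as its body works.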
i don’t think there’s a ticket for this yet
in any case, that’s why you get that error when you try to use a fetched entity inside a local/python workflow.
wrt the second thing, yeah, i dunno. the main thing we care about is that it’s not confusing to users.
like fetch wf from proj foo and run in proj bar… if you want to find it in the UI afterwards, where do you go? foo or bar?
(btw, I think the answer to that is foo)
is this helpful at all?
b
yeah it's very helpful, thanks. As for us, we rely heavily on using remote tasks and workflows in users' workflows, and we don't expect many users to simply do remote executions. We package various infrastructure logic in remote tasks/workflows, bundle all the remote references in a small library, and ship it to the users. These tasks interact with the infrastructure in various "complex" ways: they might push data to a bucket and additionally publish some metadata to another service, or generate different paths depending on whether the task is supposed to "write to production" or is a test run, etc. The users are not aware of the implementation details, and we have a cookiecutter that uses a few of these remote tasks/wfs inside a "user" workflow. The users register these on the platform and then execute them at a different time. So I guess since what FlyteRemote fetches is not callable, we cannot use it yet. We'll have to wait for that feature.
Also as for this comment:
# TODO: re-consider how this works. Currently, this will only execute the flyte entity referenced by
# flyte_id in the same project and domain. However, it is possible to execute it in a different project
# and domain, which is specified in the first two arguments of client.create_execution. This is useful
# in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a
# different project "B". For now, this method doesn't support this use case.
We also build our infrastructure around this. The example you mentioned:
User owns project bar
Fetch workflow from project foo
Run workflow in project bar
Where do you see the execution?
It almost sounds like a riddle hehe. I believe that everything should happen in project bar, because bar is the project that the user controls. In GKE we configured the workloadIdentity per namespace, which means that access control happens per namespace (which makes sense). If users start executing things outside their namespace, it becomes difficult to enforce access control. We rely on users executing the remote tasks in their own namespace, where their own Google service account is used and must have sufficient access to execute these remote tasks. When it comes to the UI, that's a detail that can be solved both ways: everything that executes in a project should be visible in that project, and if a task I built is used remotely by others, I should also have that information somehow.
Hey @Yee we see that fetch_latest for tasks has been removed from the latest flytekit (Python) SDK. Has FlyteRemote been extended to be callable?
"The remote stuff however is not callable, they do not implement it because we don't have the python interface."
cc @Sonja Ericsson who was trying to upgrade flytekit to the latest internally
s
@Yee could you also let us know how the discussion goes regarding executing a remote task in project A from a workflow in project B? This would break our auth model.
maybe it's already possible to do btw
y
oh sorry. fetch_latest was removed… let me see if i can add it back
but yeah i think we’ll have to make it callable.
@Sonja Ericsson when you say “break our auth model”, do you mean that you’d like to execute a remote task in project A from a workflow in project B? or do you want to prevent that from happening?
we don’t have rbac/authz really so we can’t prevent it from happening
if you’re trying to do that now, you can use @reference_task
s
Right I remember speaking to Haytham that this was a potential issue
y
but this is a bit cumbersome for the user, because the user has to specify the signature using python types.
the issue is type erasure ofc
as in, previously registered tasks have lost their python types, cuz we only store the flyte types.
b
The @reference_task does not work without a version, right?
We have packaged various @reference_task declarations, using fetch_latest to populate the version, and ship them as a lightweight library
y
what are you using from fetch_latest?
just the version?
b
yes to find the version
y
i can add that back pretty easily… but what you get back is a FlyteTask, which isn’t callable.
so if the version is the only thing you’re looking for we can add that back.
will try to have something by eod
sorry for the breakage.
b
i can show you how we use it
First i need to understand it myself 😄
Ok, I think we expect the returned object to be callable. We have this hackish decorator that fetches the latest and casts the task again as a callable. The code looks something like this:
@_reference_task(internal_fetch_latest("LookupDailyPartition").id)
def lookup_daily_partition(
    endpoint: str, partition: _datetime.datetime
) -> LookupOutputs:
    ...  # pragma: no cover

----
def internal_fetch_latest(name: str) -> _SdkTask:
    return _SdkTask.fetch_latest(
        PROJECT,
        DOMAIN,
        name,
    )

----

def reference_task(
    id: _Identifier,
) -> _Callable[[F], _ReferenceTask]:
    return _cast(
        _Callable[[F], _ReferenceTask],
        _reference_task(
            project=id.project,
            domain=id.domain,
            name=id.name,
            version=id.version,
        ),
    )
y
so it should already fetch the latest if you don’t specify the version.
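A sketch of what "fetch the latest if you don't specify the version" could mean, under the assumption that "latest" is the most recently registered version. `resolve_version` and the shape of the `registered` list are hypothetical and do not reflect how flytekit or Admin actually store versions.

```python
from datetime import datetime
from typing import List, Optional, Tuple


def resolve_version(
    registered: List[Tuple[str, datetime]],
    version: Optional[str] = None,
) -> str:
    """Return the pinned version if given, else the most recently registered one.

    `registered` holds (version, registration time) pairs.
    """
    if version is not None:
        return version
    if not registered:
        raise LookupError("no registered versions to choose from")
    # Assumption: "latest" means the newest registration time,
    # not the lexicographically largest version string.
    return max(registered, key=lambda item: item[1])[0]
```

Sorting by registration time rather than by version string matters when versions are git hashes, which carry no ordering of their own.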
is this only for tasks? or for fetched workflows and launch plans as well?
if only tasks, would you be willing to try something for me? i can cut a beta release of flytekit tomorrow with a patch just for tasks.
workflows is more complicated because they can be nested with further workflows
b
we have the same logic implemented, accompanied by a custom FlyteEngineFactory, to do the same for workflows/launch plans as well. It was not that complicated for us to implement, because we didn't fetch the nodes or anything deeper; we only put together the reference of the parent workflows/launch plans. Let us know and we can try for sure. It's a bit unclear right now whether we can bump our flyte versions to the latest, because we heavily rely on this functionality.
y
unf the newest has cool upgrades to other stuff as well.
i think i can get out task/launch plan today, will cut a release by tonight
will work on the functionality to change workflows next week, hopefully it won’t take too long
b
awesome thanks! let us know if we can test somehow
y
yes please!
i’ll make a beta release