GitHub 11/02/2023, 9:24 PM
User_1: Hello Flyte team. I have a question about using persistent volumes (PVs) and persistent volume claims (PVCs) with Flyte tasks and workflows. More specifically, I'm interested in using an AWS EFS volume in my EKS cluster as a shared ReadWriteMany mount. I have looked at [this example](<https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/pod/pod.html>) and see the V1Volume being passed into the pod spec. Looking through [these docs](<https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Volume.md>), I see that you can create a V1Volume from a persistent_volume_claim. So in theory, I can create a PV and PVC in my cluster, include them in my pod spec, and attach the spec to a Flyte task. But PVCs are namespace-specific, and Flyte uses the project-domain namespace for executing tasks/workflows. Two questions:
1. Are PVCs the right solution here, and if so, how can I dynamically create PVCs for my project-domains? Is this something Flyte could be configured to do for us, or would we be responsible for ensuring any referenced PVCs and PVs exist?
2. What other options are available for mounting shared persistent volumes to my Flyte tasks?
User_2: I have run into exactly the same problem on my team. Here are the approaches I use:
You can create a PVC per namespace with an IaC tool like Terraform or Pulumi; that way you don't have to create each PVC manually. Alternatively, a shell script can create all of the per-namespace PVCs at once.
You could also use a different volume source in V1Volume: NFS or hostPath let you mount a specific volume directly, and both are available in V1Volume (in the kubernetes client).
Yuvraj [Response]: You can use the cluster_resource_manager to create a PVC for each project-domain namespace, like this: <https://github.com/flyteorg/flyte/blob/master/charts/flyte-core/values-eks.yaml#L360>
[@Mike Zhong](<https://flyte-org.slack.com/team/U03BXNGV65A>)
There are two ways to mount a volume in your Flyte task:
First is using the [pod plugin](<https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/pod/pod.html>)
Second is propeller's pod template: <https://docs.flyte.org/en/latest/deployment/cluster_config/general.html#using-default-k8s-podtemplates>
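For reference, a minimal sketch of the pod-plugin route with a PVC-backed volume. It assumes flytekitplugins-pod is installed and that a claim named efs-claim (a hypothetical name) already exists in the {project}-{domain} namespace the task runs in:

from kubernetes.client import (
    V1Container,
    V1PersistentVolumeClaimVolumeSource,
    V1PodSpec,
    V1Volume,
    V1VolumeMount,
)

from flytekit import task
from flytekitplugins.pod import Pod

# "efs-claim" is a hypothetical PVC name; the claim must already exist in the
# execution namespace, since PVCs are namespace-scoped.
pod_spec = V1PodSpec(
    containers=[
        V1Container(
            name="primary",
            volume_mounts=[V1VolumeMount(name="shared-efs", mount_path="/mnt/shared")],
        )
    ],
    volumes=[
        V1Volume(
            name="shared-efs",
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name="efs-claim"),
        )
    ],
)


@task(task_config=Pod(pod_spec=pod_spec, primary_container_name="primary"))
def write_to_shared_volume(filename: str) -> str:
    # Anything written under /mnt/shared lands on the ReadWriteMany EFS volume
    # and is visible to other tasks that mount the same claim.
    path = f"/mnt/shared/{filename}"
    with open(path, "w") as f:
        f.write("hello from a flyte task")
    return path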
User_1: The cluster_resource_manager block is optional, but if we define namespace PVCs there, we would need to add them as new projects are created and redeploy to keep things updated. I was able to get EFS working seamlessly with Flyte tasks, which was nice; I'm just trying to figure out the best way to grant and manage access to the PV from other Flyte projects (-p). Since the namespaces are {project}-{domain} and PVCs exist per namespace, any project that wants to use an EFS-backed PV would need the PVC created in its namespace. Please correct me if that's not right.
Ketan [Response]: That is true. But if you are using EFS, just create the PVC in every namespace and then use pod templates to mount them.
To provision them you can use the ClusterResourceManager, or
you can use flytectl (GitOps style) to create namespaces and manage resources in those namespaces through your regular CI/CD tooling. That also gives you a centralized way to onboard onto Flyte: creating a project has to go through a central path.
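For reference, a hedged sketch of what that cluster_resource_manager block could look like with a PVC template added. The claim name, storage class, and size are placeholders; the structure mirrors the values-eks.yaml example linked above:

cluster_resource_manager:
  enabled: true
  config:
    cluster_resources:
      refreshInterval: 5m
  templates:
    # ... existing namespace/serviceaccount templates ...
    - key: ab_efs_pvc
      value: |
        apiVersion: v1
        kind: PersistentVolumeClaim
        metadata:
          name: efs-claim
          namespace: {{ namespace }}
        spec:
          accessModes:
            - ReadWriteMany
          storageClassName: efs-sc
          resources:
            requests:
              storage: 10Gi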
Document:
1. How to mount shared persistent volumes to Flyte tasks
2. How to dynamically create Persistent Volume Claims (PVCs) for project-domains
GitHub 11/02/2023, 9:24 PM
[Screenshots only: "my_task" and the pane that comes up]
GitHub 11/02/2023, 9:24 PM
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flyte-proxy
  labels:
    app: flyte-proxy
spec:
  selector:
    matchLabels:
      app: flyte-proxy
  template:
    metadata:
      labels:
        app: flyte-proxy
    spec:
      containers:
        - name: proxy
          image: envoyproxy/envoy:v1.21.1
          args:
            - envoy
            - -c /etc/envoy/config.yaml
          ports:
            - name: http
              containerPort: 8000
          volumeMounts:
            - name: config-volume
              mountPath: /etc/envoy
      volumes:
        - name: config-volume
          configMap:
            name: flyte-proxy-config
config.yaml
admin:
  access_log_path: /dev/null
  address:
    socket_address:
      address: 127.0.0.1
      port_value: 9901
static_resources:
  listeners:
    - address:
        socket_address:
          address: 0.0.0.0
          port_value: 8000
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                stat_prefix: ingress_http
                codec_type: AUTO
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: backend
                      domains:
                        - "*"
                      routes:
                        - match:
                            path: "/"
                          redirect:
                            path_redirect: "/console/"
                        - match:
                            prefix: "/console"
                          route:
                            cluster: flyteconsole
                        - match:
                            prefix: "/__webpack_hmr"
                          route:
                            cluster: flyteconsole
                        - match:
                            prefix: "/api"
                          route:
                            cluster: flyteadmin
                        - match:
                            prefix: "/healthcheck"
                          route:
                            cluster: flyteadmin
                        - match:
                            prefix: "/v1"
                          route:
                            cluster: flyteadmin
                        - match:
                            prefix: "/login"
                          route:
                            cluster: flyteadmin
                        - match:
                            prefix: "/me"
                          route:
                            cluster: flyteadmin
                        - match:
                            prefix: "/flyteidl.service.AdminService"
                          route:
                            cluster: flyteadmin_grpc
                        - match:
                            prefix: "/openapi"
                          route:
                            cluster: flyteadmin_redoc
                http_filters:
                  - name: envoy.filters.http.router
  clusters:
    - name: flyteconsole
      connect_timeout: 0.25s
      type: STRICT_DNS
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: flyteconsole
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: flyteconsole
                      port_value: 80
    - name: flyteadmin
      connect_timeout: 0.25s
      type: STRICT_DNS
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: flyteadmin
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: flyteadmin
                      port_value: 80
    - name: flyteadmin_grpc
      connect_timeout: 0.25s
      type: STRICT_DNS
      lb_policy: ROUND_ROBIN
      http2_protocol_options: {}
      load_assignment:
        cluster_name: flyteadmin_grpc
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: flyteadmin
                      port_value: 81
    - name: flyteadmin_redoc
      connect_timeout: 0.25s
      type: STRICT_DNS
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: flyteadmin_redoc
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: flyteadmin
                      port_value: 87
Directions
1. Create configmap in flyte namespace: kubectl -n flyte create configmap flyte-proxy-config --from-file config.yaml
2. Apply the spec: kubectl -n flyte apply -f envoy.yaml
3. Port Forward: kubectl -n flyte port-forward deployment/flyte-proxy 30080:8000
4. Update ~/.flyte/config.yaml to point to localhost:8000 (see the sketch below)
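For step 4, a minimal sketch of the admin block in ~/.flyte/config.yaml; point it at whichever local port you chose in step 3 (30080 as written above, or 8000 if you forward 8000:8000):

admin:
  # Point flytectl/pyflyte at the port-forwarded Envoy proxy
  endpoint: dns:///localhost:30080
  insecure: true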
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
GitHub 11/02/2023, 9:25 PM
When auth fails, flyteadmin currently responds with:
Unauthenticated desc = token parse error [JWT_VERIFICATION_FAILED] Could not retrieve id token from metadata, caused by: rpc error: code = Unauthenticated desc = Request unauthenticated with IDToken
but this is ambiguous. For example, if the audience is misconfigured, flyteadmin logs
"Failed to parse Access Token from context. Will attempt to find IDToken. Error: invalid audience"
which is far more useful for understanding misconfigured deployments and diagnosing what went wrong
Goal: What should the final outcome look like, ideally?
Flyteadmin auth failures should include more details in error message responses
Describe alternatives you've considered
N/A
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
GitHub 11/02/2023, 9:25 PM
Support tf.Tensor as a native Flyte type.
API reference for serialization and deserialization of tensors:
• Serialization: https://www.tensorflow.org/api_docs/python/tf/io/serialize_tensor
• Deserialization: https://www.tensorflow.org/api_docs/python/tf/io/parse_tensor
More on TypeTransformers can be found here.
Related PR that adds PyTorch tensor and module as Flyte types: flyteorg/flytekit#1032
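A rough sketch of what such a transformer could look like, loosely modeled on the PyTorch transformer from flyteorg/flytekit#1032. The class name, the format string, and the hard-coded out_type are illustrative assumptions, not the actual implementation; a real version would need to record the dtype alongside the blob:

from typing import Type

import tensorflow as tf

from flytekit.core.context_manager import FlyteContext
from flytekit.core.type_engine import TypeEngine, TypeTransformer
from flytekit.models.core import types as _core_types
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.types import LiteralType


class TensorFlowTensorTransformer(TypeTransformer[tf.Tensor]):
    TENSORFLOW_FORMAT = "TensorFlowTensor"  # illustrative format name

    def __init__(self):
        super().__init__(name="TensorFlow Tensor", t=tf.Tensor)

    def _blob_type(self) -> _core_types.BlobType:
        return _core_types.BlobType(
            format=self.TENSORFLOW_FORMAT,
            dimensionality=_core_types.BlobType.BlobDimensionality.SINGLE,
        )

    def get_literal_type(self, t: Type[tf.Tensor]) -> LiteralType:
        return LiteralType(blob=self._blob_type())

    def to_literal(self, ctx: FlyteContext, python_val: tf.Tensor, python_type, expected) -> Literal:
        # Serialize the tensor to a local file, then upload it to blob storage.
        local_path = ctx.file_access.get_random_local_path() + ".tensor"
        tf.io.write_file(local_path, tf.io.serialize_tensor(python_val))
        remote_path = ctx.file_access.get_random_remote_path(local_path)
        ctx.file_access.put_data(local_path, remote_path, is_multipart=False)
        return Literal(scalar=Scalar(blob=Blob(metadata=BlobMetadata(type=self._blob_type()), uri=remote_path)))

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type) -> tf.Tensor:
        # Download the blob and parse it back into a tensor.
        local_path = ctx.file_access.get_random_local_path()
        ctx.file_access.get_data(lv.scalar.blob.uri, local_path, is_multipart=False)
        # Assumes float32 for simplicity; a real transformer would persist the dtype.
        return tf.io.parse_tensor(tf.io.read_file(local_path), out_type=tf.float32)


TypeEngine.register(TensorFlowTensorTransformer())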
GitHub 11/02/2023, 9:25 PM
With pyflyte package, run, and register, along with the legacy serialize, the pyflyte code could probably use some cleanup.
Details
This ticket includes, but shouldn't be construed as limited to, the following:
• Use the same module loading code between the run and register commands. Since run has to register first, and since it also loads code when listing commands in a module, it makes sense to reuse the same code.
• There's a lot of helper code in the flytekit/tools folder, and it is organized somewhat poorly. Attempt to streamline this.
• Figure out proper handling of the --pkgs switch that exists at the top level of pyflyte, in the register command. Currently it just raises an error.
Misc
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
GitHub 11/02/2023, 9:25 PM
I ran sudo flytectl demo start to start the cluster, but at the end it tells me the connection is being refused. I saw the note about KUBECONFIG and FLYTECTL_CONFIG, but I'm not sure whether this will fix the issue, or how and where to set these variables. Any help would be greatly appreciated! I'm on an Ubuntu laptop.
GitHub 11/02/2023, 9:25 PM
11/02/2023, 9:25 PMlit
and transformed_lit
variables look identical, but are not after converting to Python value.
import string
from flytekit.core.context_manager import FlyteContext
from flytekit.core.type_engine import DictTransformer, TypeEngine
from flytekit.models import literals
def simulate_dict_transform(data: dict, python_type: type) -> dict:
    t = DictTransformer()
    ctx = FlyteContext.current_context()
    lit = t.to_literal(ctx, data, python_type, TypeEngine.to_literal_type(python_type))
    transformed_lit = literals.Literal.from_flyte_idl(lit.to_flyte_idl())
    return t.to_python_value(ctx, transformed_lit, python_type)
data = {k: v for v, k in enumerate(string.ascii_lowercase)}
transformed = simulate_dict_transform(data, dict[str, int])
print(data.keys())
print(transformed.keys())
assert list(data.keys()) == list(transformed.keys())
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
GitHub 11/02/2023, 9:25 PM
The Getting Started example executes a workflow locally using the default Docker image in the sandbox server, right? But I thought that the code was required to be inside the container so that Flyte could run it. Does pyflyte run an implicit fast serialization? If not, what is it doing?
@Ketan (kumare3)'s (abridged) answer:
pyflyte now talks (using FlyteRemote) to Flyteadmin to get a signed URL which is bound to the compressed archive of the code. This is where pyflyte uploads the code (we wanted to make things easier for the user). You can supply an alternate image; if not, it uses a default image with only flytekit and pandas/numpy installed in it.
References:
https://flyte-org.slack.com/archives/CNMKCU6FR/p1652968591652399
https://docs.flyte.org/en/latest/getting_started/index.html#executing-workflows-on-a-flyte-cluster
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
GitHub 11/02/2023, 9:25 PM
Platform defaults are the defaults applied when no task-level requests/limits are set.
If only one task-level request/limit is set AND that value is under the platform limit, both the request and the limit are set to whichever value was set in the task.
If only one task-level request/limit is set AND that value is over the platform limit, the unset value is set to the platform limit.
For full details, see the Slack thread below. A hypothetical illustration of these rules follows the references.
References:
https://flyte-org.slack.com/archives/C03CY9S9MFE/p1652970945750369
https://docs.flyte.org/en/latest/concepts/admin.html#task-resource-defaults
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
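A hypothetical illustration of the rules above. The platform defaults (cpu 500m, mem 500Mi) and platform limits (cpu 2, mem 2Gi) used in the comments are made-up values, not Flyte's shipped configuration:

from flytekit import Resources, task


@task  # no task-level values: the pod gets the platform defaults (cpu 500m, mem 500Mi)
def uses_platform_defaults() -> None:
    ...


@task(requests=Resources(cpu="1"))  # only a request, under the 2-CPU platform limit:
def request_under_platform_limit() -> None:  # both request and limit end up as 1 CPU
    ...


@task(requests=Resources(cpu="4"))  # only a request, over the 2-CPU platform limit:
def request_over_platform_limit() -> None:  # the unset limit is set to the platform limit (2 CPU)
    ...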
GitHub 11/02/2023, 9:25 PM
INFO[0000] [0] Couldn't find a config file []. Relying on env vars and pflags.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xfef5a3]
goroutine 1 [running]:
github.com/flyteorg/flytectl/cmd/core.CommandContext.AdminClient(...)
	/home/runner/work/flytectl/flytectl/cmd/core/cmd_ctx.go:48
github.com/flyteorg/flytectl/cmd/create.createProjectsCommand({0x23461e8, 0xc000050110}, {0x1ffffffffffffff?, 0x39?, 0x8?}, {0x0, {0x0, 0x0}, {0x0, 0x0}, ...})
	/home/runner/work/flytectl/flytectl/cmd/create/project.go:62 +0xe3
github.com/flyteorg/flytectl/cmd/core.generateCommandFunc.func1(0xc000454c80?, {0xc00027e800, 0x0, 0x8})
	/home/runner/work/flytectl/flytectl/cmd/core/cmd.go:65 +0x625
github.com/spf13/cobra.(*Command).execute(0xc000454c80, {0xc00027e780, 0x8, 0x8})
	/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.1.3/command.go:852 +0x67c
github.com/spf13/cobra.(*Command).ExecuteC(0xc00055b900)
	/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.1.3/command.go:960 +0x39c
github.com/spf13/cobra.(*Command).Execute(...)
	/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.1.3/command.go:897
github.com/flyteorg/flytectl/cmd.ExecuteCmd()
	/home/runner/work/flytectl/flytectl/cmd/root.go:134 +0x1e
main.main()
/home/runner/work/flytectl/flytectl/main.go:12 +0x1d
{
"App": "flytectl",
"Build": "c06589e",
"Version": "0.5.25",
"BuildTime": "2022-05-17 18:15:50.341414608 -0400 EDT m=+0.024193929"
}
{
"App": "controlPlane",
"Build": "072ce74",
"Version": "v1.1.2",
"BuildTime": "2022-05-16 16:49:17.16516979 +0000 UTC m=+0.185367550"
}
Expected behavior
A nice error message telling the user to run config init, maybe?
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
GitHub 11/02/2023, 9:25 PM
A --await-first-task parameter that would wait for the first task to succeed before exiting the CLI. An indicator showing that the job was successfully pushed to remote and is in a running state would also be useful.
For #2, I use something like this currently, and would love for this functionality to be added to FlyteRemote!
import time

from flytekit.clients.friendly import SynchronousFlyteClient
from flytekit.models.admin.common import Sort
from flytekit.models.core.execution import NodeExecutionPhase
from flytekit.models.node_execution import NodeExecution
from flytekit.remote.executions import FlyteWorkflowExecution

from flyte_common.remote.remote import get_flyte_client
from flyte_common.remote.ui.status import status


class NodeMonitor:
    """Class for monitoring nodes in a workflow execution."""

    client: SynchronousFlyteClient = get_flyte_client()

    def __init__(self, workflow_execution: FlyteWorkflowExecution):
        self.workflow_execution = workflow_execution

    @status(message="Validating spec on Flyte")
    def wait_for_first_node_to_complete(self) -> None:
        """Wait for the first node to complete before returning.

        Note: this is currently an experimental feature we've added to help validate
        specs remotely and notify the user of any spec errors. It is recommended to have a
        lightweight first task so that your workflow will fail fast if there are problems
        with the spec.
        """
        first_node = self.get_first_node()
        while True:
            node = self.client.get_node_execution(first_node.id)
            status_string = NodeExecutionPhase.enum_to_string(node.closure.phase)
            if status_string in ["FAILED", "FAILING", "ABORTED", "TIMED_OUT"]:
                raise ValueError(
                    f"Spec validation failed. {node.closure.error.message}"
                )
            if status_string in ["SUCCEEDING", "SUCCEEDED"]:
                return
            time.sleep(1)  # Slow down requests to Flyte

    def get_first_node(self) -> NodeExecution:
        """Return the first node that is part of a workflow execution."""
        # Note: this functionality is not on the current `FlyteRemote`
        while True:
            nodes = self.client.list_node_executions(
                self.workflow_execution.id,
                sort_by=Sort(key="started_at", direction=Sort.Direction.ASCENDING),
            )[0]
            if len(nodes) > 1:
                # Wait until there is a first node after "start-node"
                return nodes[0]
            time.sleep(1)  # Slow down requests to Flyte
Describe alternatives you've considered
N/A
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
GitHub 11/02/2023, 9:25 PM
When you remote.fetch a task, workflow, or launch plan where one of the inputs is a StructuredDataset, and then try to execute it, flytekit will try to "guess" the interface for that structured dataset input, and the type it comes up with is the Python/flytekit StructuredDataset class. This is correct, but when we go to create the execution, we need to translate the dataframe from a pd.DataFrame (or whatever) instance into a StructuredDataset Literal. Since flytekit thinks the type annotation is a Python StructuredDataset, it will try to look it up in its list of formats/encoders and fail, because StructuredDataset is not a real dataframe type.
An example stack trace:
Traceback (most recent call last):
File "/Users/nielsbantilan/miniconda3/envs/unionml/bin/unionml", line 33, in <module>
sys.exit(load_entry_point('unionml', 'console_scripts', 'unionml')())
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/click/core.py", line 1130, in __call__
return self.main(*args, **kwargs)
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/click/core.py", line 1055, in main
rv = self.invoke(ctx)
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/click/core.py", line 1657, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/click/core.py", line 760, in invoke
return __callback(*args, **kwargs)
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/typer/main.py", line 500, in wrapper
return callback(**use_params) # type: ignore
File "/Users/nielsbantilan/git/unionml/unionml/cli.py", line 99, in predict
predictions = model.remote_predict(app_version, model_version, wait=True, **prediction_inputs)
File "/Users/nielsbantilan/git/unionml/unionml/model.py", line 535, in remote_predict
execution = self._remote.execute(
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/flytekit/remote/remote.py", line 796, in execute
return self.execute_remote_wf(
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/flytekit/remote/remote.py", line 889, in execute_remote_wf
return self.execute_remote_task_lp(
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/flytekit/remote/remote.py", line 862, in execute_remote_task_lp
return self._execute(
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/flytekit/remote/remote.py", line 658, in _execute
lit = TypeEngine.to_literal(ctx, v, hint, variable.type)
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 696, in to_literal
lv = transformer.to_literal(ctx, python_val, python_type, expected)
File "/Users/nielsbantilan/miniconda3/envs/unionml/lib/python3.9/site-packages/flytekit/types/structured/structured_dataset.py", line 486, in to_literal
fmt = self.DEFAULT_FORMATS[python_type]
KeyError: <class 'flytekit.types.structured.structured_dataset.StructuredDataset'>
We need to improve the error handling/experience around this. Potential things include:
• Asking the user to provide a StructuredDataset instance instead of the raw dataframe?
• Continuing to error, but informing the user to provide the type_hints map (sketched below).
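As a reference point for the second option, a hedged sketch of passing the type_hints map through FlyteRemote.execute. The project, domain, workflow name, and version below are placeholders:

import pandas as pd

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(Config.auto(), default_project="flytesnacks", default_domain="development")
wf = remote.fetch_workflow(name="my.workflows.wf", version="v1")  # placeholder name/version

df = pd.DataFrame({"a": [1, 2, 3]})

# Without the hint, the fetched interface only says "StructuredDataset", so flytekit
# cannot find an encoder for the raw pd.DataFrame; hinting the concrete type avoids that.
execution = remote.execute(wf, inputs={"df": df}, type_hints={"df": pd.DataFrame})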
Misc
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
GitHub 11/02/2023, 9:25 PM
The docs only offer v: latest, which contains the latest GitHub code state. We are installing Flyte through PyPI, so we only have access to the latest stable release, which means we sometimes see incorrect docs. It's also unlikely we'll always be able to stay on the latest version, so being able to see fixed versions of the documentation would be incredibly useful.