Victor Churikov
06/15/2023, 12:50 PM
if in my @workflow to conditionally run these tasks is not supported because of limitations related to the DAG and/or serialization.
I tried a conditional (from flytekit import conditional). When I do conditional('somename').if_(mybool).then(mytask(myinputs=myinputs)), it ignores my conditional, with no representation in the graph UI.
When I add an .else_().then(anytask()), it gets represented in the graph UI but errors with AttributeError: 'FlyteBranchNode' object has no attribute 'interface' after the run has already begun.
I tried with @task and with @dynamic.
What is the correct way to conditionally run these multiple tasks based on a boolean output of another task? Note that these tasks also have dependencies on each other: one of them produces outputs and the rest receive them as input.
Malleshwari Sri Perumbudur
06/15/2023, 3:25 PM
Tim Sheiner
06/15/2023, 9:12 PM
Marti Jorda Roca
06/16/2023, 11:18 AM
Robert Ambrus
06/16/2023, 3:19 PM
pyflyte like this:
pyflyte run -i <corporate_docker_registry>/<prefix>/flyte-dbx-demo:0.0.1 --remote --destination-dir . dbx_example_job_cluster.py my_databricks_job
The job was successfully submitted to Databricks, but the cluster creation failed because the image could not be pulled from our corporate docker registry (due to missing credentials).
I analyzed the audit logs and found that the docker image config looks like this (the basic_auth block is missing):
"docker_image": {
    "url": "<corporate_docker_registry>/<prefix>/flyte-dbx-demo:0.0.1",
}
I'm assuming that if we add imagePullSecrets to our service account as described here, Flyte will pass the Docker credentials to Databricks in the job definition like this:
"docker_image": {
    "url": "<corporate_docker_registry>/<prefix>/flyte-dbx-demo:0.0.1",
    "basic_auth": {
        "username": <user>,
        "password": <token>
    }
}
Can you please confirm? (Please note that I'm only experiencing this issue when trying to run a job on a new_cluster; I was able to successfully complete a job on an existing_cluster.)
cc @Evan Sadler @Kevin Su
Len Strnad
06/16/2023, 3:53 PM
kftensorflow plugin a try and struggling to set it up correctly. I don't see issues on GitHub, so I figured I would try here to see if anyone can catch anything. I am on flytepropeller 1.6.1 and flytekit 1.7.0. Details in 🧵
Ksenia Semenova
06/16/2023, 4:39 PM
Rezwan Abir
06/16/2023, 6:40 PM
pyflyte run is used)
In ingest_flyte.py we have from pdi_flyte.flyte.workflows.constants import ConflictActionList
However, after packaging and deploying I'm getting ModuleNotFoundError: No module named 'pdi_flyte'
Any ideas?
seunggs
06/17/2023, 2:48 AM
Super Bo
06/18/2023, 3:49 PM
Failed to get data from \<ns3://meta-bucket/flytesnacks/development/HF4IUC4ZOJ77UBKLLP77H4SY24======/fastf>\na252ae25deeecc9b853db87173e84cb.tar.gz
to /root/ (recursive=False).\n\nOriginal exception: Access Denied\n"
seunggs
06/18/2023, 10:27 PM
"['\\ufeffID'] not found in axis"
The original code is:
df.drop([r"\ufeffID"], axis=1, inplace=True)
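A note on the r prefix in the call above: a raw string keeps the backslash as a literal character, so r"\ufeffID" is eight characters long, and its repr shows the doubled backslash seen in the error message. The sketch below uses only the standard library to show the difference between the raw string and the actual byte-order-mark character, and how decoding with utf-8-sig drops a leading BOM. The sample CSV bytes are made up for illustration, and whether the real input file starts with a BOM is an assumption.

```python
# Stdlib-only sketch; the sample CSV bytes are invented for illustration.
import csv
import io

raw_name = r"\ufeffID"  # raw string: a literal backslash, then "ufeffID"
bom_name = "\ufeffID"   # normal string: the single BOM character, then "ID"

print(len(raw_name), len(bom_name))
print(repr(raw_name))

# A small UTF-8 payload that begins with a byte order mark.
data = b"\xef\xbb\xbfID,value\n1,a\n"

# Plain UTF-8 decoding keeps the BOM attached to the first header name.
header_utf8 = next(csv.reader(io.StringIO(data.decode("utf-8"))))

# The "utf-8-sig" codec strips a leading BOM while decoding.
header_sig = next(csv.reader(io.StringIO(data.decode("utf-8-sig"))))

print(header_utf8, header_sig)
```

If the column really is named with a leading BOM, dropping "\ufeffID" (without the r prefix) or reading the file with encoding="utf-8-sig" in pandas.read_csv would be two things to try.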
So it looks like another escape character is added during execution?
Richard Li
06/19/2023, 9:00 AM
from flytekit import task, workflow, ImageSpec

image_spec = ImageSpec(
    packages=["numpy", "tensorflow"],
    base_image="container-image-registry.corp.linkedin.com:8083/lps-image/linkedin/mldev-workspace/mldev-base-image:0.0.16",
    pip_index="https://lerna.tools.corp.linkedin.com/pypi/simple/",
    registry="localhost:30000",
)

@task(container_image=image_spec)
def t1() -> str:
    import numpy as np
    print("task 1")
    print(np.array([1, 2, 3]))
    import tensorflow as tf
    print("task 2")
    print(tf.constant(8))
    return "blah1"

@workflow
def wf():
    t1()
Then I run:
pyflyte run --remote example.py wf
Error:
Failed with Unknown Exception <class 'Exception'> Reason: failed to build the imageSpec at /var/folders/9v/3dl8rh2x0nx9ccbv3xkd708h002_42/T/flyte-mktmp10q/sandbox/local_flytekit/4e7f726c4f5a035e23addb662302e8e1/build.envd with error b'error: failed to do request: Head "http://localhost:30000/v2/flytekit/blobs/sha256:a0e77e7122fc5edfccf28327cc28f2c15e34ee2ec76fa0c7486a6679c014a134": dial tcp 127.0.0.1:30000: connect: connection refused\n'
Richard Li
06/19/2023, 9:01 AM
Jakub Peschel
06/19/2023, 9:02 AM
(venv) jpeschel@kinnan:~/Workplace/flyte-demo$ pyflyte run flytedemo.py training_workflow --hyperparameters '{"C": 0.1}'
LogisticRegression(C=0.1, max_iter=3000)
But if I start a new local cluster:
(venv) jpeschel@kinnan:~/Workplace/flyte-demo$ ./bin/flytectl demo start
INFO[0000] [0] Couldn't find a config file []. Relying on env vars and pflags.
🧑🏭 Bootstrapping a brand new flyte cluster... 🔨 🔧
delete existing sandbox cluster [y/n]:
y
🐋 Going to use Flyte v1.7.0 release with image cr.flyte.org/flyteorg/flyte-sandbox-bundled:sha-1ae254f8683699b68ecddc89d775fc5d39cc3d84
🐋 pulling docker image for release cr.flyte.org/flyteorg/flyte-sandbox-bundled:sha-1ae254f8683699b68ecddc89d775fc5d39cc3d84
🧑🏭 booting Flyte-sandbox container
Waiting for cluster to come up...
Waiting for cluster to come up...
Waiting for cluster to come up...
Waiting for cluster to come up...
Waiting for cluster to come up...
Waiting for cluster to come up...
context flyte-sandbox already exist. Overwriting it
context modified for "flyte-sandbox" and switched over to it.
+-----------------------------------+---------------+-----------+
| SERVICE | STATUS | NAMESPACE |
+-----------------------------------+---------------+-----------+
| k8s: This might take a little bit | Bootstrapping | |
+-----------------------------------+---------------+-----------+
+-----------------------------------+---------------+-----------+
| SERVICE | STATUS | NAMESPACE |
+-----------------------------------+---------------+-----------+
I don't get the expected output from the Getting Started page, and when I try to send the workflow to the cluster I get this error:
(venv) jpeschel@kinnan:~/Workplace/flyte-demo$ pyflyte run --remote flytedemo.py training_workflow --hyperparameters '{"C": 0.1}'
Failed with Exception Code: SYSTEM:Unknown
RPC Failed, with Status: StatusCode.UNAVAILABLE
details: failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:30080: Socket closed
Debug string UNKNOWN:failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:30080: Socket closed {created_time:"2023-06-19T09:17:08.725217881+02:00", grpc_status:14}
I tried to check whether the problem is caused by closed ports, but netstat showed that the port is open:
(venv) jpeschel@kinnan:~/Workplace/flyte-demo$ sudo netstat -ntlp
[sudo] password for jpeschel:
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:6443 0.0.0.0:* LISTEN 101329/docker-proxy
tcp 0 0 127.0.0.53:53 0.0.0.0:* LISTEN 3430/systemd-resolv
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN 4241/cupsd
tcp 0 0 0.0.0.0:30080 0.0.0.0:* LISTEN 101277/docker-proxy
tcp 0 0 0.0.0.0:30001 0.0.0.0:* LISTEN 101302/docker-proxy
tcp 0 0 0.0.0.0:30000 0.0.0.0:* LISTEN 101316/docker-proxy
tcp 0 0 0.0.0.0:30002 0.0.0.0:* LISTEN 101290/docker-proxy
tcp6 0 0 ::1:631 :::* LISTEN 4241/cupsd
tcp6 0 0 127.0.0.1:63342 :::* LISTEN 15418/java
An nmap scan shows the same:
(venv) jpeschel@kinnan:~/Workplace/flyte-demo$ nmap -p 30080 127.0.0.1
Starting Nmap 7.80 ( https://nmap.org ) at 2023-06-19 10:06 CEST
Nmap scan report for localhost (127.0.0.1)
Host is up (0.000080s latency).
PORT STATE SERVICE
30080/tcp open unknown
I am on Ubuntu 22.04.2 LTS, with an 11th Gen Intel® Core™ i7-11850H @ 2.50GHz × 16 and 32 GiB of memory, which should be more than sufficient for this demo.
Is there something required that I didn't do?
Victor Churikov
06/19/2023, 9:14 AM
AttributeError: 'FlyteBranchNode' object has no attribute 'interface'
(it does not abort my run on flyte remote, so it is tolerable)
Erik Dao
06/19/2023, 9:39 AM
import os
from typing import NamedTuple

import requests
from flytekit import Resources, task

weather_data_op = NamedTuple(
    "WeatherDataOP",
    temperature=float,
    humidity=float,
    wind_speed=float
)

@task(cache=False, limits=Resources(cpu="1", mem="2Gi", ephemeral_storage="2Gi"))
def fetch_weather_data(latitude: float, longitude: float) -> weather_data_op:
    api_key = os.environ["WEATHER_API_KEY"]
    url = f"http://api.openweathermap.org/data/2.5/weather?lat={latitude}&lon={longitude}&appid={api_key}"
    res = requests.get(url)
    if res.status_code != 200:
        # Non-200 response: the task returns None instead of the named tuple
        return None
    response = res.json()
    return weather_data_op(
        temperature=float(response['main']['temp']),
        humidity=float(response['main']['humidity']),
        wind_speed=float(response['wind']['speed']),
    )
Depending on the input longitude and latitude, the weather API call might fail, in which case the task returns None.
When I execute this task with inputs that result in None, Flyte complains:
╭─────────────────────────────────────── Traceback (most recent call last) ───────────────────────────────────────╮
│ in <module>:4 │
│ │
│ ❱ 4 fetch_weather_data(latitude=1237.0, longitude=-154.0) │
│ │
│ /opt/bitnami/jupyterhub-singleuser/.local/lib/python3.8/site-packages/flytekit/core/base_task.py:304 in │
│ __call__ │
│ │
│ ❱ 304 │ │ return flyte_entity_call_handler(self, *args, **kwargs) # type: ignore │
│ │
│ /opt/bitnami/jupyterhub-singleuser/.local/lib/python3.8/site-packages/flytekit/core/promise.py:1114 in │
│ flyte_entity_call_handler │
│ │
│ ❱ 1114 │ │ │ result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs) │
│ │
│ /opt/bitnami/jupyterhub-singleuser/.local/lib/python3.8/site-packages/flytekit/core/base_task.py:285 in │
│ local_execute │
│ │
│ ❱ 285 │ │ │ outputs_literal_map = self.sandbox_execute(ctx, input_literal_map) │
│ │
│ /opt/bitnami/jupyterhub-singleuser/.local/lib/python3.8/site-packages/flytekit/core/base_task.py:351 in │
│ sandbox_execute │
│ │
│ ❱ 351 │ │ return self.dispatch_execute(ctx, input_literal_map) │
│ │
│ /opt/bitnami/jupyterhub-singleuser/.local/lib/python3.8/site-packages/flytekit/core/base_task.py:569 in │
│ dispatch_execute │
│ │
│ ❱ 569 │ │ │ │ │ expected_output_names[i]: native_outputs[i] for i, _ in enumerate(na │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
TypeError: 'NoneType' object is not iterable
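The traceback ends while the outputs are being enumerated, which suggests that a bare None cannot be unpacked into the three declared outputs. One possible shape, sketched here in plain Python without the @task decorator, is to always return the tuple and make each field optional. WeatherData and parse_weather are hypothetical names for illustration, and whether the installed flytekit version accepts Optional[float] outputs on a task is an assumption to verify.

```python
# Plain-Python sketch; WeatherData and parse_weather are hypothetical names.
from typing import NamedTuple, Optional


class WeatherData(NamedTuple):
    temperature: Optional[float]
    humidity: Optional[float]
    wind_speed: Optional[float]


def parse_weather(status_code: int, payload: dict) -> WeatherData:
    """Always return the tuple; use None fields when the request failed."""
    if status_code != 200:
        return WeatherData(None, None, None)
    return WeatherData(
        temperature=float(payload["main"]["temp"]),
        humidity=float(payload["main"]["humidity"]),
        wind_speed=float(payload["wind"]["speed"]),
    )


failed = parse_weather(404, {})
ok = parse_weather(
    200, {"main": {"temp": 280, "humidity": 50}, "wind": {"speed": 3}}
)
print(failed)
print(ok)
```

Downstream code can then check a field for None instead of checking the whole result, so the number of outputs stays fixed.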
So I’m wondering what the proper way is to write a Flyte task that optionally returns None?
Robert Ambrus
06/19/2023, 2:06 PM
pyflyte. For some reason, when I add policy_id to the cluster config, the Flyte job fails with an error like this:
Robert Ambrus
06/19/2023, 2:07 PM
Databricks Console link looks like this:
https://<workspace_id>.cloud.databricks.com/#job//run/%3Cnil%3E
Robert Ambrus
06/19/2023, 2:11 PM
kubectl, but a non-admin user probably cannot access kubectl. Is there any other way to get access to the logs?
Chandrashekhar Kotekar
06/20/2023, 10:42 AM
demo cluster on my local machine (laptop), but it is failing with the error below:
{"status":"Status: Downloaded newer image for cr.flyte.org/flyteorg/flyte-sandbox-bundled:sha-1ae254f8683699b68ecddc89d775fc5d39cc3d84"}
🧑🏭 booting Flyte-sandbox container
😬 Something went wrong: Failed to start Sandbox container 🐋, Please check your docker client and try again.
Error: Error response from daemon: driver failed programming external connectivity on endpoint flyte-sandbox (5eaf6609c86e6d818291e2f060e42f8002dfd4e288393da6e01109cfa5a60bd7): listen tcp4 0.0.0.0:6443: bind: address already in use
I am on a Mac and have Docker Desktop, which lets me run a local Kubernetes to play with. As per this Kubernetes documentation, port 6443 is used by the Kubernetes API server. May I please know if and how I can change the port number for this flyte-sandbox?
Fran Jurinec
06/20/2023, 10:57 AM
Chandrashekhar Kotekar
06/20/2023, 10:59 AM
flytectl config validate command, although I have config-sandbox.yaml and config.yaml files in the ~/.flyte directory
Couldn't find a config file.
Validated config file successfully.
Nuno Martins
06/20/2023, 1:52 PM
Nandakumar Raghu
06/20/2023, 3:08 PM
pyflyte run or python -m, but I am getting this error:
2023-06-20 17:04:51,354876 ERROR {"asctime": "2023-06-20 17:04:51,354", "name": "flytekit", "levelname": "ERROR", "message": "Failed to convert outputs of task 'read_dataset' at position 0:\n [Errno 28] Error writing bytes to file. Detail: [errno 28] No space left on device"} base_task.py:587
Is there a virtual directory or disk that Flyte creates when running on a local machine that I can purge? The data for training is being pulled from S3.
David Espejo (he/him)
06/20/2023, 3:53 PM
Geert
06/20/2023, 6:47 PM
exec_spec.yaml to git? Or rather only the launch plans? Or neither? Are there some best practices (e.g. keep them in ./exec_specs or ./launchplans respectively)? Maybe someone has a good example repo to look at. Thanks!
Tommy Nam
06/21/2023, 4:52 PM
2023/06/21 16:43:05 /go/pkg/mod/gorm.io/gorm@v1.24.1-0.20221019064659-5dd2bb482755/callbacks.go:134
[126.528ms] [rows:-] SELECT c.column_name, constraint_name, constraint_type FROM information_schema.table_constraints tc JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name) JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema AND tc.table_name = c.table_name AND ccu.column_name = c.column_name WHERE constraint_type IN ('PRIMARY KEY', 'UNIQUE') AND c.table_catalog = 'app' AND c.table_schema = CURRENT_SCHEMA() AND c.table_name = 'datasets'
{"json":{"src":"start.go:174"},"level":"panic","msg":"Failed to start Admin, err: database migration failed: ERROR: relation \"description_entities\" does not exist (SQLSTATE 42P01)","ts":"2023-06-21T16:43:05Z"}
panic: (*logrus.Entry) 0xc000492230
goroutine 68 [running]:
github.com/sirupsen/logrus.(*Entry).log(0xc0004921c0, 0x0, {0xc001218200, 0x7d})
/go/pkg/mod/github.com/sirupsen/logrus@v1.8.1/entry.go:259 +0x45b
github.com/sirupsen/logrus.(*Entry).Log(0xc0004921c0, 0x0, {0xc001487e68?, 0x1?, 0x1?})
/go/pkg/mod/github.com/sirupsen/logrus@v1.8.1/entry.go:293 +0x4f
github.com/sirupsen/logrus.(*Entry).Logf(0xc0004921c0, 0x0, {0x3071945?, 0x0?}, {0xc0009c3c50?, 0x0?, 0x0?})
/go/pkg/mod/github.com/sirupsen/logrus@v1.8.1/entry.go:338 +0x85
github.com/sirupsen/logrus.(*Entry).Panicf(0x3f0ca00?, {0x3071945?, 0x416667?}, {0xc0009c3c50?, 0x29b5840?, 0x1?})
/go/pkg/mod/github.com/sirupsen/logrus@v1.8.1/entry.go:376 +0x34
github.com/flyteorg/flytestdlib/logger.Panicf({0x3f0ca00?, 0xc0009ead40?}, {0x3071945, 0x1e}, {0xc0009c3c50, 0x1, 0x1})
/go/pkg/mod/github.com/flyteorg/flytestdlib@v1.0.17/logger/logger.go:188 +0x64
github.com/flyteorg/flyte/cmd/single.glob..func4.1()
/flyteorg/build/cmd/single/start.go:174 +0xcc
golang.org/x/sync/errgroup.(*Group).Go.func1()
/go/pkg/mod/golang.org/x/sync@v0.1.0/errgroup/errgroup.go:75 +0x64
created by golang.org/x/sync/errgroup.(*Group).Go
/go/pkg/mod/golang.org/x/sync@v0.1.0/errgroup/errgroup.go:72 +0xa5
Two other clusters were deployed just yesterday and never ran into this error. Hopefully it's something in the configuration that I've overlooked, but I was wondering if anybody had some insight.
EDIT: The two other clusters are using Postgres 14.6 and the one erroring out is using 14.7; could that be the source of the error?
Albert Wibowo
06/21/2023, 6:16 PM
Jay Ganbat
06/21/2023, 8:38 PM
v1.4.2 does FlyteFile upload create a tmp file in the pod? I was looking at the source code, and this is the data_persistence.py put method:
def put(self, from_path: str, to_path: str, recursive: bool = False):
    if from_path != to_path:
        if recursive:
            self.copy_tree(from_path, to_path)
        else:
            # Emulate s3's flat storage by automatically creating directory path
            self._make_local_path(os.path.dirname(self.strip_file_header(to_path)))
            # Write the object to a local file in the temp local folder
            copyfile(self.strip_file_header(from_path), self.strip_file_header(to_path))
It mentions writing an object to a temp local dir.
Nicholas Roberson
06/21/2023, 9:33 PM
workflow_a hits a transient error, can we get that WorkflowExecution object in code, or use the FlyteRemote class, to restart it from where it left off, either in place or as a new execution? This is aside from adding retries or anything to the task definition.