# ask-the-community
r
Hi All, Our team's working on Flyte + Databricks integration in an Enterprise workspace, struggling with these issues: https://github.com/flyteorg/flyte/issues/3855 https://github.com/flyteorg/flyte/issues/3853 We are happy to contribute, just need some guidance on how to localize the issue.
y
@Kevin Su when you’re back up, any ideas?
@Frank Shen do you have any ideas? you’ve used this at least briefly right?
f
Hi @Yee, I haven’t used databricks tasks in flyte.
k
looking
@Robert Ambrus Did you see any error in the propeller pod while running databricks task?
btw, could you try to send a get request to dbx by using curl? like
Copy code
curl --netrc --request GET --header "Authorization: Bearer $DATABRICKS_TOKEN" \
'https://dbc-32fcad04-13c2.cloud.databricks.com/api/2.0/jobs/runs/get?run_id=306'
I want to see the json response. databricks probably updated their response spec, so flyte can’t properly parse the response.
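(Aside: a minimal Go sketch of the same check, in case it's easier to script — it reuses the host and run_id from the curl example above and reads the token from DATABRICKS_TOKEN, and it decodes the response the generic way a plugin typically would; this is illustrative, not the actual flyteplugins code.)
Copy code
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// Host and run_id taken from the curl example above; the token comes from
	// the DATABRICKS_TOKEN environment variable.
	host := "https://dbc-32fcad04-13c2.cloud.databricks.com"
	token := os.Getenv("DATABRICKS_TOKEN")

	req, err := http.NewRequest(http.MethodGet, host+"/api/2.0/jobs/runs/get?run_id=306", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer "+token)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}

	// Decode into a generic map, so a renamed or missing field in the response
	// spec shows up as a nil lookup rather than a decode error.
	var data map[string]interface{}
	if err := json.Unmarshal(body, &data); err != nil {
		panic(err)
	}
	fmt.Println("run_id:", data["run_id"])
	if state, ok := data["state"].(map[string]interface{}); ok {
		fmt.Println("life_cycle_state:", state["life_cycle_state"], "result_state:", state["result_state"])
	}
}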
r
@Kevin Su sure, let me check
k
loader must define exec_module() when running Databricks task
which version of python are you using
did you see the same error when running workflow locally?
#3855 [BUG] Flyte task keeps running forever when running a Databricks job
btw, does the databricks job succeed or fail?
r
btw, does the databricks job succeed or fail?
Databricks job succeeded
which version of python are you using
I'm running this job on DBR 11.3 LTS (in both cases), it has Python 3.9.5 (added this info to the ticket also)
btw, could you try to send a get request to dbx by using curl?
Copy code
{
  "attempt_number": 0,
  "cleanup_duration": 0,
  "cluster_instance": {
    "cluster_id": "<my-cluster-id>",
    "spark_context_id": "<my-spark-context-id>"
  },
  "cluster_spec": {
    "existing_cluster_id": "<my-cluster-id>"
  },
  "creator_user_name": "<my-username>",
  "end_time": 1688987784820,
  "execution_duration": 223000,
  "format": "SINGLE_TASK",
  "job_id": 1060720031042619,
  "number_in_job": 574539,
  "run_id": 574539,
  "run_name": "dbx simplified example",
  "run_page_url": "<my-run-page-url>",
  "run_type": "SUBMIT_RUN",
  "setup_duration": 41000,
  "start_time": 1688987520036,
  "state": {
    "life_cycle_state": "TERMINATED",
    "result_state": "SUCCESS",
    "state_message": "",
    "user_cancelled_or_timedout": false
  },
  "task": {
    "spark_python_task": {
      "parameters": [
        "pyflyte-fast-execute",
        "--additional-distribution",
        "s3://<my-s3-bucket>/flytesnacks/development/UMZ6XPNM4L6KL4YALV56QDMSX4======/script_mode.tar.gz",
        "--dest-dir",
        ".",
        "--",
        "pyflyte-execute",
        "--inputs",
        "s3://<my-s3-bucket>/metadata/propeller/flytesnacks-development-ff83ea058624d44ddbe9/n0/data/inputs.pb",
        "--output-prefix",
        "s3://<my-s3-bucket>/metadata/propeller/flytesnacks-development-ff83ea058624d44ddbe9/n0/data/0",
        "--raw-output-data-prefix",
        "s3://<my-s3-bucket>/raw_data/sh/ff83ea058624d44ddbe9-n0-0",
        "--checkpoint-path",
        "s3://<my-s3-bucket>/raw_data/sh/ff83ea058624d44ddbe9-n0-0/_flytecheckpoints",
        "--prev-checkpoint",
        "\"\"",
        "--resolver",
        "flytekit.core.python_auto_container.default_task_resolver",
        "--",
        "task-module",
        "dbx_simplified_example",
        "task-name",
        "print_spark_config"
      ],
      "python_file": "dbfs:/tmp/flyte/entrypoint.py"
    }
  }
}
(added to the ticket also)
Did you see any error in the propeller pod while running databricks task?
No, I didn't. It's pretty weird, I also expected some error logs, but haven't seen any - let me double-check
Did you see any error in the propeller pod while running databricks task?
It's weird - I've just triggered a run (11/07/2023), but can't see any new logs in flytepropeller. The latest logs are 4 days old.
could it be a plugin initialization error, or is it something completely different?
k
I’ve just triggered a run (11/07/2023)
so the task is still running, even though the databricks job is already completed?
r
yes, exactly
k
@Robert Ambrus could you check the propeller log again? I want to know if there are any new error messages
r
as I understand, flytepropeller is responsible for the task management. is there a way to monitor the HTTP traffic between flytepropeller and Databricks?
yeah, just triggered a new run, let me check
k
is there a way to monitor the HTTP traffic between
need to add more logs to the plugin
do you remember when you saw this error? was it when you were trying to abort the task? do you see the same error in the new run?
r
hmm, yeah. anyway, based on this code, we should see errors in the logs if the communication is broken
k
yes, you’re right.
r
here's the status of the recent run:
image.png
image.png
no change in flytepropeller logs
in flyteadmin logs I can see log entries related to the run
k
oh, I see. propeller didn’t create the resource metadata for some reason. investigating
r
image.png
this is the flyteadmin log
k
I created a small PR and built an image. could you give it a try?
Copy code
pingsutw/flytepropeller:c04b9260a4f1fe17f30283b470525807357a01ec
r
sure, let us take a look. I guess we just need to override the flytepropeller image reference in our setup with the one you shared
like this, right?
Copy code
flytepropeller:
  enabled: true
  manager: false
  # -- Whether to install the flyteworkflows CRD with helm
  createCRDs: true
  # -- Replicas count for Flytepropeller deployment
  replicaCount: 1
  image:
    # -- Docker image for Flytepropeller deployment
    repository: pingsutw/flytepropeller # FLYTEPROPELLER_IMAGE
    tag: c04b9260a4f1fe17f30283b470525807357a01ec # FLYTEPROPELLER_TAG
    pullPolicy: IfNotPresent
k
yes, correct
or you can directly update the image in the flytepropeller deployment
r
all right
thank you, working on the deployment with my teammates (I don't have the required access). so, this change will basically tell us whether taskCtx.ResourceMeta is initialized. The POST request that creates the job is completed successfully, so we can probably presume that this part works:
Copy code
resp, err := p.client.Do(req)
if err != nil {
	return nil, nil, err
}
Probably something goes wrong here, right?
Copy code
data, err := buildResponse(resp)
if err != nil {
	return nil, nil, err
}
if data["run_id"] == "" {
	return nil, nil, pluginErrors.Wrapf(pluginErrors.RuntimeFailure, err,
		"Unable to fetch statementHandle from http response")
}
It is quite strange that we do not have any errors in the logs - my guess is that these errors should be propagated to the flytepropeller logs. Right?
cc @Georgi Ivanov
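(Aside: buildResponse itself isn't shown above; a rough sketch of what such a helper typically does — an assumption about its shape, not the actual flyteplugins source — which also suggests why a changed response schema could fail silently rather than log an error:)
Copy code
package databrickssketch

import (
	"encoding/json"
	"io"
	"net/http"
)

// Sketch of a buildResponse-style helper: read the HTTP body and decode it
// into a generic map. If Databricks renamed or dropped a field, the later
// data["run_id"] lookup just comes back as a nil entry instead of producing
// an error, so nothing would show up in the propeller logs.
func buildResponse(resp *http.Response) (map[string]interface{}, error) {
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	var data map[string]interface{}
	if err := json.Unmarshal(body, &data); err != nil {
		return nil, err
	}
	return data, nil
}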
g
hi all
we tried to deploy the image but we got this
Copy code
flytepropeller-8574c869bb-d8bzv      0/1     Error              2 (23s ago)   33s
flytepropeller-8574c869bb-srwqd      0/1     CrashLoopBackOff   1 (16s ago)   24s
Copy code
k logs flytepropeller-8574c869bb-srwqd -n flyte
exec /bin/flytepropeller: exec format error
the reason is this
Copy code
ELF 64-bit LSB executable, ARM aarch64, version 1 (SYSV), statically linked, Go BuildID=yrkx_WEsTHfXYES1W5qM/G0DQz0Khz26svL-2chDO/BgntAhD_JOMfoTcNhtoP/JaCUuM92wwK3w0KL3Pzo, with debug_info, not stripped
we are running flyte on EKS and this image is ARM64 (aarch64)
can we get an x86_64 image or perhaps a multi-arch one to test it?
k
Oh sorry, let me build a new one
g
thank you
k
pingsutw/flytepropeller:e37bdfeffac589273f58e024ed9ef8bd5c48dbc6
g
checking
r
@Kevin Su we deployed flytepropeller from the image, and we got this in the logs again:
Copy code
time="2023-07-12T11:24:12Z" level=info msg=------------------------------------------------------------------------
time="2023-07-12T11:24:12Z" level=info msg="App [flytepropeller], Version [unknown], BuildSHA [unknown], BuildTS [2023-07-12 11:24:12.690099108 +0000 UTC m=+0.023105387]"
time="2023-07-12T11:24:12Z" level=info msg=------------------------------------------------------------------------
time="2023-07-12T11:24:12Z" level=info msg="Detected: 8 CPU's\n"
{"json":{},"level":"warning","msg":"defaulting max ttl for workflows to 23 hours, since configured duration is larger than 23 [23]","ts":"2023-07-12T11:24:12Z"}
{"json":{},"level":"warning","msg":"stow configuration section missing, defaulting to legacy s3/minio connection config","ts":"2023-07-12T11:24:12Z"}
I0712 11:24:13.017134       1 leaderelection.go:248] attempting to acquire leader lease flyte/propeller-leader...
I0712 11:24:29.591775       1 leaderelection.go:258] successfully acquired lease flyte/propeller-leader
{"json":{"routine":"databricks-worker-1"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-1"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-2"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-2"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-4"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-4"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-0"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-0"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-6"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-6"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-8"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-8"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-5"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-5"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-9"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-9"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-3"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-3"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-7"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
{"json":{"routine":"databricks-worker-7"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-12T11:24:59Z"}
so, this error occurs at container start
hmm, what is this error about? "Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper"
after this error, I can't see any flytepropeller logs
hmm, we have a bunch of workflows / tasks deployed in our Flyte workspace
I suspect flytepropeller is trying to refresh the status of these tasks
and that's why we see these errors right after the container starts
does it make sense @Kevin Su?
as far as I can see, the error is coming from here
and this is the interface:
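(Aside: a minimal, self-contained Go reproduction of this exact class of panic — a value stored in an interface but asserted as a pointer; the type name here is a placeholder, not the plugin's real definition:)
Copy code
package main

import "fmt"

// Placeholder standing in for the plugin's resource metadata type.
type ResourceMetaWrapper struct {
	RunID string
}

func main() {
	// The metadata is stored in the interface as a value...
	var meta interface{} = ResourceMetaWrapper{RunID: "574539"}

	// ...but asserted as a pointer. The single-return form of the assertion
	// panics with: interface conversion: interface {} is main.ResourceMetaWrapper,
	// not *main.ResourceMetaWrapper
	wrapper := meta.(*ResourceMetaWrapper)
	fmt.Println(wrapper.RunID)
}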
k
@Robert Ambrus are you able to jump on a call, and share screen?
just added one more change, could you test it again? pingsutw/flytepropeller:55fa6a247e659f629d535c022c84d8985054368f
r
sorry, I'm in another call
ok, let us check
g
hi @Kevin Su
we deployed this version
we still see these errors in the flytepropeller logs
Copy code
{"json":{"routine":"databricks-worker-0"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-13T09:51:46Z"}
{"json":{"routine":"databricks-worker-9"},"level":"error","msg":"worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-13T09:51:46Z"}
{"json":{"routine":"databricks-worker-9"},"level":"error","msg":"Failed to sync. Error: worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper","ts":"2023-07-13T09:51:46Z"}
but as @Robert Ambrus said, we are able to trigger a job on Databricks from flyte; it’s just that this error pops up, and it looks like the communication with the flyte plugin for Databricks is not fully working
r
@Kevin Su any update on this? we tested the patched images - unfortunately we're facing the same issue (see above). any idea why the "worker panic'd and is shutting down. Error: interface conversion: interface {} is databricks.ResourceMetaWrapper, not *databricks.ResourceMetaWrapper" error is happening?
k
we just found the main issue, and fixed it. https://github.com/flyteorg/flyteplugins/pull/407
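(For reference, the usual way to make that conversion safe is either to store and assert the same form, or to use a comma-ok assertion / type switch that accepts both shapes — a general sketch of the pattern, not necessarily the exact change in the PR:)
Copy code
package main

import "fmt"

// Placeholder type, as in the sketch above.
type ResourceMetaWrapper struct {
	RunID string
}

// asWrapper handles both the value and the pointer shape instead of panicking
// when the stored form doesn't match the asserted one.
func asWrapper(meta interface{}) (*ResourceMetaWrapper, bool) {
	switch m := meta.(type) {
	case *ResourceMetaWrapper:
		return m, true
	case ResourceMetaWrapper:
		return &m, true
	default:
		return nil, false
	}
}

func main() {
	var stored interface{} = ResourceMetaWrapper{RunID: "574539"}
	if w, ok := asWrapper(stored); ok {
		fmt.Println("run id:", w.RunID)
	}
}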