# ask-the-community

Georgi Ivanov

11/16/2023, 1:25 PM
Hi @Kevin Su, one of my colleagues has been testing the built-in Databricks plugin in the propeller and found another issue that looks like a bug. When providing a new_cluster stanza for the plugin to spin up a new job cluster, he gets a PERMISSION_DENIED error if he does not specify a policy_id, which is expected (no issue there). However, if he does specify “policy_id”: “some policy” in the new_cluster definition, the plugin does not even try to send a request to Databricks but just fails. The only error message in the propeller logs that indicates the problem is this:
"msg": "Some downstream node has failed. Failed: [true]. TimedOut: [false]. Error: [code:\"\\306\\220\" kind:USER ]"
I compared both runs; in both cases a flyteworkflow is created in the correct flyte namespace, and the only difference is that the run that fails has a “policy_id” field. My flyte version is 1.10.5. I also tried compiling the propeller with your debug log statements to check for any possible error coming from Databricks, but it looks like we never make the API call to Databricks; we fail beforehand.
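For reference, a minimal sketch of the configuration shape being described; the values and the task name here are illustrative placeholders, not the user's actual config (his full example appears later in the thread):

from flytekit import task
from flytekitplugins.spark.task import Databricks

# Sketch with placeholder values: per the report, the run succeeds when
# "policy_id" is omitted and fails before any Databricks API call when
# it is present.
@task(
    task_config=Databricks(
        databricks_conf={
            "run_name": "policy-id-repro",
            "new_cluster": {
                "num_workers": 2,
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "m5d.large",
                "policy_id": "<policy_id>",  # removing this field avoids the failure
            },
        },
        databricks_instance="<instance>",
    ),
)
def repro_task():
    ...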

L godlike

11/16/2023, 2:55 PM
I think it is due to another error: https://github.com/flyteorg/flytekit/pull/1951
In this example, you don't need a policy id.

Georgi Ivanov

11/16/2023, 5:02 PM
@L godlike can you elaborate please?
I am using the built-in databricks webapi plugin in the propeller

L godlike

@Georgi Ivanov Would you like to take a look at this doc?
And list the steps you are not familiar with.
I will try to help you.

Georgi Ivanov

11/21/2023, 3:01 PM
I have set up the built-in plugin and it works fine; however, a user of flyte had problems submitting a workflow to Databricks. When the workflow is registered and run from Flyte (via the UI), the workflow fails and no API call is made to Databricks.

L godlike

11/21/2023, 3:02 PM
Can you give me an example? Or try to elaborate more?
Kevin and I will try to help you and enhance the feature

Georgi Ivanov

11/21/2023, 3:03 PM
The only error I was able to find is the above-mentioned error. Yet if the user updates the workflow and removes “policy_id”, the workflow is successfully sent to the Databricks API

L godlike

11/21/2023, 3:03 PM
curl -X PATCH -n \
-H "Authorization: Bearer <your-personal-access-token>" \
https://<databricks-instance>/api/2.0/workspace-conf \
-d '{
    "enableDcs": "true"
    }'
Have you tried this before?
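A minimal sketch of the same workspace-conf PATCH using Python's requests library, with the same placeholders as the curl command above (this enables Databricks Container Services, i.e. custom containers):

import requests

# PATCH the workspace configuration to enable Databricks Container
# Services; placeholders as in the curl example above.
resp = requests.patch(
    "https://<databricks-instance>/api/2.0/workspace-conf",
    headers={"Authorization": "Bearer <your-personal-access-token>"},
    json={"enableDcs": "true"},
)
resp.raise_for_status()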

Georgi Ivanov

11/21/2023, 3:04 PM
I updated the propeller code with some debug statements that Kevin used to print the request/response from Databricks, but it looks like with the problem workflow we never called Databricks.
1 sec

L godlike

11/21/2023, 3:04 PM
This is a necessary step
I've updated it in the new documentation to make it more noticeable
Please tell me whether it works for you both with and without the policy id
Thank you very much

Georgi Ivanov

11/21/2023, 3:17 PM
we can start DBX clusters with custom containers

L godlike

11/21/2023, 3:18 PM
yes, does this solve the problem with policy id?

Georgi Ivanov

11/21/2023, 3:18 PM
no
it is rather weird to be honest
I inspected the FlyteWorkflow CRDs
and they look ok to me; I mean flyte can create the CRDs for each submitted workflow
but somehow when the DBX payload has policy_id in it, flyte does not start the job and just throws the above-mentioned error
on another note
the same user has problems using pyflyte (he has used flyte before, so he is experienced with it)

L godlike

11/21/2023, 3:20 PM
Thank you very much

Georgi Ivanov

11/21/2023, 3:20 PM
Failed with Exception Code: SYSTEM:Unknown
Underlying Exception: [Errno 98] Address already in use
53593

L godlike

11/21/2023, 3:20 PM
Can you provide a python example for us to debug?
Kevin and I will help this week

Georgi Ivanov

11/21/2023, 3:21 PM
our flyte is configured with Okta for OIDC and the built-in OAuth server
kk
1 sec
import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark.task import Databricks

@task(
    task_config=Databricks(
       databricks_conf={
         "run_name" : "flytekit databricks plugin example",
         "timeout_seconds" : 3600,
         "new_cluster" : {
            "num_workers": 2,
            "spark_version": "12.2.x-scala2.12",
            "spark_conf": {
                "spark.hadoop.fs.s3a.server-side-encryption-algorithm": "AES256",
                "spark.driver.extraJavaOptions": "-Dlog4j2.formatMsgNoLookups=true",
                "spark.executor.extraJavaOptions": "-Dlog4j2.formatMsgNoLookups=true"
            },
            "aws_attributes": {
                "first_on_demand": 1,
                "availability": "SPOT_WITH_FALLBACK",
                "zone_id": "auto",
                "instance_profile_arn": "arn:aws:iam::<account>:instance-profile/<profile>",
                "spot_bid_price_percent": 100,
                "ebs_volume_count": 0
            },
            "policy_id": "<policy_id>",
            "node_type_id": "m5d.large",
            "ssh_public_keys": [],
            "custom_tags": {
            ...
            },
            "cluster_log_conf": {
                "s3": {
                    "destination": "s3://<bucket>/cluster-logs",
                    "region": "us-east-1",
                    "enable_encryption": "true",
                    "canned_acl": "bucket-owner-full-control"
                }
            },
            "spark_env_vars": {
                "LIMITED_INTERNET_ACCESS": "false"
            },
            "enable_elastic_disk": "true",
            "init_scripts": []
         }
       },
       databricks_instance="<instance>",
       databricks_token="<token>",
       applications_path="s3://<bucket>/entrypoint.py"
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def print_spark_config():
    spark = flytekit.current_context().spark_session
    print(spark.sparkContext.getConf().getAll())

@workflow
def my_db_job():
    print_spark_config()

L godlike

11/21/2023, 3:25 PM
This will be really helpful
Thank you very much again.

Georgi Ivanov

11/21/2023, 3:25 PM
so with this workflow, flyte returns the above error. I don’t see any communication back and forth with Databricks
if we remove the policy_id, the job is sent to Databricks
this does not make any sense to me
in both cases the FlyteWorkflow CRD is created

L godlike

11/21/2023, 3:26 PM
We will spend time deep-diving into it, thanks a lot

Georgi Ivanov

11/21/2023, 3:26 PM
thanks
and this is how the user runs it
pyflyte -k db_plugin package --fast -d "/databricks/driver" --image <custom_databricks_runner_image> --force --output=db_plugin.tgz    
flytectl register files --project flytesnacks --domain development --archive db_plugin.tgz --version v21 --destinationDirectory "/databricks/driver"
there is a reason why he is not running it directly with “pyflyte run”
1. he wants to run it like a production workflow (following the recommended pattern)

L godlike

11/21/2023, 3:28 PM
you can use pyflyte register instead

Georgi Ivanov

11/21/2023, 3:29 PM
2. he is not actually able to use “pyflyte run”: what happens for him is that pyflyte gets two callback responses (from the built-in OAuth server), tries to bind twice to localhost on the same port, and fails
Failed with Exception Code: SYSTEM:Unknown
Underlying Exception: [Errno 98] Address already in use
53593
again, I used to face the same problem, but it disappeared for me (I did not do anything)
I don’t know why this happens to him, but he found out that by using these two commands he is able to register the workflow and then run it from the console
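A minimal sketch (illustrative, not from the thread) of the failure mode described: two listeners binding the same localhost callback port, which raises the same [Errno 98] as in the log above.

import socket

# Binding the same localhost port twice reproduces
# "[Errno 98] Address already in use"; port 53593 is the one from the
# error output above.
s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s1.bind(("localhost", 53593))

s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s2.bind(("localhost", 53593))  # raises OSError: errno 98 (EADDRINUSE)
except OSError as exc:
    print(exc)
finally:
    s1.close()
    s2.close()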

Kevin Su

11/22/2023, 11:56 AM
@Georgi Ivanov I found the bug, will create a PR today. Will ping you once I open it.

Georgi Ivanov

11/22/2023, 5:00 PM
that’s about the double auth?
@L godlike

L godlike

11/22/2023, 11:18 PM
I don’t know the bug yet; I will wait for Kevin’s update and double-check with him
Thanks for your patience

Kevin Su

11/22/2023, 11:20 PM
Not double auth. About policy ID

Anantha Janakiraman

11/23/2023, 3:12 AM
Hi Kevin Su and L godlike, I am the colleague Georgi was referring to in his message above. It would be great if you could also help check the double auth issue with pyflyte register and pyflyte run from your side.

Kevin Su

11/24/2023, 1:52 AM
@Georgi Ivanov could you give it a try?
I built an image for propeller:
pingsutw/debug-dbx:v2
There was a panic in the dbx plugin, so you didn’t see any error message.