# ask-the-community
g
Hi @Kevin Su, one of my colleagues has been testing the built-in databricks plugin in the propeller and found another issue that looks like a bug. When providing a new_cluster stanza for the plugin to spin up a new job cluster for the job, he gets a PERMISSION_DENIED error if he does not specify a policy_id, which is expected (no issue here). However, if he does specify "policy_id": "some policy" in the new_cluster definition, the plugin does not even try to send a request to Databricks but just fails. The only error message in the propeller logs that indicates the problem is this:
"msg": "Some downstream node has failed. Failed: [true]. TimedOut: [false]. Error: [code:\"\\306\\220\" kind:USER ]"
I compared both runs: in both cases a FlyteWorkflow is created in the correct flyte namespace, and the only difference is that the one that fails has a "policy_id" field. My flyte version is 1.10.5. I also tried compiling the propeller with your debug log statements to check whether any error is coming from Databricks, but it looks like we never make the API call to Databricks and instead fail beforehand.
l
I think it is due to another error. https://github.com/flyteorg/flytekit/pull/1951
in this example, you don't need policy id.
g
@L godlike can you elaborate please?
I am using the built-in databricks webapi plugin in the propeller
@Georgi Ivanov Would you like to take a look at this doc?
And list those steps you are not familiar with
I will try to help you.
g
I have set up the built-in plugin and it works fine; however, a user of flyte had problems submitting a workflow to Databricks. When the workflow is registered and run from Flyte (via the UI), the workflow fails and no API call is made to Databricks.
l
Can you give me an example? Or try to elaborate more?
Kevin and I will try to help you and enhance the feature
g
The only error I was able to find is the above-mentioned one. Yet if the user updates the workflow and removes "policy_id", the workflow is successfully sent to the Databricks API
l
Copy code
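# enable Databricks Container Services (custom containers) on the workspace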
curl -X PATCH -n \
-H "Authorization: Bearer <your-personal-access-token>" \
https://<databricks-instance>/api/2.0/workspace-conf \
-d '{
    "enableDcs": "true"
    }'
Have you tried this before?
g
I updated the propeller code with some debug statements that Kevin used to print the request/response from Databricks, but it looks like with the problem workflow we never call Databricks.
1 sec
l
This is a necessary step
I've updated it in the new documentation to make it more noticeable
Please tell me the result, i.e. whether you can use it both with and without the policy id
Thank you very much
g
we can start DBX clusters with custom containers
l
yes, does this solve the problem with policy id?
g
no
it is rather weird to be honest
I inspected the FlyteWorkflow CRDs
and they look ok to me, I mean flyte can create the CRDs for each submitted workflow
but somehow when the DBX payload has policy_id in it, flyte does not submit the job and just throws the above-mentioned error
on another note
the same user has problems using pyflyte (the user has used flyte before so he is experienced in it)
l
Thank you very much
g
Copy code
Failed with Exception Code: SYSTEM:Unknown
Underlying Exception: [Errno 98] Address already in use
53593
l
can you provide a python example for us to debug?
Kevin and I will help this week
g
our flyte is configured with Okta for OIDC and the built-in OAuth server
kk
1 sec
Copy code
import datetime
import random
from operator import add
import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark.task import Databricks

@task(
    task_config=Databricks(
       databricks_conf={
         "run_name" : "flytekit databricks plugin example",
         "timeout_seconds" : 3600,
         "new_cluster" : {
            "num_workers": 2,
            "spark_version": "12.2.x-scala2.12",
            "spark_conf": {
                "spark.hadoop.fs.s3a.server-side-encryption-algorithm": "AES256",
                "spark.driver.extraJavaOptions": "-Dlog4j2.formatMsgNoLookups=true",
                "spark.executor.extraJavaOptions": "-Dlog4j2.formatMsgNoLookups=true"
            },
            "aws_attributes": {
                "first_on_demand": 1,
                "availability": "SPOT_WITH_FALLBACK",
                "zone_id": "auto",
                "instance_profile_arn": "arn:aws:iam::<account>:instance-profile/<profile>",
                "spot_bid_price_percent": 100,
                "ebs_volume_count": 0
            },
            "policy_id": "<policy_id>",
            "node_type_id": "m5d.large",
            "ssh_public_keys": [],
            "custom_tags": {
            ...
            },
            "cluster_log_conf": {
                "s3": {
                    "destination": "s3://<bucket>/cluster-logs",
                    "region": "us-east-1",
                    "enable_encryption": "true",
                    "canned_acl": "bucket-owner-full-control"
                }
            },
            "spark_env_vars": {
                "LIMITED_INTERNET_ACCESS": "false"
            },
            "enable_elastic_disk": "true",
            "init_scripts": []
         }
       },
       databricks_instance="<instance>",
       databricks_token="<token>",
       applications_path="s3://<bucket>/entrypoint.py"
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def print_spark_config():
    spark = flytekit.current_context().spark_session
    print(spark.sparkContext.getConf().getAll())

@workflow
def my_db_job():
    print_spark_config()
l
This will be really helpful
Thank you very much again.
g
so with this workflow, flyte returns the above error. I don’t see any communication back and forth to Databricks
if we remove the policy_id, the job is sent to databricks
this does not make any sense to me
in both cases the FlyteWorkflow CRD is created
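to illustrate (a trimmed-down, hypothetical comparison, not his actual config), the two runs differ only in that one key
Copy code
# hypothetical minimal comparison, not the real cluster config
working_new_cluster = {
    "num_workers": 2,
    "spark_version": "12.2.x-scala2.12",
    "node_type_id": "m5d.large",
}

failing_new_cluster = {
    **working_new_cluster,
    "policy_id": "<policy_id>",  # with this key present, propeller errors out before calling Databricks
}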
l
We will spend some time to deep dive into it, thanks a lot
g
thanks
and this is how the user runs it
Copy code
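# package the workflow for fast registration, then register the resulting archive with flytectl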
pyflyte -k db_plugin package --fast -d "/databricks/driver" --image <custom_databricks_runner_image> --force --output=db_plugin.tgz    
flytectl register files --project flytesnacks --domain development --archive db_plugin.tgz --version v21 --destinationDirectory "/databricks/driver"
there is a reason why he is not running it directly with “pyflyte run”
1. he wants to run it like a production workflow (following the recommended pattern)
l
you can use pyflyte register instead
g
2. second, he is not actually able to use "pyflyte run", because pyflyte receives two callback responses (from the built-in OAuth server), tries to bind to the same localhost port twice, and fails
Copy code
Failed with Exception Code: SYSTEM:Unknown
Underlying Exception: [Errno 98] Address already in use
53593
again, I used to face the same problem, but it disappeared for me (I did not do anything)
I don’t know why this happens to him, but he found out that by using these two commands he is able to register the workflow and then run it from the console
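for context, this is the failure mode I mean (a minimal, hypothetical sketch of binding the same port twice, not pyflyte's actual callback code):
Copy code
import socket

# hypothetical sketch: binding the same localhost port twice raises
# "[Errno 98] Address already in use" on Linux, which is what pyflyte's
# OAuth callback handling appears to hit when it receives two callbacks
first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
first.bind(("127.0.0.1", 53593))
first.listen()

second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
second.bind(("127.0.0.1", 53593))  # raises OSError: [Errno 98] Address already in use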
k
@Georgi Ivanov I found the bug, will create a PR today. Will ping you once I open it
g
that’s about the double auth?
@L godlike
l
I don’t know the bug yet, will wait for Kevin’s update and double check with him
Thanks for your patience
k
Not double auth. About policy ID
a
Hi Kevin Su and L godlike, I am the colleague that Georgi was referring to in his message above. It would be great if you could help check the double auth issue with pyflyte register and pyflyte run from your side as well
k
@Georgi Ivanov could you give it a try?
I built an image for the propeller.
pingsutw/debug-dbx:v2
There was a panic in the dbx plugin, so you didn’t see any error message.