Georgi Ivanov
11/16/2023, 1:25 PM
new_cluster stanza for the plugin to spin up a new job cluster for the job, he gets a PERMISSION_DENIED error if he does not specify a policy_id, which is expected (no issue here). However, if he does specify "policy_id": "some policy" in the new_cluster definition, the plugin does not even try to send a request to Databricks but fails outright. The only error message in the propeller logs that indicates the problem is this: "msg": "Some downstream node has failed. Failed: [true]. TimedOut: [false]. Error: [code:\"\\306\\220\" kind:USER ]". I compared both runs; in both cases a flyteworkflow is created in the correct flyte namespace, and the only difference is that the one that fails has a "policy_id" field. My flyte version is 1.10.5. I also tried compiling the propeller with your log debug statements to check for any possible error coming from Databricks, but it looks like we never make the API call to Databricks; we fail beforehand.

L godlike
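(Editor's aside, not from the thread: the opaque code:"\306\220" in that propeller log line is an octal-escaped byte pair, and decoding it shows it is not human-readable text, consistent with the report that the failure happens before any Databricks API call. A quick sketch:)

```python
# The propeller log renders the error code with octal escapes: code:"\306\220".
# Decoding the raw bytes shows they are not a meaningful, readable error code.
raw = bytes([0o306, 0o220])   # the two escaped bytes, 0xC6 0x90
print(raw.hex())              # -> "c690"
print(raw.decode("utf-8"))    # -> "\u0190", a single non-ASCII character
```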
11/16/2023, 2:55 PM

Georgi Ivanov
11/16/2023, 5:02 PM

Georgi Ivanov
11/21/2023, 3:01 PM

L godlike
11/21/2023, 3:02 PM

Georgi Ivanov
11/21/2023, 3:03 PM

L godlike
11/21/2023, 3:03 PM
curl -X PATCH -n \
-H "Authorization: Bearer <your-personal-access-token>" \
https://<databricks-instance>/api/2.0/workspace-conf \
-d '{
"enableDcs": "true"
}'
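(Editor's note: after sending the PATCH above, the same workspace-conf endpoint can be queried with GET and a `keys` parameter to confirm the setting took effect. A minimal sketch that only builds the request, with the instance and token placeholders assumed; send it with `urlopen()` in practice:)

```python
from urllib.request import Request

def workspace_conf_check(instance: str, token: str, key: str) -> Request:
    # Build the GET request that reads a workspace setting, e.g. enableDcs.
    url = f"https://{instance}/api/2.0/workspace-conf?keys={key}"
    return Request(url, headers={"Authorization": f"Bearer {token}"})

req = workspace_conf_check("<databricks-instance>", "<token>", "enableDcs")
print(req.full_url)
```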
Georgi Ivanov
11/21/2023, 3:04 PM

L godlike
11/21/2023, 3:04 PM

Georgi Ivanov
11/21/2023, 3:17 PM

L godlike
11/21/2023, 3:18 PM

Georgi Ivanov
11/21/2023, 3:18 PM

L godlike
11/21/2023, 3:20 PM

Georgi Ivanov
11/21/2023, 3:20 PM
Failed with Exception Code: SYSTEM:Unknown
Underlying Exception: [Errno 98] Address already in use
53593
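(Editor's note: [Errno 98] is EADDRINUSE, raised when a process tries to bind a port that is already bound, e.g. when a driver process starts a server on a port a previous run still holds. A generic reproduction, not tied to the plugin:)

```python
import errno
import socket

# Minimal reproduction of [Errno 98] Address already in use (EADDRINUSE):
# a second bind() to an already-bound port fails.
s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s1.bind(("127.0.0.1", 0))           # let the OS pick a free port
port = s1.getsockname()[1]
s1.listen()

s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s2.bind(("127.0.0.1", port))    # same port -> OSError(EADDRINUSE)
except OSError as e:
    print(e.errno == errno.EADDRINUSE)   # True (errno 98 on Linux)
finally:
    s2.close()
    s1.close()
```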
L godlike
11/21/2023, 3:20 PM

Georgi Ivanov
11/21/2023, 3:21 PM
import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark.task import Databricks


@task(
    task_config=Databricks(
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "timeout_seconds": 3600,
            "new_cluster": {
                "num_workers": 2,
                "spark_version": "12.2.x-scala2.12",
                "spark_conf": {
                    "spark.hadoop.fs.s3a.server-side-encryption-algorithm": "AES256",
                    "spark.driver.extraJavaOptions": "-Dlog4j2.formatMsgNoLookups=true",
                    "spark.executor.extraJavaOptions": "-Dlog4j2.formatMsgNoLookups=true"
                },
                "aws_attributes": {
                    "first_on_demand": 1,
                    "availability": "SPOT_WITH_FALLBACK",
                    "zone_id": "auto",
                    "instance_profile_arn": "arn:aws:iam::<account>:instance-profile/<profile>",
                    "spot_bid_price_percent": 100,
                    "ebs_volume_count": 0
                },
                "policy_id": "<policy_id>",
                "node_type_id": "m5d.large",
                "ssh_public_keys": [],
                "custom_tags": {
                    ...
                },
                "cluster_log_conf": {
                    "s3": {
                        "destination": "s3://<bucket>/cluster-logs",
                        "region": "us-east-1",
                        "enable_encryption": "true",
                        "canned_acl": "bucket-owner-full-control"
                    }
                },
                "spark_env_vars": {
                    "LIMITED_INTERNET_ACCESS": "false"
                },
                "enable_elastic_disk": "true",
                "init_scripts": []
            }
        },
        databricks_instance="<instance>",
        databricks_token="<token>",
        applications_path="s3://<bucket>/entrypoint.py"
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def print_spark_config():
    spark = flytekit.current_context().spark_session
    print(spark.sparkContext.getConf().getAll())


@workflow
def my_db_job():
    print_spark_config()
L godlike
11/21/2023, 3:25 PM

Georgi Ivanov
11/21/2023, 3:25 PM

L godlike
11/21/2023, 3:26 PM

Georgi Ivanov
11/21/2023, 3:26 PM
pyflyte -k db_plugin package --fast -d "/databricks/driver" --image <custom_databricks_runner_image> --force --output=db_plugin.tgz
flytectl register files --project flytesnacks --domain development --archive db_plugin.tgz --version v21 --destinationDirectory "/databricks/driver"
L godlike
11/21/2023, 3:28 PM

Georgi Ivanov
11/21/2023, 3:29 PM
Failed with Exception Code: SYSTEM:Unknown
Underlying Exception: [Errno 98] Address already in use
53593
Kevin Su
11/22/2023, 11:56 AM

Georgi Ivanov
11/22/2023, 5:00 PM

L godlike
11/22/2023, 11:18 PM

Kevin Su
11/22/2023, 11:20 PM

Anantha Janakiraman
11/23/2023, 3:12 AM

Kevin Su
11/24/2023, 1:52 AM
pingsutw/debug-dbx:v2