Hello, I am trying to write a test flyte databrick...
# ask-the-community
f
Hello, I am trying to write a test flyte databricks workflow that I can run on the remote flyte cluster. I have enabled the Databricks Agent on the cluster. I am following https://flyte.org/blog/flyte-1-10-monorepo-new-agents-eager-workflows-and-more#:~:text=databricks_instance%3Dos.[…]ron%5B%22DATABRICKS_HOST%22%5D%2C and I got an error when running pyflyte run flyte_databricks.py wf: Failed with Unknown Exception class ‘TypeError’ Reason: __init__() got an unexpected keyword argument ‘spark_conf’ Could you provide me with a working example of using the DatabricksAgent? I have pip installed the latest flytekit locally:
Copy code
pip list|grep flyte
flyteidl                        1.10.6
flytekit                        1.10.3b2
flytekitplugins-spark           1.10.3b2
s
k
could you share the example you ran
could you try the example in this PR description? https://github.com/flyteorg/flytekit/pull/1951
f
@Kevin Su, I have trouble obtaining the AWS creds locally because the company changed the way we auth to AWS. Until I resolve that, running locally is not possible. Besides, do you have sample code for databricks.py?
k
i see, but you can register that workflow to the flyte cluster and run it
f
@Kevin Su, could you share the link to the databricks.py code?
I would like to get the databricksagent task config right.
k
Copy code
import datetime
import os
import random
from operator import add

from click.testing import CliRunner

import flytekit
from flytekit import Resources, Secret, task, workflow, ImageSpec
from flytekit.clis.sdk_in_container import pyflyte
from flytekitplugins.spark import Databricks

SECRET_GROUP = "token-info"
SECRET_NAME = "token_secret"

image = ImageSpec(base_image="pingsutw/databricks:v4", registry="pingsutw")

@task(
    task_config=Databricks(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "600M",
            "spark.executor.memory": "600M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "1",
            "spark.driver.cores": "1",
        },
        executor_path="/databricks/python3/bin/python",
        applications_path="dbfs:///FileStore/tables/entrypoint.py",
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "m6i.large",  # TODO: test m6i.large, i3.xlarge
                "num_workers": 3,
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::xxxxxx:instance-profile/databricks-agent",
                    "ebs_volume_type": "GENERAL_PURPOSE_SSD",
                    "ebs_volume_count": 1,
                    "ebs_volume_size": 100,
                },
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
        databricks_instance="xxxxxxx.cloud.databricks.com",
    ),
    limits=Resources(mem="2000M"),
    # container_image=image,
    container_image="pingsutw/databricks:v7"
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val


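# Monte Carlo sample for estimating pi: 1 if a random point in [-1, 1]^2 falls inside the unit circle, else 0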
def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1")
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def wf(
    triggered_date: datetime.datetime = datetime.datetime.now(),
) -> float:
    """
    Using the workflow is still as any other workflow. As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi


if __name__ == '__main__':
    runner = CliRunner()
    result = runner.invoke(pyflyte.main,
                           ["run",
                            "--raw-output-data-prefix",
                            "<s3://flyte-batch/spark/>",
                            "/Users/kevin/git/flytekit/flyte-example/databricks_wf",
                            "wf"])
    print(result.output)
f
This is for Agent?
k
databricks.py
yes
both the agent and the backend plugin use the same code.
we didn’t change the user api
f
Will the python task in the same workflow be executed on databricks cluster?
k
yes, it will. agent will submit it to databricks cluster
f
My expectation is to have only the databricks spark task run on databricks, and the rest of the tasks in the same workflow run on the flyte cluster. Is my expectation wrong?
k
ah, sorry, I misunderstood your message above. yes, only the task with the databricks config will be submitted to the databricks cluster
f
Great!
Hi @Kevin Su, I got an error:
Copy code
pyflyte run --raw-output-data-prefix s3://mlp-flyte-workflow/test databricks.py wf
Running Execution on local.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 16:36:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-12-14 16:36:56,557422 ERROR    {"asctime": "2023-12-14 16:36:56,557", "name": "flytekit", "levelname": "ERROR", "message": "Agent failed to run the task with error: Please make sure to add secret_requests=[Secret(group=FLYTE_DATABRICKS_ACCESS_TOKEN, key=None)] in @task. Unable to find secret for key None in group FLYTE_DATABRICKS_ACCESS_TOKEN in Env Var:_FSEC_FLYTE_DATABRICKS_ACCESS_TOKEN and FilePath: /etc/secrets/flyte_databricks_access_token"}
And I put:
Copy code
@task(
    secret_requests=[
        Secret(
            group=SECRET_GROUP,
            key=SECRET_NAME,
            mount_requirement=Secret.MountType.ENV_VAR,
        )
    ],
What should be in the secret_requests?
k
should use pyflyte run --remote because you can’t run it locally. just use
Copy code
pyflyte run --remote databricks.py wf
f
I am able to add the AWS keys, therefore I can run locally.
k
ohh, I see. why did you add the secret_requests?
f
I tried pyflyte run --remote and got an error in pod flytesnacks-development/f1362fb6223764094a5d-n0-0:
Copy code
tar: Removing leading `/' from member names

╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /databricks/python3/bin/pyflyte-execute:8 in <module>                        │
│                                                                              │
│ ❱ 8 │   sys.exit(execute_task_cmd())                                         │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/click/core.py:1157 in        │
│ __call__                                                                     │
│                                                                              │
│ ❱ 1157 │   │   return self.main(*args, **kwargs)                             │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/click/core.py:1078 in main   │
│                                                                              │
│ ❱ 1078 │   │   │   │   │   rv = self.invoke(ctx)                             │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/click/core.py:1434 in invoke │
│                                                                              │
│ ❱ 1434 │   │   │   return ctx.invoke(self.callback, **ctx.params)            │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/click/core.py:783 in invoke  │
│                                                                              │
│ ❱  783 │   │   │   │   return __callback(*args, **kwargs)                    │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/flytekit/bin/entrypoint.py:4 │
│ 93 in execute_task_cmd                                                       │
│                                                                              │
│ ❱ 493 │   _execute_task(                                                     │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/flytekit/exceptions/scopes.p │
│ y:143 in f                                                                   │
│                                                                              │
│ ❱ 143 │   │   │   return outer_f(inner_f, args, kwargs)                      │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/flytekit/exceptions/scopes.p │
│ y:173 in system_entry_point                                                  │
│                                                                              │
│ ❱ 173 │   │   │   │   return wrapped(*args, **kwargs)                        │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/flytekit/bin/entrypoint.py:3 │
│ 65 in _execute_task                                                          │
│                                                                              │
│ ❱ 365 │   │   _task_def = resolver_obj.load_task(loader_args=resolver_args)  │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/flytekit/core/utils.py:319   │
│ in wrapper                                                                   │
│                                                                              │
│ ❱ 319 │   │   │   │   return func(*args, **kwargs)                           │
│                                                                              │
│ /databricks/python3/lib/python3.9/site-packages/flytekit/core/python_auto_co │
│ ntainer.py:251 in load_task                                                  │
│                                                                              │
│ ❱ 251 │   │   task_def = getattr(task_module, task_name)                     │
╰──────────────────────────────────────────────────────────────────────────────╯
AttributeError: module 'databricks' has no attribute 'hello_spark'
Getting s3://mlp-flyte-artifact/flytesnacks/development/6GNCBGLZ5JD6QPBXXBNY7V632A======/script_mode.tar.gz to /root/
Stream closed EOF for flytesnacks-development/f1362fb6223764094a5d-n0-0 (f1362fb6223764094a5d-n0-0)
If I don’t add the secret_requests in the @task I got:
Copy code
pyflyte run --raw-output-data-prefix s3://mlp-flyte-workflow/test databricks.py wf
Running Execution on local.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 16:34:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-12-14 16:34:45,425583 ERROR    {"asctime": "2023-12-14 16:34:45,425", "name": "flytekit", "levelname": "ERROR", "message": "Agent failed to run the task with error: Please make sure to add secret_requests=[Secret(group=FLYTE_DATABRICKS_ACCESS_TOKEN, key=None)] in @task. Unable to find secret for key None in group FLYTE_DATABRICKS_ACCESS_TOKEN in Env Var:_FSEC_FLYTE_DATABRICKS_ACCESS_TOKEN and FilePath: /etc/secrets/flyte_databricks_access_token"}
The same error.
k
you don’t need a secret request. just add the access token to /etc/secrets/FLYTE_DATABRICKS_ACCESS_TOKEN locally
f
is this a file name? /etc/secrets/FLYTE_DATABRICKS_ACCESS_TOKEN
How do I add it?
I am using macbook
k
just use vim. vim /etc/secrets/FLYTE_DATABRICKS_ACCESS_TOKEN
If you want to run it on the flyte cluster, you need to update the agent k8s secret. https://docs.flyte.org/en/latest/deployment/agents/databricks.html
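For the local case, a minimal sketch of the same thing in Python, assuming the path and lower-case file name from the error message above (writing under /etc/secrets usually needs root):
Copy code
# Sketch: put the Databricks personal access token where flytekit's local
# secret manager looks for it (path and file name taken from the error message).
from pathlib import Path

token = "dapi-xxxxxxxx"  # placeholder: your Databricks personal access token
secrets_dir = Path("/etc/secrets")
secrets_dir.mkdir(parents=True, exist_ok=True)  # usually requires sudo/root
(secrets_dir / "flyte_databricks_access_token").write_text(token)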
f
I did that.
Now I got this:
Copy code
pyflyte run --raw-output-data-prefix s3://mlp-flyte-workflow/test databricks.py wf
Running Execution on local.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/14 17:03:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-12-14 17:03:10,995716 ERROR    {"asctime": "2023-12-14 17:03:10,995", "name": "flytekit", "levelname": "ERROR", "message": "Agent failed to run the task with error: Cannot connect to host wbd-dcp-mlp-dev.cloud.databricks.com:443 ssl:True [SSLCertVerificationError: (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1131)')]"}
k
qq: are you able to send any API request to databricks from local?
looks like a certificate is required
could you try to run a GET REST API call from local to make sure you can connect to the databricks platform?
Copy code
curl --netrc --request GET --header "Authorization: Bearer $DATABRICKS_TOKEN" \
'<https://dbc-2889d011-7c0b.cloud.databricks.com/api/2.0/jobs/runs/get?run_id=236244201527146>'
f
Copy code
curl --netrc --request GET --header "Authorization: Bearer $DATABRICKS_TOKEN" \
'<https://wbd-dcp-mlp-dev.cloud.databricks.com/api/2.0/jobs/runs/get?run_id=33469631515226>'
curl: (3) URL rejected: Malformed input to a URL function
zsh: no such file or directory: <https://wbd-dcp-mlp-dev.cloud.databricks.com/api/2.0/jobs/runs/get?run_id=33469631515226>
Is this a var?
Copy code
$DATABRICKS_TOKEN
k
replace it with your databricks API token
f
I did. Still got
Copy code
curl: (3) URL rejected: Malformed input to a URL function
zsh: no such file or directory: <https://wbd-dcp-mlp-dev.cloud.databricks.com/api/2.0/jobs/runs/get?run_id=33469631515226>
image.png
k
Copy code
curl: (3) URL rejected: Malformed input to a URL function
odd
f
I got back results:
Copy code
curl --netrc --request GET --header "Authorization: Bearer xxx" 'https://wbd-dcp-mlp-dev.cloud.databricks.com/api/2.0/jobs/runs/get?run_id=33469631515226'

{"job_id":884271762736911,"run_id":33469631515226,"creator_user_name":"frank.shen@wbd.com","number_in_job":33469631515226,"original_attempt_run_id":33469631515226,"state":{"life_cycle_state":"TERMINATED","result_state":"SUCCESS","state_message":"","user_cancelled_or_timedout":false},"task":{"sql_task":{"query":{"query_id":"bea7b086-e308-4317-be79-55d4ac006f73"},"warehouse_id":"e414e987f3e25b85"}},"cluster_spec":{},"start_time":1702602985495,"setup_duration":0,"execution_duration":132000,"cleanup_duration":0,"end_time":1702603118520,"run_duration":133025,"trigger":"ONE_TIME","run_name":"frank_test","run_page_url":"https://wbd-dcp-mlp-dev.cloud.databricks.com/?o=7539614086893660#job/884271762736911/run/33469631515226","run_type":"JOB_RUN","attempt_number":0,"format":"SINGLE_TASK"}
k
ok, it worked.
how about pyflyte run? still failing?
f
yes, same certificate error
k
so agent didn’t submit the job yet, right?
f
correct
k
which version of flytekit are you using?
could you try flytekitplugins-spark==v1.10.3b3 and flytekit==v1.10.3b3
f
pip list|grep flyte flyteidl 1.10.6 flytekit 1.10.3b2 flytekitplugins-spark 1.10.3b2
k
b2 should work as well
let me test it real quick
could you try to install the spark plugin from this PR?
f
I got past that error. Thank you very much! And I got a new error:
Copy code
ERROR    {"asctime": "2023-12-14 17:45:53,424", "name": "flytekit", "levelname": "ERROR", "message": "Agent failed to run the task with error: Failed to create databricks job with error: {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'The instance profile arn (arn:aws:iam::245085526351:role/dcp-mlp-mlp-instance-profile) is ill-formed.'}"}
I have:
Copy code
@task(
    task_config=Databricks(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "600M",
            "spark.executor.memory": "600M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "1",
            "spark.driver.cores": "1",
        },
        executor_path="/databricks/python3/bin/python",
        applications_path="dbfs:///FileStore/tables/entrypoint.py",
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "m6i.large",  # TODO: test m6i.large, i3.xlarge
                "num_workers": 3,
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::245085526351:role/dcp-mlp-mlp-instance-profile",
                    "ebs_volume_type": "GENERAL_PURPOSE_SSD",
                    "ebs_volume_count": 1,
                    "ebs_volume_size": 100,
                },
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
        databricks_instance="<http://wbd-dcp-mlp-dev.cloud.databricks.com|wbd-dcp-mlp-dev.cloud.databricks.com>",
    ),
    limits=Resources(mem="2000M"),
    # container_image=image,
    container_image="<http://876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte:0.2.0-pr-59-9bcc9043|876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte:0.2.0-pr-59-9bcc9043>"
)
The role arn is good.
k
nice, but the format is not correct. it should look like this: arn:aws:iam::546011168256:instance-profile/databricks-demo
image.png
settings -> security
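In other words, roughly this (a sketch only: the account id is kept from the config above, and the profile name is a placeholder until you copy the real instance-profile ARN from that page):
Copy code
# Sketch: aws_attributes using an *instance profile* ARN
# (arn:aws:iam::<account-id>:instance-profile/<name>), not an IAM role ARN.
aws_attributes = {
    "availability": "ON_DEMAND",
    "instance_profile_arn": "arn:aws:iam::245085526351:instance-profile/<your-instance-profile-name>",
    "ebs_volume_type": "GENERAL_PURPOSE_SSD",
    "ebs_volume_count": 1,
    "ebs_volume_size": 100,
}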
f
Thanks. Now I got
Copy code
flytekit databricks plugin example run
run failed with error message:
Unexpected user error while preparing the cluster for the job. Cause: INVALID_PARAMETER_VALUE: Custom containers is turned off for your deployment. Please contact your workspace administrator to use this feature.
k
need to Enable custom containers
Copy code
curl -X PATCH -n -H "Authorization: Bearer <your-personal-access-token>" \
https://<databricks-instance>/api/2.0/workspace-conf \
-d '{"enableDcs": "true"}'
so that you are able to use a custom image for your spark task
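One way to check whether the flag actually flipped is a GET against the same Workspace Conf API; a sketch (host and token here are placeholders):
Copy code
# Sketch: read back the enableDcs flag via the Databricks Workspace Conf API.
import requests

host = "https://wbd-dcp-mlp-dev.cloud.databricks.com"
token = "dapi-xxxxxxxx"  # Databricks personal access token (placeholder)
resp = requests.get(
    f"{host}/api/2.0/workspace-conf",
    headers={"Authorization": f"Bearer {token}"},
    params={"keys": "enableDcs"},
)
print(resp.json())  # expect {"enableDcs": "true"} once custom containers are enabled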
f
I did
Do I need to replace api/2.0 with api/2.1 in the above command?
k
the agent uses 2.1 already
Oh, you mean workspace config
Yes, you can try it
f
still not able to use custom container.
k
Ask your infra maybe. They need to enable it
f
I just did
Thanks for the help!
k
btw, I think we can’t disable ssl by default. are you able to sign your certificate? https://stackoverflow.com/a/32579218
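For what it's worth, a small standard-library sketch (host copied from the error above) to check whether the machine trusts the certificate chain the Databricks endpoint presents, independent of flytekit:
Copy code
# Sketch: reproduce the TLS verification outside flytekit. Raises
# SSLCertVerificationError if the chain (e.g. a corporate/self-signed CA)
# is not in the local trust store.
import socket
import ssl

host = "wbd-dcp-mlp-dev.cloud.databricks.com"
ctx = ssl.create_default_context()  # uses the system store and SSL_CERT_FILE, if set

with socket.create_connection((host, 443), timeout=10) as sock:
    with ctx.wrap_socket(sock, server_hostname=host) as tls:
        print(tls.version(), tls.getpeercert()["subject"])
If this raises the same error, one common workaround is to point SSL_CERT_FILE at a CA bundle that includes the corporate certificate so Python's default SSL context picks it up.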