# ask-the-community
g
@Eduardo Apolinario (eapolinario) @Kevin Su I can't see your replies to https://flyte-org.slack.com/archives/CP2HDHKE1/p1696954554555229 (it's stuck on loading…), so I am opening a new thread. Do you guys need me to open a bug report?
k
You can use discuss.Flyte.org
g
k
Cc @Kevin Su
Is there an agent for this now?
g
agent?
k
We are introducing a framework for writing all external service plugins in Python - things like Databricks, BigQuery, etc. They can be executed locally, and in a remote run they execute as a service with no pod overhead (like today), but are much simpler to write: https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/development_lifecycle/agent_service.html#extend-agent-service
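For context, a hedged sketch of what such an agent boils down to - create, poll, and cancel calls against the external service's REST API (Databricks used as the example; the host and token are placeholders, and this is a conceptual sketch rather than the actual flytekit agent interface):
```python
import requests

# Placeholder workspace URL and token; a real agent would read these from
# its configuration / secrets.
DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"
HEADERS = {"Authorization": "Bearer <databricks-token>"}


def create_run(payload: dict) -> int:
    """Submit a one-time run (Jobs API runs/submit) and return its run_id."""
    resp = requests.post(f"{DATABRICKS_HOST}/api/2.0/jobs/runs/submit",
                         headers=HEADERS, json=payload)
    resp.raise_for_status()
    return resp.json()["run_id"]


def get_run_state(run_id: int) -> str:
    """Poll the run and return its life-cycle state (PENDING, RUNNING, TERMINATED, ...)."""
    resp = requests.get(f"{DATABRICKS_HOST}/api/2.0/jobs/runs/get",
                        headers=HEADERS, params={"run_id": run_id})
    resp.raise_for_status()
    return resp.json()["state"]["life_cycle_state"]


def cancel_run(run_id: int) -> None:
    """Cancel the run if the Flyte execution is aborted."""
    resp = requests.post(f"{DATABRICKS_HOST}/api/2.0/jobs/runs/cancel",
                         headers=HEADERS, json={"run_id": run_id})
    resp.raise_for_status()
```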
g
thanks
will give it a try
which version of flyte is this - master or ?
@Han
l
k
The agent is available in 1.10, but the API might change slightly
1.9 too
l
Do you mean the flyteidl?
g
@Ketan (kumare3) I’ve tried to switch to flyteagent from the built-in plugins in propeller. I was able to start the agent service and point propeller to it. I am using flyte 1.10.0 by the way. However when I submit a databricks job to propeller which gets sent to the agent, instead of the agent contacting databricks and crafting an API call to start the job on a cluster, it starts it in a pod in the project-domain namespace. The pod is using this image cr.flyte.org/flyteorg/flytekit:py3.9-1.10.0 and the job is failing as the image does not have the flytekitplugins-spark python package installed. Seems wrong to me that the job is started in k8s and not in databricks. I believe I’ve used the correct configuration for the agent so I don’t know what I am doing wrong. Is there a way to troubleshoot this or perhaps someone can give me some guidance?
l
@Georgi Ivanov Maybe you can try to build an image with a Dockerfile or use the ImageSpec feature. Here are some examples: https://github.com/pulls?q=is%3Apr+author%3AFuture-Outlier+archived%3Afalse+is%3Aclosed
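For instance, a hedged ImageSpec sketch (the registry, package version, and Python version are placeholders to adjust for your setup):
```python
from flytekit import task
from flytekit.image_spec import ImageSpec

# Hypothetical image spec that bakes the Spark plugin into the task image, so
# the container has the dependency the default flytekit image lacks.
spark_image = ImageSpec(
    registry="ghcr.io/<your-org>",               # placeholder registry
    packages=["flytekitplugins-spark==1.10.0"],
    python_version="3.9",
)


@task(container_image=spark_image)
def my_spark_task() -> str:
    return "hello from the custom image"
```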
k
@Georgi Ivanov it seems it's not using the agent but regular container task execution
l
@Georgi Ivanov Can you provide the sandbox-configmap? I want to make sure you set it up like this.
This is for the k8s spark cluster setup, and it is not for the agent-service.
g
@L godlike @Ketan (kumare3) I can definitely register the agent with the propeller, and the propeller is able to submit jobs to it. The errors above were because I was not adding a proper task decorator while I was testing the agent. Anyway, I reverted to the correct task decorator and the propeller is sending the job to the agent. Several observations:
- In the agent service plugin, the Databricks agent class expects the databricks_conf in a certain format, i.e. it expects a "new_cluster" stanza: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-spark/flytekitplugins/spark/agent.py#L36-L39. If "new_cluster" is not present it errors with "failed to create spark task with error 'new_cluster'". We can't use "existing_cluster_id", for instance.
- We need to pass "databricks_instance" as part of the task decorator. The Databricks instance is read from the task context by propeller when it constructs the task template it passes to the agent. From what I've found, there is no mention anywhere in the docs of how to add the Databricks instance to the agent config (in the propeller configmap). If we don't do it, the agent errors out with "failed to create spark task with error 'databricksInstance'".
- Passing a valid Databricks cluster config (as taken from the Databricks UI as JSON) to the "new_cluster" stanza in the "databricks_conf" block in the task decorator and submitting the job results in {"asctime": "2023-11-01 18:08:33,995", "name": "flytekit", "levelname": "ERROR", "message": "failed to create spark task with error Failed to create databricks job with error: Bad Request."}. I've tried increasing the log level on the agent by adding this file to the agent at /etc/flyte/config/config.yaml:
logger:
  level: 6
  show-source: true
and adding this env var to the agent:
- name: FLYTECTL_CONFIG
  value: /etc/flyte/config/config.yaml
but I can't see anything else apart from the above log entry.
Can you give me some hints on:
• how to test my job locally by submitting it to Databricks from my laptop
• how to increase the verbosity of the agent to see what's happening
• how to add the Databricks instance and the entrypoint.py settings to the agent config (currently I don't know how or where to specify the entrypoint.py)
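For reference, a minimal sketch of the task shape being discussed, using the Databricks task config and the field names mentioned in this thread (cluster values and the env var are placeholders):
```python
import os

from flytekit import task
from flytekitplugins.spark import Databricks


@task(
    task_config=Databricks(
        spark_conf={"spark.driver.memory": "1000M"},
        # The agent currently requires a "new_cluster" block; "existing_cluster_id"
        # is rejected, as described above.
        databricks_conf={
            "run_name": "flytekit databricks example",
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "<node-type>",  # placeholder
                "num_workers": 1,
            },
        },
        # Must be set on the task itself; the agent reads it from the task
        # template rather than from the propeller configmap.
        databricks_instance=os.environ["DATABRICKS_HOST"],
    ),
)
def hello_databricks() -> float:
    return 1.0
```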
l
@Georgi Ivanov
1. For the config structure, you can find it here: https://docs.databricks.com/dev-tools/api/2.0/jobs.html
2. Do you think the file here should provide the URL? https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-spark/flytekitplugins/spark/agent.py#L36-L39 We had provided it here, so I think it's not necessary to add one more URL, but maybe it would be better to add one more reference here.
If you get the "Bad Request", it's probably because your task config is wrong; please double-check it.
k
@L godlike / @Kevin Su would love to see what the API for the Databricks agent looks like, and whether we can make it more type-checked
l
For the verbosity, we have a PR here, but the FlyteIDL hasn't been released yet, so please wait for it. https://github.com/flyteorg/flytekit/pull/1834
For the entrypoint.py setting, you can refer to this doc; the settings are the same on the Databricks platform side but differ on the Flyte cluster side: https://docs.flyte.org/en/latest/deployment/plugins/webapi/databricks.html#deployment-plugin-setup-webapi-databricks If you still face problems, feel free to list them again and we will try to help you.
g
@Ketan (kumare3) @L godlike this is what I've found out:
- The Databricks built-in plugin lets you run a Databricks job using the Jobs API, and you can use either a new_cluster for the job or an existing cluster.
- The Databricks agent expects you to use a new_cluster, i.e. you can't use an existing cluster.
Both use the Databricks Jobs API 2.0. This means we can't create a new cluster as part of a job and attach a policy to it while creating it. Our org has strict rules and each cluster must have a policy, so the Databricks API call submitted by Flyte when creating a new_cluster fails with PERMISSION_DENIED. I am able to run Databricks jobs with the built-in Databricks plugin on existing clusters though. Also, if I use a service account with elevated permissions, I am not able to see the job, as the Jobs 2.0 API does not provide access control lists.
There is nothing code-wise we can fix in Flyte to overcome this: if Databricks is set up to require cluster policies for job clusters, this will fail, as it is a limitation of the Jobs API 2.0. However, the Databricks Jobs API 2.1 is a solution for this. It allows specifying policy_id when creating a cluster. Furthermore, it allows specifying access_control_list when you submit the job. This means Flyte no longer has to use a Databricks user PAT to submit jobs; it can instead use a single Databricks service principal PAT and provide an access_control_list stanza (which can be passed as part of the databricks_conf via the task decorator) to say who can do what to the job. The access_control_list stanza is pretty flexible: users, groups, service principals and a plethora of access permissions. https://docs.databricks.com/api/workspace/jobs/submit https://docs.databricks.com/en/workflows/jobs/jobs-api-updates.html
Migrating the current agent to the 2.1 job spec is pretty easy IMHO. However, Jobs 2.1 provides multi-task configurations, which might or might not be useful to adopt in the agent (I don't know enough about Flyte to say). Since Flyte packages the whole workflow as a single task/job, it might not be useful anyway.
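To illustrate, a hedged sketch of what a Jobs API 2.1 runs/submit payload with a cluster policy and an access control list could look like (field names follow the linked Databricks docs; all values are placeholders):
```python
# Hypothetical Jobs API 2.1 one-time run payload. Unlike 2.0, the cluster spec
# can carry a policy_id and the submission can carry an access_control_list,
# so a single service-principal PAT can submit runs that end users may view.
run_submit_payload = {
    "run_name": "flyte databricks task",
    "tasks": [
        {
            "task_key": "flyte_task",
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "<node-type>",       # placeholder
                "num_workers": 1,
                "policy_id": "<cluster-policy-id>",  # satisfies the org's policy rule
            },
            "spark_python_task": {"python_file": "dbfs:/entrypoint.py"},
        }
    ],
    "access_control_list": [
        {"user_name": "someone@example.com", "permission_level": "CAN_MANAGE_RUN"},
    ],
}
```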
l
Thanks a lot, will take a look and update here
g
Do you want me to create a feature request?
I also have a quick question regarding the Flyte agent. I watched the video where @Kevin Su explained that you can run your task with the AsyncAgentExecutorMixin class to mock the agent (i.e. run locally). However, I believe this is only available for certain plugins like BigQuery and not the Databricks agent. Furthermore, the Spark flytekit plugin has a local mode, where it just starts a Spark session using PySpark locally. Is it possible to run a Spark task using the Databricks agent locally?
Also, I've tried the DatabricksAgent example as shown here - https://flyte.org/blog/flyte-1-10-monorepo-new-agents-eager-workflows-and-more - and it does not work for me, as the DatabricksAgent class does not accept these settings:
task_config=DatabricksAgent(
    spark_conf={
        "spark.driver.memory": "1000M",
        "spark.executor.memory": "1000M",
        "spark.executor.cores": "1",
        "spark.executor.instances": "2",
        "spark.driver.cores": "1",
    },
    executor_path="/databricks/python3/bin/python",
    applications_path="dbfs:/entrypoint.py",
    databricks_conf={
        "run_name": "flytekit databricks plugin example",
        "new_cluster": {
            "spark_version": "12.2.x-scala2.12",
            "node_type_id": "n2-highmem-4",
            "num_workers": 1,
        },
        "timeout_seconds": 3600,
        "max_retries": 1,
    },
    databricks_instance=os.environ["DATABRICKS_HOST"],
),
It did work with the Databricks class as the task_config though, and since the propeller is configured to forward Spark tasks to the agent-service, in the end it was sent to Databricks. Am I doing something wrong here, or perhaps the docs need updating?
This is the video in question: https://www.youtube.com/watch?v=nD98GQ-pyAE. I would love to know how to run a Flyte workflow on Databricks from my laptop (bypassing Flyte) for local development.
l
Yes, you are right; currently the Databricks agent doesn't support local execution.
k
Ohh, does it need a container?
Then we can support it
l
We will support the 2.1 API and Databricks local execution this month.
g
thank you
l
@Georgi Ivanov Just created a PR here to support the Databricks API 2.1 version. Would you like to try it? https://github.com/flyteorg/flytekit/pull/1935
Local execution will come later; thanks for your patience.
k
@Georgi Ivanov qq: the reason you switched to the Flyte agent is that you want to run the Databricks task locally, right? IIRC, you were using the Databricks plugin in Propeller; did you run into any issues?
l
@Georgi Ivanov Here's a solution for FlytePropeller https://github.com/flyteorg/flyte/pull/4361
@Georgi Ivanov You can use either flyteplugins->databricks or flyteplugins->agent->databricks. Both are fixed but not yet merged. If it's urgent, you can look at the PRs here:
flyteplugins->databricks: https://github.com/flyteorg/flyte/pull/4361
flyteplugins->agent->databricks: https://github.com/flyteorg/flytekit/pull/1834 and https://github.com/flyteorg/flytekit/pull/1935
g
@Kevin Su the Databricks plugin in propeller works. However, the reason we want to move to the agent is twofold. First, this is the direction webapi plugins are heading, and we want to adopt that. It will also speed up local development, since people can test jobs locally using their own tokens before moving them to Flyte. Second, since the agent is written in Python, we can extend and customize it if we need to. Currently we have added extra checks on the Databricks side - like a mandatory policy id for all new clusters. Instead of asking end users to add one to all their tasks, for instance, we can add some logic to the agent to add it. And of course the access policies.
@L godlike thank you I will try them now
k
@Georgi Ivanov This is exactly what we are building the agent system for. But it is not yet fully mature
g
@Kevin Su can I ask what the preferred way to submit Databricks jobs is?
I believe there are three ways to submit a Databricks job:
1. Create a new cluster and, as a minimum, add these libraries at cluster creation (see the sketch below):
"libraries": [
  {"pypi": {"package": "flytekit==1.10.0"}},
  {"pypi": {"package": "flytekitplugins-spark==1.10.0"}}
]
2. Build a Docker image from the databricksruntime/standard image and add flytekit and flytekitplugins-spark to it. Use this image as a custom container when creating your job cluster.
3. Use the Docker image from above, but also serialize the workflow inside it and use it as the custom container for the Databricks job cluster. What is the benefit of doing this anyway, if Flyte passes the workflow tarball via S3?
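As a sketch of option 1, the libraries block would sit alongside the new_cluster spec in the job submission (a hedged example; pin the versions to the flytekit release in use):
```python
# Hypothetical databricks_conf for option 1: a fresh job cluster that installs
# flytekit and the Spark plugin from PyPI when it starts.
databricks_conf = {
    "run_name": "flyte task on a new cluster",
    "new_cluster": {
        "spark_version": "12.2.x-scala2.12",
        "node_type_id": "<node-type>",  # placeholder
        "num_workers": 1,
    },
    "libraries": [
        {"pypi": {"package": "flytekit==1.10.0"}},
        {"pypi": {"package": "flytekitplugins-spark==1.10.0"}},
    ],
}
```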
k
3
the benefit of adding the workflow code to the image is that flytekit won't download the code from s3 when running the pod; it just runs the code already in the pod. People always copy the code into the image when running workflows in production.
@Georgi Ivanov I created a PR to support submitting Databricks jobs from local, and I want to know your thoughts about it. The UX now:
Copy code
pyflyte run spark.py wf  # Run spark in local python process
pyflyte run --raw-output-data-prefix <s3://spark/output> spark.py wf  # We serialize the input and upload to s3, and trigger the databricks job.
The reason to do that is that it's easy to develop and test the agent locally. In addition, you are able to run a Databricks task without a Flyte cluster. My question is: should we use a different flag, like --agent or --hybrid? Any feedback is appreciated.
cc @Frank Shen ^^^
f
Hi @Kevin Su, that would be very nice to have, especially since we can do this locally for Spark tasks that run on the k8s Spark operator by passing the AWS S3 creds locally like this. Is this how the Databricks plugin will work locally as well?
Copy code
task_config=Spark(
        spark_conf={
            ....
            # The following is needed only when running spark task in dev's local PC. Also need to do this locally: export SPARK_LOCAL_IP="127.0.0.1"
            "spark.hadoop.fs.s3a.access.key": "aaa",
            "spark.hadoop.fs.s3a.secret.key": "bbb",
            "spark.hadoop.fs.s3a.session.token": "ccc",
        },
    ),
k
yes, when you run locally, flytekit submits a databricks job
> that would be very nice to have, especially since we can do this locally for Spark tasks that run on the k8s Spark operator by passing the AWS S3 creds locally like this.
so you want to run a spark workflow locally, but run the spark job on k8s?
is that what you want?
f
No. The Spark job runs on k8s when it's run remotely.
But we can still run the same workflow with a Spark task using the Spark plugin locally by adding the S3 creds part locally.
I would like this to work for the Databricks plugin too.
k
yes, that’s exactly what I did in that PR
f
Awesome, thank you @Kevin Su!
k
Copy code
pyflyte run spark.py wf  # Run spark in local python process
pyflyte run --raw-output-data-prefix <s3://spark/output> spark.py wf  # We serialize the input and upload to s3, and submit the databricks job.
Is this command confusing to you? Or should we add a new flag, like pyflyte run --hybrid, --agent, or something else?
k
@Georgi Ivanov did you get a chance to try this ^? What do you think?
g
Sorry I was away. I will check it today
thanks @Kevin Su