# ask-the-community
a
Hi, I have a quick question regarding the Databricks plugin. Is there a way for us to retrieve the Databricks job cluster ID from a task and pass it to the next task in the workflow, so we can reuse the cluster that was already created rather than creating a new cluster in every task?
k
Is this using the agent?
I love the idea, but don’t know what it should look like. Please propose
a
this is using the built-in plugin but we would want to do this on the agent as well
but we can start with the built-in plugin
k
In a month we want to transition to the agent completely, as it allows local execution and ease of development
Would you be open to jumping on a call to discuss?
a
sure, can we do it tomorrow?
central time
k
let me ping you
k
you want createDatabricksClusterOperator and deleteDatabricksClusterOperator, I guess, so that you can use them in your pipeline.
a
@Kevin Su are you referring to airflow operators above?
k
no, I mean we should probably add those operators in flytekit
a
yeah got it
@Ketan (kumare3) @Kevin Su I was putting a little bit of thought into this. I will describe the problem again just to refresh your memory:

The concern today while using Databricks is that when we move from one task to another in our Flyte workflow, the job cluster that was created on Databricks terminates and gets recreated for the next task, which adds cluster creation/start-up time to every task in the workflow. Databricks job clusters allow a single task to be executed on the cluster, and upon termination of the task, Databricks does not provide a mechanism to restart the terminated cluster. Restarting is possible only with interactive (all-purpose) clusters, which are generally not suitable for production-scale workflows and are used mainly for interactive analysis in notebooks.

Databricks recently came out with a solution for this problem: their new orchestration capability allows multiple tasks to be executed on the same cluster before it is terminated. Please read the document: https://www.databricks.com/blog/2022/02/04/saving-time-and-costs-with-cluster-reuse-in-databricks-jobs.html

I am asking if we can achieve something similar in Flyte by modifying the plugin to allow multiple tasks to be executed on Databricks. How do we execute multiple tasks on the same cluster before the cluster is terminated on the Databricks side?
k
cc @Ketan (kumare3)
@anantharaman janakiraman one question. do you want to share the same cluster across tasks in the same workflow? or do you want to share the cluster across two different workflows?
k
@anantharaman janakiraman they are using some internal mechanism that is not exposed, AFAICT. But you can potentially achieve the same using the following: create a workflow that creates a cluster, and pass the cluster ID to all steps of the workflow.
def delete_cluster(cluster_id: str):
    ...

@workflow(on_failure=delete_cluster)
def wf():
    cluster_id = create_cluster()
    do_job_1(cluster_id)
    do_job_2(cluster_id)
    delete_cluster(cluster_id)
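To make Kevin's sketch above concrete, here is a minimal payload-building sketch for the create/delete steps. The helper names (`cluster_create_body`, `cluster_delete_body`, `_post`) and the host/token values are hypothetical; the endpoint paths and field names follow the Databricks Clusters API 2.0 (`POST /api/2.0/clusters/create`, `POST /api/2.0/clusters/delete`), and the sizing values are placeholders:

```python
import json
import urllib.request

# Hypothetical workspace URL and token; replace with real values.
HOST = "https://example.cloud.databricks.com"
TOKEN = "dapi-..."

def cluster_create_body(name: str) -> dict:
    # Body for POST /api/2.0/clusters/create. Field names follow the
    # Databricks Clusters API; the sizing values here are placeholders.
    return {
        "cluster_name": name,
        "spark_version": "7.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2,
    }

def cluster_delete_body(cluster_id: str) -> dict:
    # Body for POST /api/2.0/clusters/delete (terminates the cluster).
    return {"cluster_id": cluster_id}

def _post(path: str, body: dict) -> dict:
    # Sketch of the HTTP call a create/delete operator would make.
    req = urllib.request.Request(
        f"{HOST}{path}",
        data=json.dumps(body).encode(),
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

Note that `clusters/create` makes an all-purpose cluster, which (as discussed later in this thread) is the only kind that can be reused via `existing_cluster_id`.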
a
@Kevin Su As a first step, I want to share the same cluster across multiple tasks in the same workflow. That would be a good start.
k
or, use auto_termination and set it to a few minutes. Then in the agent: 1. check if a cluster with a specific name exists; if it does, launch the job on it; 2. if not, launch the job with cluster creation set
the problem is that cluster names are not unique in databricks
so we will have to name the cluster with
project-domain-execution-id
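A tiny helper (hypothetical name) to build such a name from the Flyte execution identity might look like:

```python
def shared_cluster_name(project: str, domain: str, execution_id: str) -> str:
    # Databricks cluster names are not unique, so encode the Flyte
    # project/domain/execution id to make name-based lookup deterministic.
    return f"{project}-{domain}-{execution_id}"

# e.g. shared_cluster_name("flytesnacks", "development", "f3a1b2")
#   -> "flytesnacks-development-f3a1b2"
```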
a
@Ketan (kumare3) I am not sure if that is possible. Databricks job clusters are different from the interactive ones (all-purpose clusters). When you launch a job on DBX, the cluster is created, and I do not think you can keep the cluster running after the job terminates, nor can you restart the cluster using its cluster ID
how are we launching the job on Databricks via the plugin?
k
@anantharaman janakiraman check this - https://docs.databricks.com/api/workspace/jobs/create when you launch job you can give it
job_clusters
and then you can set the
new_cluster
which allows setting
autotermination_minutes
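As a sketch, what Ketan is describing is a jobs/create body that nests autotermination_minutes inside new_cluster under job_clusters (the function name and sizing values here are placeholders; field names are from the Jobs 2.1 API). Note that, as found later in this thread, Databricks rejects autotermination on automated (job) clusters, so this is only the proposed shape:

```python
def jobs_create_body() -> dict:
    # Sketch of a POST /api/2.1/jobs/create body with a shared job cluster.
    # Caveat: Databricks rejects autotermination_minutes on automated
    # (job) clusters, per the error reported later in this thread.
    return {
        "name": "shared-cluster-job",
        "job_clusters": [
            {
                "job_cluster_key": "shared",
                "new_cluster": {
                    "spark_version": "7.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2,
                    "autotermination_minutes": 10,
                },
            }
        ],
    }
```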
a
@Ketan (kumare3) I also read the API reference to check if there is a provision to pass "job_clusters" in this API to reuse the same cluster across multiple tasks of a job. But based on comments from people in the Databricks forum, you would get an error like "Shared job cluster feature is not supported in runs/submit API." Shared job clusters are not supported by the jobs/runs/submit API at the moment.
@Kevin Su are we using the Jobs 2.0 or Jobs 2.1 API?
k
2.1
a
2.0 doesn't support multi-task execution, but 2.1 should (not sure why people are complaining about 2.1). This is a sample payload for 2.1 with the job_clusters specification @Ketan (kumare3) was suggesting; I got it from a test run through the UI:
{
  "job_id": 53,
  "settings": {
    "name": "A job with multiple tasks",
    "email_notifications": {},
    "timeout_seconds": 0,
    "max_concurrent_runs": 1,
    "job_clusters": [
      {
        "job_cluster_key": "default_cluster",
        "new_cluster": {
          "spark_version": "7.3.x-scala2.12",
          "node_type_id": "i3.xlarge",
          "spark_conf": {
            "spark.speculation": true
          },
          "aws_attributes": {
            "availability": "SPOT",
            "zone_id": "us-west-2a"
          },
          "autoscale": {
            "min_workers": 2,
            "max_workers": 8
          }
        }
      },
      {
        "job_cluster_key": "data_processing_cluster",
        "new_cluster": {
          "spark_version": "7.3.x-scala2.12",
          "node_type_id": "r4.2xlarge",
          "spark_conf": {
            "spark.speculation": true
          },
          "aws_attributes": {
            "availability": "SPOT",
            "zone_id": "us-west-2a"
          },
          "autoscale": {
            "min_workers": 8,
            "max_workers": 16
          }
        }
      }
    ],
    "tasks": [
      {
        "task_key": "ingest_orders",
        "description": "Ingest order data",
        "depends_on": [],
        "job_cluster_key": "auto_scaling_cluster",
        "spark_jar_task": {
          "main_class_name": "com.databricks.OrdersIngest",
          "parameters": [
            "--data",
            "dbfs:/path/to/order-data.json"
          ]
        },
        "libraries": [
          {
            "jar": "dbfs:/mnt/databricks/OrderIngest.jar"
          }
        ],
        "timeout_seconds": 86400,
        "max_retries": 3,
        "min_retry_interval_millis": 2000,
        "retry_on_timeout": false
      },
      {
        "task_key": "clean_orders",
        "description": "Clean and prepare the order data",
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/clean-data"
        },
        "job_cluster_key": "default_cluster",
        "max_retries": 3,
        "min_retry_interval_millis": 0,
        "retry_on_timeout": true,
        "timeout_seconds": 3600,
        "email_notifications": {}
      },
      {
        "task_key": "analyze_orders",
        "description": "Perform an analysis of the order data",
        "notebook_task": {
          "notebook_path": "/Users/user@databricks.com/analyze-data"
        },
        "depends_on": [
          {
            "task_key": "clean_data"
          }
        ],
        "job_cluster_key": "data_processing_cluster",
        "max_retries": 3,
        "min_retry_interval_millis": 0,
        "retry_on_timeout": true,
        "timeout_seconds": 3600,
        "email_notifications": {}
      }
    ],
    "format": "MULTI_TASK"
  },
  "created_time": 1625841911296,
  "creator_user_name": "<mailto:user@databricks.com|user@databricks.com>",
  "run_as_user_name": "<mailto:user@databricks.com|user@databricks.com>"
}
Through the plugin, I believe we are submitting a single task using the SINGLE_TASK format.
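One way to sketch the difference between the two submission paths: the plugin's current runs/submit call carries a single task with its own new_cluster, while the multi-task path would go through jobs/create (which accepts job_clusters) in MULTI_TASK format. The builder functions below are hypothetical; the field names come from the Jobs 2.1 API:

```python
def single_task_submit_body(task: dict) -> dict:
    # Shape of the current runs/submit call: one task, so the
    # cluster lives and dies with this task.
    return {"run_name": "flyte-task", "tasks": [task]}

def multi_task_job_body(tasks: list, cluster_key: str) -> dict:
    # Shape of a jobs/create call in MULTI_TASK format: every task
    # points at the same job_cluster_key, so they share one cluster.
    # (Mutates the task dicts in place for brevity.)
    for t in tasks:
        t["job_cluster_key"] = cluster_key
    return {
        "name": "flyte-workflow",
        "format": "MULTI_TASK",
        "job_clusters": [
            {
                "job_cluster_key": cluster_key,
                "new_cluster": {
                    "spark_version": "7.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2,
                },
            }
        ],
        "tasks": tasks,
    }
```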
k
Shared job cluster for jobs/runs/submit API is not supported at the moment.
if that's the case, we need to add a task to create the Spark cluster first. I need to play around with it to know what they support right now.
yes, we use SINGLE_TASK format
a
multi-task format with job_cluster specification is what we need
I am able to run the job on databricks using 2.1 API with the above payload
k
but what if you have a workflow like:
dbx task -> python task on k8s -> dbx task
a
that will have downtime for cluster creation for sure, and there is no way around it. This will still solve the majority of cases, where sequential tasks are running on Databricks and trying to use the same cluster
k
so what does your workflow look like?
dbx -> dbx -> dbx

or 
              dbx
           / 
start-node -- dbx
           \
             dbx
a
it is a combination of both. We are not allowing users to run tasks on k8s for now; they are being asked to run their tasks only on DBX (except for the dynamic workflow task). The second pattern above will in most cases require separate clusters to be created, or I could run the tasks in parallel if they are not resource intensive
k
If dependencies are the same, we should run them in the same cluster, but not sure if it’s doable on dbx
a
we could... I just have to pass an empty array for depends_on in the multi-task payload above, in which case it will run both tasks concurrently on the same cluster
and of course use the same job_cluster_key
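Concretely, two tasks with empty depends_on lists and the same job_cluster_key would be scheduled concurrently on one shared cluster. A minimal tasks list (the function name, task keys, and notebook paths are hypothetical) might be:

```python
def concurrent_tasks(cluster_key: str) -> list:
    # Both tasks declare no dependencies and point at the same
    # job_cluster_key, so Databricks can schedule them concurrently
    # on the shared job cluster.
    return [
        {
            "task_key": "task_a",
            "depends_on": [],
            "job_cluster_key": cluster_key,
            "notebook_task": {"notebook_path": "/Shared/task-a"},
        },
        {
            "task_key": "task_b",
            "depends_on": [],
            "job_cluster_key": cluster_key,
            "notebook_task": {"notebook_path": "/Shared/task-b"},
        },
    ]
```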
so @Kevin Su, how do we make the multi-task format work with Flyte?
@Ketan (kumare3) kindly help
k
I don’t understand what is multi-task
a
the multi-task format on the Jobs 2.1 API allows you to pass multiple tasks in the same job via the tasks setting, and also allows you to use the same or different clusters for the individual tasks in the task list.
@Kevin Su is saying that we are using the single-task format for submitting jobs to Databricks through the plugin, which means Databricks will create a task-level cluster that starts when the task starts and terminates when the task completes, thereby not allowing you to reuse the cluster for subsequent tasks in the workflow
k
Flyte does not have the capability of fusing tasks
a
@Ketan (kumare3) @Kevin Su I tried to create a job cluster with the autotermination parameter, but unfortunately, as I suspected, I got an error that says "Automated clusters do not support autotermination"
so I think this is a major road block at this point in using databricks through flyte
k
This is sad
a
There is one other dirty option - I can try to run a notebook on a job cluster with a script that doesn't terminate, so the cluster stays active, and then see if I can use the cluster for another task 🙂
k
Ohh my god
a
I know but at least can confirm if there is a possibility to reuse the cluster
k
Yes
a
@Ketan (kumare3) and @Kevin Su running a sleep script to keep the cluster alive and then trying to reuse the cluster via its cluster_id is not an option either. Databricks throws an exception stating the existing_cluster_id parameter can be used only with all-purpose clusters, not job clusters. So that door is also closed 🙂
k
OMG