# databricks-integration
h
How are you authenticating to databricks?
h
i created a class with a databricks connector function in it, following this: https://docs.databricks.com/dev-tools/python-sql-connector.html#language-Cluster
h
@handsome-noon-32363, the exception is coming from within databricks sql client:
```
File "/var/lib/jenkins/.local/lib/python3.7/site-packages/databricks/sql/client.py", line 100, in __init__
    raise ValueError("No valid authentication settings. Please provide an access token.")
```
I'm assuming you're using the connection as described in the link you mentioned, right? So in one of your tasks you have something along the lines of:
```python
from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))
```
How are you defining those environment variables in your task?
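One quick way to check is a fail-fast sketch like this (variable names assumed from the snippet above), run right before `sql.connect`:

```python
import os

# sketch: list which of the env vars the connector reads are unset or empty
def missing_databricks_vars(required=("DATABRICKS_SERVER_HOSTNAME",
                                      "DATABRICKS_HTTP_PATH",
                                      "DATABRICKS_TOKEN")):
    return [name for name in required if not os.getenv(name)]
```

If this returns a non-empty list in the process that calls `sql.connect`, that would explain the `ValueError` about authentication settings.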
h
```python
import os
from dataclasses import dataclass
from typing import Dict, Optional, Type

from databricks import sql
from flytekit.configuration import SerializationSettings
from flytekit.extend import SQLTask
from flytekit.models import task as _task_model
from flytekit.types.schema import FlyteSchema

_SERVER_HOSTNAME_FIELD = "server_hostname"
_HTTP_PATH = "http_path"
_ACCESS_TOKEN = "access_token"
_WAREHOUSE_FIELD = "warehouse"


@dataclass
class DatabricksConfig(object):
    server_hostname: Optional[str] = None
    http_path: Optional[str] = None
    access_token: Optional[str] = None
    warehouse: Optional[str] = None


class DatabricksTask(SQLTask[DatabricksConfig]):
    _TASK_TYPE = "databricks"

    def __init__(
        self,
        name: str,
        query_template: str,
        task_config: Optional[DatabricksConfig] = None,
        inputs: Optional[Dict[str, Type]] = None,
        output_schema_type: Optional[Type[FlyteSchema]] = None,
        **kwargs,
    ):
        outputs = {
            "results": output_schema_type,
        }
        if task_config is None:
            task_config = DatabricksConfig()
        super().__init__(
            name=name,
            task_config=task_config,
            query_template=query_template,
            inputs=inputs,
            outputs=outputs,
            task_type=self._TASK_TYPE,
            **kwargs,
        )
        self._output_schema_type = output_schema_type

    def get_config(self, settings: SerializationSettings) -> Dict[str, str]:
        return {
            _SERVER_HOSTNAME_FIELD: self.task_config.server_hostname,
            _HTTP_PATH: self.task_config.http_path,
            _ACCESS_TOKEN: self.task_config.access_token,
            _WAREHOUSE_FIELD: self.task_config.warehouse,
        }

    def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
        with sql.connect(server_hostname=os.getenv("XXXXXXX"),
                         http_path=os.getenv("XXXXXX"),
                         access_token=os.getenv("XXXXXXX")) as connection:
            # note: renamed from `sql` to avoid shadowing the imported module
            sql_model = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
            return sql_model
```
h
ok. Next question is how are you defining those env vars in your container? Or this is you running locally?
h
i am trying it locally first
h
ok, so two questions:
1. are you able to authenticate with databricks? (for example, are you able to execute that `with` statement in the definition of `get_sql` separately in a python repl?)
2. In case the answer to 1. is yes, can you double-check that the env vars are actually set to the right value in the context of a flyte task?
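For 2., one way to check without leaking the token is to drop something like this (names illustrative) into the task body and look at the logs:

```python
import os

def env_report(names):
    # map each variable name to "set" or "MISSING" so secret values never hit the logs
    return {name: ("set" if os.getenv(name) else "MISSING") for name in names}
```

e.g. `print(env_report(["DATABRICKS_SERVER_HOSTNAME", "DATABRICKS_HTTP_PATH", "DATABRICKS_TOKEN"]))` — if something shows `MISSING` inside the task but not in your repl, the task process isn't getting the vars.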
h
when i am creating the flyte task and the wf, the following error comes up:
```
raise ValueError("No valid authentication settings. Please provide an access token.")
ValueError: No valid authentication settings. Please provide an access token.
```
h
ok, and what happens if you just run this code snippet?
```python
from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("XXXXXXX"),
                         http_path       = os.getenv("XXXXXX"),
                         access_token    = os.getenv("XXXXXXX"))
```
h
yes, that is working