
Mehtab Mehdi

10/27/2022, 1:35 PM
This is our Dockerfile:

```dockerfile
FROM python:3.8-slim-buster

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root

RUN apt-get update && apt-get install -y build-essential

# Install the AWS cli separately to prevent issues with boto being written over
RUN pip3 install awscli

# Similarly, if you're using GCP be sure to update this command to install gsutil
# RUN curl -sSL https://sdk.cloud.google.com | bash
# ENV PATH="$PATH:/root/google-cloud-sdk/bin"

ENV VENV /opt/venv
# Virtual environment
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

# Install Python dependencies
COPY ./requirements.txt /root
COPY ./flyte_db_plugin.py /root
COPY ./myfly.py /root
RUN pip install -r /root/requirements.txt

# Copy the actual code
COPY . /root

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
```

Samhita Alla

10/28/2022, 3:54 AM
@Mehtab Mehdi, are you using any custom images in your tasks?

Mehtab Mehdi

10/28/2022, 8:16 AM
Hi @Samhita Alla, that error is resolved after installing flytekit and databricks-sql-connector, but now an assertion error is coming up:

```
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/flytekit/core/base_task.py", line 472, in dispatch_execute
    native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, self.python_interface.inputs)
  File "/usr/local/lib/python3.8/site-packages/flytekit/core/type_engine.py", line 800, in literal_map_to_kwargs
    return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
  File "/usr/local/lib/python3.8/site-packages/flytekit/core/type_engine.py", line 800, in <dictcomp>
    return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
  File "/usr/local/lib/python3.8/site-packages/flytekit/core/type_engine.py", line 764, in to_python_value
    return transformer.to_python_value(ctx, lv, expected_python_type)
  File "/usr/local/lib/python3.8/site-packages/flytekit/types/structured/structured_dataset.py", line 682, in to_python_value
    return self.open_as(ctx, lv.scalar.structured_dataset, df_type=expected_python_type, updated_metadata=metad)
  File "/usr/local/lib/python3.8/site-packages/flytekit/types/structured/structured_dataset.py", line 717, in open_as
    result = decoder.decode(ctx, sd, updated_metadata)
  File "/usr/local/lib/python3.8/site-packages/flytekit/types/structured/basic_dfs.py", line 59, in decode
    ctx.file_access.get_data(path, local_dir, is_multipart=True)
  File "/usr/local/lib/python3.8/site-packages/flytekit/core/data_persistence.py", line 435, in get_data
    raise FlyteAssertion(

Message:

    Failed to get data from /tmp/flyte-zpvn_96g/raw/c66a42cfaa9dcbef8ce353f7997c427a to /tmp/flyte_vs8m701/local_flytekit/6b40c975d4003184400cad2843cca87f (recursive=True).
    Original exception: cannot copy tree '/tmp/flyte-zpvn_96g/raw/c66a42cfaa9dcbef8ce353f7997c427a': not a directory

User error.
```

Samhita Alla

10/28/2022, 10:26 AM
Can you share with me the task definition?

Mehtab Mehdi

10/28/2022, 12:21 PM
```python
import typing
import pandas as pd
import numpy as np
# from databricks import sql
import os

from flytekit import task, workflow

import flyte_db_plugin as fdp

# Query runs at module import time; the resulting DataFrame is passed into the workflow.
result = fdp.DatabricksTask("", "").get_sql("select * from student")


@task
def compute_result(df: pd.DataFrame) -> pd.DataFrame:
    return df


@workflow
def wf() -> pd.DataFrame:
    # df = pd.DataFrame(result)
    return compute_result(df=result)


if __name__ == "__main__":
    print(wf())
```
This is our workflow file, where we are using the custom flyte_db_plugin.
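For comparison, here is a minimal sketch (my assumption, not code from this thread) that moves the query call into a task of its own, so the query runs when the workflow executes rather than at module import time. It reuses the same `flyte_db_plugin` module shared later in the thread; `fetch_students` is a hypothetical task name:

```python
import pandas as pd
from flytekit import task, workflow

import flyte_db_plugin as fdp  # same custom plugin module as in the workflow above


@task
def fetch_students() -> pd.DataFrame:
    # Hypothetical task: runs the query at execution time instead of import time.
    return fdp.DatabricksTask("", "").get_sql("select * from student")


@task
def compute_result(df: pd.DataFrame) -> pd.DataFrame:
    return df


@workflow
def wf() -> pd.DataFrame:
    return compute_result(df=fetch_students())


if __name__ == "__main__":
    print(wf())
```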

Samhita Alla

10/28/2022, 12:50 PM
What’s the value of result? Is result getting populated?

Mehtab Mehdi

10/28/2022, 12:52 PM
`result` is giving the data locally, but it's not showing up in the UI.

Samhita Alla

10/28/2022, 12:54 PM
Not sure what’s happening. Is the custom task giving that error?

Mehtab Mehdi

10/28/2022, 12:55 PM
I wrote a custom plugin file, and I'm using that plugin in this workflow file.

Samhita Alla

10/28/2022, 12:58 PM
Yeah, but what’s giving you the error? If it’s because of the plugin, it could be an issue with it.

Mehtab Mehdi

10/28/2022, 1:00 PM
```python
from dataclasses import dataclass
from typing import Dict, Optional, Type

from flytekit.configuration import SerializationSettings
from flytekit.extend import SQLTask
from flytekit.models import task as _task_model
from flytekit.types.schema import FlyteSchema
from databricks import sql
import os
import pandas as pd

_SERVER_HOSTNAME_FIELD = "server_hostname"
_HTTP_PATH = "http_path"
_ACCESS_TOKEN = "access_token"
_WAREHOUSE_FIELD = "warehouse"


@dataclass
class DatabricksConfig(object):
    server_hostname: Optional[str] = None
    http_path: Optional[str] = None
    access_token: Optional[str] = None
    warehouse: Optional[str] = None


class DatabricksTask(SQLTask[DatabricksConfig]):
    _TASK_TYPE = "databricks"

    def __init__(
        self,
        name: str,
        query_template: str,
        task_config: Optional[DatabricksConfig] = None,
        inputs: Optional[Dict[str, Type]] = None,
        output_schema_type: Optional[Type[FlyteSchema]] = None,
        **kwargs,
    ):
        outputs = {
            "results": output_schema_type,
        }
        if task_config is None:
            task_config = DatabricksConfig()
        super().__init__(
            name=name,
            task_config=task_config,
            query_template=query_template,
            inputs=inputs,
            outputs=outputs,
            task_type=self._TASK_TYPE,
            **kwargs,
        )
        self._output_schema_type = output_schema_type

    def get_config(self, settings: SerializationSettings) -> Dict[str, str]:
        return {
            _SERVER_HOSTNAME_FIELD: self.task_config.server_hostname,
            _HTTP_PATH: self.task_config.http_path,
            _ACCESS_TOKEN: self.task_config.access_token,
            _WAREHOUSE_FIELD: self.task_config.warehouse,
        }

    def get_sql(self, query):
        # sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
        with sql.connect(
            server_hostname=XXXXXX,
            http_path=XXXXXX,
            access_token=XXXXXX,
        ) as connection:
            with connection.cursor() as cursor:
                cursor.execute(query)
                result = cursor.fetchall()
                colnames = [desc[0] for desc in cursor.description]
                df = pd.DataFrame(result, columns=colnames)
                return df
```
This is the plugin file

Samhita Alla

10/28/2022, 1:31 PM
`get_sql`'s signature should be along the lines of the https://github.com/flyteorg/flytekit/blob/386626356ff2cb810733a29a51cf3d6351c8424d/plugins/flytekit-bigquery/flytekitplugins/bigquery/task.py#L81 signature, right? Why is there no `SerializationSettings` parameter?
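For reference, a minimal sketch of that shape, reusing the `_task_model` import and the commented-out `Sql(...)` line already in the plugin file above (an illustration of the linked BigQuery-style signature, not a tested drop-in):

```python
from typing import Optional

from flytekit.configuration import SerializationSettings
from flytekit.extend import SQLTask
from flytekit.models import task as _task_model


class DatabricksTask(SQLTask[DatabricksConfig]):  # DatabricksConfig as defined in the plugin file above
    _TASK_TYPE = "databricks"

    # __init__ and get_config unchanged from the plugin file above

    def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
        # Wrap the query template in flytekit's Sql model instead of executing
        # the query inside the method; execution is left to whatever runs the task.
        return _task_model.Sql(
            statement=self.query_template,
            dialect=_task_model.Sql.Dialect.ANSI,
        )
```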

Mehtab Mehdi

10/28/2022, 1:43 PM
Yeah, I tried using the serialization settings, but I still get the same error 😌

Samhita Alla

10/28/2022, 2:15 PM
cc: @Eduardo Apolinario (eapolinario)