
komal azram

03/22/2022, 3:47 PM
Hi, I am trying to return a Spark DataFrame but am hitting this issue:

flytekit.exceptions.user.FlyteAssertion: When calling tasks, only keyword args are supported. Aborting execution as detected 1 positional args (FlyteSchema.__class_getitem__.<locals>._TypedSchema(remote_path='/tmp/flyte/20220322_204000/raw/82eaa7c6b77680e044d794beaf4e01cb'),)

I have defined the column names with their data types as:

my_schema = FlyteSchema[kwtypes(ID=str, START_T=datetime.datetime, STOP=datetime.datetime, PATIENT=str, ENCOUNTER=str, CODE=str, intDESCRIPTION=str, REASONCODE=int, REASONDESCRIPTION=str)]

I am following this guide: https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_spark/dataframe_passing.html

Ketan (kumare3)

03/22/2022, 3:51 PM
Can you share a code snippet?

komal azram

03/22/2022, 3:53 PM
Copy code
my_schema = FlyteSchema[kwtypes(ID=str, START_T=datetime.datetime, STOP=datetime.datetime, PATIENT=str, ENCOUNTER=str, CODE=str, intDESCRIPTION=str, REASONCODE=int, REASONDESCRIPTION=str)]

@task
def spark_start() -> my_schema:
    spark = SparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()
    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", "SELECT * FROM CAREPLANS") \
        .load()
    return df

Ketan (kumare3)

03/22/2022, 3:54 PM
This is not the problem.
Can you share the workflow?

komal azram

03/22/2022, 3:56 PM
I haven't assigned a workflow to it yet.

Yee

03/22/2022, 4:16 PM
How are you calling this task?
Can you paste the code where you call it?

komal azram

03/22/2022, 4:17 PM
Copy code
if __name__ == "__main__":
    df=spark_start()
    df_expect(df)

Yee

03/22/2022, 4:18 PM
df_expect(df=df)
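For context, flytekit task calls are keyword-only whether they are invoked locally or from a workflow. A minimal sketch using the task names from this thread (assuming spark_start takes no inputs):
Copy code
# Keyword-only task calls: passing the dataframe positionally raises the
# FlyteAssertion quoted at the top of this thread.
if __name__ == "__main__":
    df = spark_start()    # spark_start takes no inputs, so this call is fine
    df_expect(df=df)      # keyword argument required; df_expect(df) fails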

komal azram

03/22/2022, 4:21 PM
AttributeError: '_TypedSchema' object has no attribute 'persist'
I'm getting this error now.

Yee

03/22/2022, 4:29 PM
can you show the whole stack trace?

komal azram

03/22/2022, 4:33 PM
{"asctime": "2022-03-22 212035,124", "name": "flytekit", "levelname": "ERROR", "message": "Exception when executing '_TypedSchema' object has no attribute 'persist'", "exc_info": "Traceback (most recent call last):\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py\", line 478, in dispatch_execute\n native_outputs = self.execute(**native_inputs)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/python_function_task.py\", line 160, in execute\n return exception_scopes.user_entry_point(self._task_function)(**kwargs)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py\", line 198, in user_entry_point\n return wrapped(*args, **kwargs)\n File \"/home/komal/PycharmProjects/spark-ge/main.py\", line 46, in df_expect\n batch=SparkDFDataset(df)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/great_expectations/dataset/sparkdf_dataset.py\", line 617, in __init__\n self.spark_df.persist()\nAttributeError: '_TypedSchema' object has no attribute 'persist'"} Traceback (most recent call last): File "/home/komal/PycharmProjects/spark-ge/main.py", line 64, in <module> df_expect(df=df) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 288, in call return flyte_entity_call_handler(self, *args, **kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/promise.py", line 991, in flyte_entity_call_handler result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 269, in local_execute outputs_literal_map = self.dispatch_execute(ctx, input_literal_map) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 481, in dispatch_execute raise e File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 478, in dispatch_execute native_outputs = self.execute(**native_inputs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/python_function_task.py", line 160, in execute return exception_scopes.user_entry_point(self._task_function)(**kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 198, in user_entry_point return wrapped(*args, **kwargs) File "/home/komal/PycharmProjects/spark-ge/main.py", line 46, in df_expect batch=SparkDFDataset(df) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/great_expectations/dataset/sparkdf_dataset.py", line 617, in init self.spark_df.persist() AttributeError: '_TypedSchema' object has no attribute 'persist'

Yee

03/22/2022, 9:39 PM
@Samhita Alla do you think you could help take a look? I'm not as familiar with the Great Expectations code.

Samhita Alla

03/23/2022, 4:15 AM
Sure! @komal azram, would you mind sharing the df_expect() task code with me?
I think you may have to call open() on your Spark dataframe within the df_expect() code, something like df.open(pyspark.DataFrame).all()

komal azram

03/23/2022, 5:43 AM
Copy code
def df_expect(df: my_schema) -> str:
    batch = SparkDFDataset(df)

    ID_Null = batch.expect_column_values_to_not_be_null("ID")
    ID_res = print("Null values not found" if ID_Null.success else "Null values")
    return ID_res

Samhita Alla

03/23/2022, 5:58 AM
Can you replace batch=SparkDFDataset(df) with batch=SparkDFDataset(df.open(pyspark.DataFrame).all())?

komal azram

03/23/2022, 6:06 AM
AttributeError: module 'pyspark' has no attribute 'dataframe'

Samhita Alla

03/23/2022, 6:11 AM
Sorry, can you try pyspark.sql.DataFrame, @User?
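For reference, a minimal sketch of what that corrected call looks like inside the task. It assumes flytekitplugins-spark is installed so the FlyteSchema can be materialized as a pyspark.sql.DataFrame, and, as the next messages show, a Spark session must also be available in the Flyte context:
Copy code
import pyspark
from flytekit import task
from great_expectations.dataset import SparkDFDataset

# Sketch only: `my_schema` is the FlyteSchema type defined earlier in the thread.
@task  # note: a Spark session must exist in the Flyte context for open() to work, see below
def df_expect(df: my_schema) -> str:
    spark_df = df.open(pyspark.sql.DataFrame).all()  # read the schema back as a Spark DataFrame
    batch = SparkDFDataset(spark_df)
    id_null = batch.expect_column_values_to_not_be_null("ID")
    return "Null values not found" if id_null.success else "Null values found"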

komal azram

03/23/2022, 6:23 AM
AssertionError: SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?

Samhita Alla

03/23/2022, 6:26 AM
Are you using task_config=Spark() for your task? Example:
Copy code
@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache_version="1",
)
def create_spark_df() -> my_schema:
    ...

komal azram

03/23/2022, 6:27 AM
No.
Is it mandatory?

Samhita Alla

03/23/2022, 6:29 AM
Oh, you’ll have to, in order to initialize the Spark session. You can see the https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_spark/dataframe_passing.html example, the one you’re already referring to.

komal azram

03/23/2022, 6:43 AM
Thank you @Samhita Alla
Hey, I have added the task_config as mentioned in the tutorial, but it gives the following error:
raise AssertionError(f"{attr_name} not available as a parameter in Flyte context - are you in right task-type?") AssertionError: SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?
In my use case I have to build a Spark session and connect Snowflake and Spark to query data from Snowflake.
I have added the task_config to the task where I am starting the Spark session.

Samhita Alla

03/23/2022, 9:43 AM
This error has to do with configuring Spark. Can you paste the stacktrace? I’d like to see where exactly this error is popping up.

komal azram

03/23/2022, 9:46 AM
{"asctime": "2022-03-23 144000,136", "name": "flytekit", "levelname": "ERROR", "message": "Exception when executing SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?", "exc_info": "Traceback (most recent call last):\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py\", line 478, in dispatch_execute\n native_outputs = self.execute(**native_inputs)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/python_function_task.py\", line 160, in execute\n return exception_scopes.user_entry_point(self._task_function)(**kwargs)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py\", line 198, in user_entry_point\n return wrapped(*args, **kwargs)\n File \"/home/komal/PycharmProjects/spark-ge/main.py\", line 58, in df_expect\n batch = SparkDFDataset(df.open(spark.sql.DataFrame).all())\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekitplugins/spark/schema.py\", line 27, in all\n return ctx.spark_session.read.parquet(self.from_path)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/context_manager.py\", line 313, in __getattr__\n raise AssertionError(f\"{attr_name} not available as a parameter in Flyte context - are you in right task-type?\")\nAssertionError: SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?"} Traceback (most recent call last): File "/home/komal/PycharmProjects/spark-ge/main.py", line 81, in <module> df_expect(df=df) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 288, in call return flyte_entity_call_handler(self, *args, **kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/promise.py", line 991, in flyte_entity_call_handler result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 269, in local_execute outputs_literal_map = self.dispatch_execute(ctx, input_literal_map) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 481, in dispatch_execute raise e File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 478, in dispatch_execute native_outputs = self.execute(**native_inputs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/python_function_task.py", line 160, in execute return exception_scopes.user_entry_point(self._task_function)(**kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 198, in user_entry_point return wrapped(*args, **kwargs) File "/home/komal/PycharmProjects/spark-ge/main.py", line 58, in df_expect batch = SparkDFDataset(df.open(spark.sql.DataFrame).all()) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekitplugins/spark/schema.py", line 27, in all return ctx.spark_session.read.parquet(self.from_path) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/context_manager.py", line 313, in getattr raise AssertionError(f"{attr_name} not available as a parameter in Flyte context - are you in right task-type?") AssertionError: SPARK_SESSION not 
available as a parameter in Flyte context - are you in right task-type? Process finished with exit code 1

Samhita Alla

03/23/2022, 9:51 AM
Are you running this locally or in a configured Flyte environment with a Spark backend?
Can you also share the whole code with me, wherever you’re initializing the task config?

komal azram

03/23/2022, 9:56 AM
running it locally
give me a min
Copy code
import datetime

from flytekit import task, kwtypes
from typing import Tuple
import pyspark as spark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from great_expectations.dataset import SparkDFDataset
from flytekitplugins.spark import Spark
from flytekit.types.schema import FlyteSchema




my_schema = FlyteSchema[kwtypes(ID=str, START_T=datetime.datetime, STOP=datetime.datetime, PATIENT=str, ENCOUNTER=str, CODE=str, intDESCRIPTION=str, REASONCODE=int, REASONDESCRIPTION=str)]

@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache_version="1",
)
def spark_start() -> my_schema:
    spark = SparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", "SELECT * FROM CAREPLANS") \
        .load()
    return df


@task
def df_expect(df: my_schema) -> str:
    # df1 = df.open(df).all()
    batch = SparkDFDataset(df.open(spark.sql.DataFrame).all())
    # batch = SparkDFDataset(df1)

    ID_Null = batch.expect_column_values_to_not_be_null("ID")
    ID_res = print("Null values not found" if ID_Null.success else "Null values")

    CODE_Null = batch.expect_column_values_to_not_be_null("CODE")
    CODE_res = print("Null values not found" if CODE_Null.success else "Null values")

    DESCRIPTION_Null = batch.expect_column_values_to_not_be_null("DESCRIPTION")
    DESCRIPTION_res = print("Null values not found" if DESCRIPTION_Null.success else "Null values")

    ID_Unique = batch.expect_column_values_to_be_unique("ID")
    Unique_res = print("Column has unique values" if ID_Unique.success else "Duplicate values")
    return ID_res, CODE_res, DESCRIPTION_res, Unique_res


if __name__ == "__main__":
    df = spark_start()
    df_expect(df=df)

Samhita Alla

03/23/2022, 10:07 AM
In the spark_start task, can you replace SparkSession with sess?
Copy code
sess = flytekit.current_context().spark_session
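A rough sketch of what that looks like in the spark_start task. The Snowflake connector names (SNOWFLAKE_SOURCE_NAME, sfOptions) come from the code above, and the spark_conf values here are illustrative:
Copy code
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(spark_conf={"spark.driver.memory": "1000M"}),
    cache_version="1",
)
def spark_start() -> my_schema:
    # The Spark task plugin provisions the session; fetch it from the Flyte
    # context instead of building a new one with SparkSession.builder.
    sess = flytekit.current_context().spark_session
    df = (
        sess.read.format(SNOWFLAKE_SOURCE_NAME)
        .options(**sfOptions)
        .option("query", "SELECT * FROM CAREPLANS")
        .load()
    )
    return df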

komal azram

03/24/2022, 9:26 AM
Copy code
def spark_start() -> my_schema:
    spark = flytekit.current_context().sparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()
You mean like that?

Samhita Alla

03/24/2022, 9:35 AM
Yes!
Copy code
sess = flytekit.current_context().spark_session
spark = sess.builder...

komal azram

03/24/2022, 10:46 AM
AssertionError: SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?
I'm still getting the same error.

Samhita Alla

03/24/2022, 10:54 AM
Not sure why that’s happening. Can you share your code with me once again?
Can you merge both of them into a single task? I think that because we’re opening the dataframe through Spark, it needs that Spark session to be available.
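A rough sketch of that merge, under the same assumptions (SNOWFLAKE_SOURCE_NAME and sfOptions come from the code above; the task name query_and_validate is made up here). Because the task already holds a pyspark DataFrame, no open() call is needed:
Copy code
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark
from great_expectations.dataset import SparkDFDataset

@task(
    task_config=Spark(spark_conf={"spark.driver.memory": "1000M"}),
    cache_version="1",
)
def query_and_validate() -> str:
    # One Spark task: query Snowflake and run the expectations in the same session.
    sess = flytekit.current_context().spark_session
    df = (
        sess.read.format(SNOWFLAKE_SOURCE_NAME)
        .options(**sfOptions)
        .option("query", "SELECT * FROM CAREPLANS")
        .load()
    )
    batch = SparkDFDataset(df)  # df is already a Spark DataFrame here
    id_null = batch.expect_column_values_to_not_be_null("ID")
    return "Null values not found" if id_null.success else "Null values found"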

komal azram

03/24/2022, 11:00 AM
ok

Samhita Alla

03/24/2022, 11:04 AM
Thank you for being patient. Let me know after you run your modified code.

komal azram

03/24/2022, 11:33 AM
TypeError: Failed to convert return value for var o0 for function main.spark_start with error <class 'AssertionError'>: Python value cannot be None, expected <class 'str'>/simple: STRING
I got this error when I returned one variable that contains a print statement, and when I returned all three variables it gives the following error: TypeError: Output(o0) in task__main__.spark_start received a tuple (None, None, None, None), instead of <class 'str'>
although I am getting results

Samhita Alla

03/24/2022, 12:34 PM
@komal azram, in the df_expect task, the output datatype is str, when it has to be a tuple with four values, like Tuple(str, ..., ..., ...). You can use a NamedTuple to assign names to the outputs (https://docs.flyte.org/projects/cookbook/en/latest/auto/core/flyte_basics/named_outputs.html#sphx-glr-auto-core-flyte-basics-named-outputs-py). Also, rather than returning print statements, you’ll have to return the actual values, because print doesn’t return anything.
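In other words, a sketch only (the field names mirror the variables in the code above): declare a four-field NamedTuple return type and return the strings themselves rather than the result of print():
Copy code
import typing
import pyspark
from flytekit import task
from great_expectations.dataset import SparkDFDataset

checks = typing.NamedTuple("checks", ID_res=str, CODE_res=str, DESCRIPTION_res=str, Unique_res=str)

@task
def df_expect(df: my_schema) -> checks:
    batch = SparkDFDataset(df.open(pyspark.sql.DataFrame).all())
    id_null = batch.expect_column_values_to_not_be_null("ID")
    code_null = batch.expect_column_values_to_not_be_null("CODE")
    desc_null = batch.expect_column_values_to_not_be_null("DESCRIPTION")
    id_unique = batch.expect_column_values_to_be_unique("ID")
    # Return the messages themselves; print() would return None for every field.
    return checks(
        "Null values not found" if id_null.success else "Null values found",
        "Null values not found" if code_null.success else "Null values found",
        "Null values not found" if desc_null.success else "Null values found",
        "Column has unique values" if id_unique.success else "Duplicate values",
    )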

komal azram

03/24/2022, 3:53 PM
It's working now.
But when I add a workflow it gives the following error: TypeError: __new__() missing 3 required positional arguments: 'CODE_res', 'DESCRIPTION_res', and 'Unique_res'
Copy code
wf_outputs = typing.NamedTuple( "op2",ID_res=str,CODE_res=str,DESCRIPTION_res=str,Unique_res=str)
@workflow
def my_wf() -> wf_outputs:
    return wf_outputs(spark_start)
if __name__ == "__main__":

    print(my_wf)

Samhita Alla

03/24/2022, 4:39 PM
Can you call the spark_start task like return wf_outputs(spark_start())? Also, you can unbox the outputs, like:
Copy code
def my_wf() -> wf_outputs:
    task_exec = spark_start()
    return wf_outputs(task_exec.ID_res, task_exec.CODE_res)
considering your task is returning the same NamedTuple.
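Put together, a sketch of that workflow (assuming the task being called, named spark_start here as in the thread, returns the same four-field NamedTuple):
Copy code
import typing
from flytekit import workflow

wf_outputs = typing.NamedTuple("op2", ID_res=str, CODE_res=str, DESCRIPTION_res=str, Unique_res=str)

@workflow
def my_wf() -> wf_outputs:
    res = spark_start()  # call the task; passing the function itself caused the __new__() error above
    return wf_outputs(res.ID_res, res.CODE_res, res.DESCRIPTION_res, res.Unique_res)

if __name__ == "__main__":
    print(my_wf())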

komal azram

03/25/2022, 8:52 AM
Thank you for helping, @Samhita Alla. It's working now.
🚀 2
👍 2