# announcements
k
Hi, I am trying to return a Spark data frame but I'm having this issue:
_user_exceptions.FlyteAssertion( flytekit.exceptions.user.FlyteAssertion: When calling tasks, only keyword args are supported. Aborting execution as detected 1 positional args (FlyteSchema.__class_getitem__.<locals>._TypedSchema(remote_path='/tmp/flyte/20220322_204000/raw/82eaa7c6b77680e044d794beaf4e01cb'),)
I have defined column names with their data types as my_schema = FlyteSchema[kwtypes(ID=str, START_T=datetime.datetime, STOP=datetime.datetime, PATIENT=str, ENCOUNTER=str, CODE=str, intDESCRIPTION=str, REASONCODE=int, REASONDESCRIPTION=str)].
I'm following this guide: https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_spark/dataframe_passing.html
k
Can you share a code snippet
k
Copy code
my_schema = FlyteSchema[kwtypes(ID=str, START_T=datetime.datetime, STOP=datetime.datetime, PATIENT=str, ENCOUNTER=str, CODE=str, intDESCRIPTION=str, REASONCODE=int, REASONDESCRIPTION=str)]

@task
def spark_start() -> my_schema:
    spark = SparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", "SELECT * FROM CAREPLANS") \
        .load()

    return df
k
This is not the problem
The workflow
k
I haven't assigned a workflow to it yet
y
how are you calling this task?
can you paste the code where you call it?
k
Copy code
if __name__ == "__main__":
    df=spark_start()
    df_expect(df)
y
df_expect(df=df)
k
AttributeError: '_TypedSchema' object has no attribute 'persist'
getting this error now
y
can you show the whole stack trace?
k
{"asctime": "2022-03-22 212035,124", "name": "flytekit", "levelname": "ERROR", "message": "Exception when executing '_TypedSchema' object has no attribute 'persist'", "exc_info": "Traceback (most recent call last):\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py\", line 478, in dispatch_execute\n native_outputs = self.execute(**native_inputs)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/python_function_task.py\", line 160, in execute\n return exception_scopes.user_entry_point(self._task_function)(**kwargs)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py\", line 198, in user_entry_point\n return wrapped(*args, **kwargs)\n File \"/home/komal/PycharmProjects/spark-ge/main.py\", line 46, in df_expect\n batch=SparkDFDataset(df)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/great_expectations/dataset/sparkdf_dataset.py\", line 617, in __init__\n self.spark_df.persist()\nAttributeError: '_TypedSchema' object has no attribute 'persist'"} Traceback (most recent call last): File "/home/komal/PycharmProjects/spark-ge/main.py", line 64, in <module> df_expect(df=df) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 288, in call return flyte_entity_call_handler(self, *args, **kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/promise.py", line 991, in flyte_entity_call_handler result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 269, in local_execute outputs_literal_map = self.dispatch_execute(ctx, input_literal_map) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 481, in dispatch_execute raise e File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 478, in dispatch_execute native_outputs = self.execute(**native_inputs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/python_function_task.py", line 160, in execute return exception_scopes.user_entry_point(self._task_function)(**kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 198, in user_entry_point return wrapped(*args, **kwargs) File "/home/komal/PycharmProjects/spark-ge/main.py", line 46, in df_expect batch=SparkDFDataset(df) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/great_expectations/dataset/sparkdf_dataset.py", line 617, in init self.spark_df.persist() AttributeError: '_TypedSchema' object has no attribute 'persist'
y
@Samhita Alla do you think you could help take a look? I’m not as familiar with the Great Expectations code.
s
Sure! @komal azram, would you mind sharing the df_expect() task code?
I think you may have to call open() on your Spark dataframe within the df_expect() code, something like df.open(pyspark.DataFrame).all()
k
Copy code
def df_expect(df: my_schema) -> str:
    batch = SparkDFDataset(df)

    ID_Null = batch.expect_column_values_to_not_be_null("ID")
    ID_res = print("Null values not found" if ID_Null.success else "Null values")
    return ID_res
s
Can you replace batch=SparkDFDataset(df) with batch=SparkDFDataset(df.open(pyspark.DataFrame).all())?
k
AttributeError: module 'pyspark' has no attribute 'dataframe'
s
Sorry, can you try pyspark.sql.DataFrame, @User?
k
AssertionError: SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?
s
Are you using task_config=Spark() for your task? Example:
Copy code
@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache_version="1",
)
def create_spark_df() -> my_schema:
    ...
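(The Spark() task config here comes from the flytekitplugins.spark plugin; if you don’t have it installed yet, it’s the flytekitplugins-spark package on PyPI.)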
k
No
is it mandatory?
s
Oh, you’ll have to; that’s what initializes the Spark session. You can see the https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_spark/dataframe_passing.html example, the one you’re already referring to.
k
Thank you @Samhita Alla
Hey, I have added the task_config variables as mentioned in the tutorials but it gives the following error:
raise AssertionError(f"{attr_name} not available as a parameter in Flyte context - are you in right task-type?") AssertionError: SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?
In my use case I have to build a Spark session and connect Snowflake and Spark to query data from Snowflake.
I have added the task_config in the task where I am starting the Spark session.
s
This error has to do with configuring Spark. Can you paste the stacktrace? I’d like to see where exactly this error is popping up.
k
{"asctime": "2022-03-23 144000,136", "name": "flytekit", "levelname": "ERROR", "message": "Exception when executing SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?", "exc_info": "Traceback (most recent call last):\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py\", line 478, in dispatch_execute\n native_outputs = self.execute(**native_inputs)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/python_function_task.py\", line 160, in execute\n return exception_scopes.user_entry_point(self._task_function)(**kwargs)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py\", line 198, in user_entry_point\n return wrapped(*args, **kwargs)\n File \"/home/komal/PycharmProjects/spark-ge/main.py\", line 58, in df_expect\n batch = SparkDFDataset(df.open(spark.sql.DataFrame).all())\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekitplugins/spark/schema.py\", line 27, in all\n return ctx.spark_session.read.parquet(self.from_path)\n File \"/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/context_manager.py\", line 313, in __getattr__\n raise AssertionError(f\"{attr_name} not available as a parameter in Flyte context - are you in right task-type?\")\nAssertionError: SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?"} Traceback (most recent call last): File "/home/komal/PycharmProjects/spark-ge/main.py", line 81, in <module> df_expect(df=df) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 288, in call return flyte_entity_call_handler(self, *args, **kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/promise.py", line 991, in flyte_entity_call_handler result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 269, in local_execute outputs_literal_map = self.dispatch_execute(ctx, input_literal_map) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 481, in dispatch_execute raise e File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 478, in dispatch_execute native_outputs = self.execute(**native_inputs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/python_function_task.py", line 160, in execute return exception_scopes.user_entry_point(self._task_function)(**kwargs) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 198, in user_entry_point return wrapped(*args, **kwargs) File "/home/komal/PycharmProjects/spark-ge/main.py", line 58, in df_expect batch = SparkDFDataset(df.open(spark.sql.DataFrame).all()) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekitplugins/spark/schema.py", line 27, in all return ctx.spark_session.read.parquet(self.from_path) File "/home/komal/PycharmProjects/spark-ge/venv/lib/python3.8/site-packages/flytekit/core/context_manager.py", line 313, in getattr raise AssertionError(f"{attr_name} not available as a parameter in Flyte context - are you in right task-type?") AssertionError: SPARK_SESSION not 
available as a parameter in Flyte context - are you in right task-type? Process finished with exit code 1
s
Are you running this locally or in a configured Flyte environment with Spark backend?
Can you also share the whole code, wherever you’re initializing the task config?
k
running it locally
give me a min
Copy code
import datetime

from flytekit import task, kwtypes
from typing import Tuple
import pyspark as spark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from great_expectations.dataset import SparkDFDataset
from flytekitplugins.spark import Spark
from flytekit.types.schema import FlyteSchema


my_schema = FlyteSchema[kwtypes(ID=str, START_T=datetime.datetime, STOP=datetime.datetime, PATIENT=str, ENCOUNTER=str, CODE=str, intDESCRIPTION=str, REASONCODE=int, REASONDESCRIPTION=str)]


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache_version="1")
def spark_start() -> my_schema:
    spark = SparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()

    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", "SELECT * FROM CAREPLANS") \
        .load()
    return df


@task
def df_expect(df: my_schema) -> str:
    #df1 = df.open(df).all()
    batch = SparkDFDataset(df.open(spark.sql.DataFrame).all())
    #batch = SparkDFDataset(df1)

    ID_Null = batch.expect_column_values_to_not_be_null("ID")
    ID_res = print("Null values not found" if ID_Null.success else "Null values")

    CODE_Null = batch.expect_column_values_to_not_be_null("CODE")
    CODE_res = print("Null values not found" if CODE_Null.success else "Null values")

    DESCRIPTION_Null = batch.expect_column_values_to_not_be_null("DESCRIPTION")
    DESCRIPTION_res = print("Null values not found" if DESCRIPTION_Null.success else "Null values")

    ID_Unique = batch.expect_column_values_to_be_unique("ID")
    Unique_res = print("Column has Unique values" if ID_Unique.success else "duplicate values")
    return ID_res, CODE_res, DESCRIPTION_res, Unique_res


if __name__ == "__main__":
    df = spark_start()
    df_expect(df=df)
s
In the spark_start task, can you replace SparkSession with sess:
Copy code
sess = flytekit.current_context().spark_session
k
Copy code
def spark_start() -> my_schema:
    spark = flytekit.current_context().sparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()
you mean like that
s
Yes!
Copy code
sess = flytekit.current_context().spark_session
spark = sess.builder...
k
AssertionError: SPARK_SESSION not available as a parameter in Flyte context - are you in right task-type?
still getting the same error
s
Not sure why that’s happening. Can you share your code once again?
Can you merge both of them into a single task? I think because we’re trying to open our dataframe through the Spark mode, it needs that session to be available.
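Roughly something like this, just as a sketch (not tested; spark_checks is an illustrative name, and SNOWFLAKE_SOURCE_NAME / sfOptions are assumed to be defined as in your code):
Copy code
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark
from great_expectations.dataset import SparkDFDataset


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            # the Snowflake connector jars still need to be on the Spark classpath
            "spark.jars": "jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar",
        }
    ),
    cache_version="1",
)
def spark_checks() -> str:
    # the Spark plugin provides the session, so no SparkSession.builder here
    sess = flytekit.current_context().spark_session
    df = sess.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", "SELECT * FROM CAREPLANS") \
        .load()
    # df is already a pyspark DataFrame inside this task, so no .open(...) is needed
    batch = SparkDFDataset(df)
    ID_Null = batch.expect_column_values_to_not_be_null("ID")
    return "Null values not found" if ID_Null.success else "Null values"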
k
ok
s
Thank you for being patient. Let me know after you run your modified code.
k
TypeError: Failed to convert return value for var o0 for function main.spark_start with error <class 'AssertionError'>: Python value cannot be None, expected <class 'str'>/simple: STRING
I got this error when I returned one variable that contains a print statement. And when I returned all the variables it gives the following error:
TypeError: Output(o0) in task__main__.spark_start received a tuple (None, None, None, None), instead of <class 'str'>
although I am getting results
s
@komal azram, in the df_expect task, the output datatype is str, when it has to be a tuple with four values, like Tuple(str, ..., ..., ...). You can use NamedTuple to assign names to the outputs (https://docs.flyte.org/projects/cookbook/en/latest/auto/core/flyte_basics/named_outputs.html#sphx-glr-auto-core-flyte-basics-named-outputs-py). Also, rather than returning print statements, you’ll have to return the actual values, because print doesn’t return anything.
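For example, the return part might look roughly like this (just a sketch; check_outputs and the field names are illustrative, and it assumes my_schema and the Spark-enabled task setup discussed above):
Copy code
import typing
from flytekit import task
from pyspark.sql import DataFrame
from great_expectations.dataset import SparkDFDataset

# illustrative output NamedTuple; pick whatever field names you like
check_outputs = typing.NamedTuple(
    "check_outputs", ID_res=str, CODE_res=str, DESCRIPTION_res=str, Unique_res=str
)


@task  # in practice this needs the Spark task_config (or to live inside the merged Spark task) so the session is available
def df_expect(df: my_schema) -> check_outputs:
    batch = SparkDFDataset(df.open(DataFrame).all())
    ID_Null = batch.expect_column_values_to_not_be_null("ID")
    CODE_Null = batch.expect_column_values_to_not_be_null("CODE")
    DESCRIPTION_Null = batch.expect_column_values_to_not_be_null("DESCRIPTION")
    ID_Unique = batch.expect_column_values_to_be_unique("ID")
    # return the strings themselves instead of print(...), which evaluates to None
    return check_outputs(
        "Null values not found" if ID_Null.success else "Null values",
        "Null values not found" if CODE_Null.success else "Null values",
        "Null values not found" if DESCRIPTION_Null.success else "Null values",
        "Column has unique values" if ID_Unique.success else "Duplicate values",
    )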
k
it's working now
but when I add a workflow it gives the following error: TypeError: __new__() missing 3 required positional arguments: 'CODE_res', 'DESCRIPTION_res', and 'Unique_res'
Copy code
wf_outputs = typing.NamedTuple("op2", ID_res=str, CODE_res=str, DESCRIPTION_res=str, Unique_res=str)

@workflow
def my_wf() -> wf_outputs:
    return wf_outputs(spark_start)

if __name__ == "__main__":
    print(my_wf)
s
Can you call the spark_start task like return wf_outputs(spark_start())? Also, you can unbox the outputs, like:
Copy code
def my_wf() -> wf_outputs:
    task_exec = spark_start()
    return wf_outputs(task_exec.ID_res, task_exec.CODE_res)
considering your task is returning the same NamedTuple.
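Putting that together, the workflow might look roughly like this (a sketch, assuming spark_start returns a NamedTuple with these same field names):
Copy code
import typing
from flytekit import workflow

wf_outputs = typing.NamedTuple(
    "op2", ID_res=str, CODE_res=str, DESCRIPTION_res=str, Unique_res=str
)


@workflow
def my_wf() -> wf_outputs:
    task_exec = spark_start()  # call the task with parentheses, not just reference it
    return wf_outputs(
        task_exec.ID_res,
        task_exec.CODE_res,
        task_exec.DESCRIPTION_res,
        task_exec.Unique_res,
    )


if __name__ == "__main__":
    print(my_wf())  # call the workflow as well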
k
Thank you for helping @Samhita Alla, it's working now.
🚀 2
👍 2