refined-dawn-96621
03/22/2022, 3:47 PM
freezing-airport-6809
refined-dawn-96621
03/22/2022, 3:53 PM
my_schema = FlyteSchema[kwtypes(ID=str, START_T=datetime.datetime, STOP=datetime.datetime, PATIENT=str, ENCOUNTER=str, CODE=str, DESCRIPTION=str, REASONCODE=int, REASONDESCRIPTION=str)]
@task
def spark_start() -> my_schema:
    spark = SparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()
    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", "SELECT * FROM CAREPLANS") \
        .load()
    return df
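(Editor's note: the snippet above uses SNOWFLAKE_SOURCE_NAME and sfOptions without defining them. A minimal sketch of what those definitions typically look like for the spark-snowflake connector; the option names come from the connector's documentation, and every value below is a placeholder, not a real credential:)

```python
# Source short name registered by the spark-snowflake connector.
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Connection options consumed via .options(**sfOptions); all values are placeholders.
sfOptions = {
    "sfURL": "<account>.snowflakecomputing.com",  # hypothetical account URL
    "sfUser": "<user>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
}
```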
freezing-airport-6809
refined-dawn-96621
03/22/2022, 3:56 PM
thankful-minister-83577
refined-dawn-96621
03/22/2022, 4:17 PM
if __name__ == "__main__":
    df = spark_start()
    df_expect(df)
thankful-minister-83577
df_expect(df=df)
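(Editor's note: the correction above reflects that flytekit requires tasks to be invoked with keyword arguments. As a plain-Python sketch of that constraint, with no flytekit involved; the kwargs_only decorator and df_expect_sketch names are hypothetical, not flytekit APIs:)

```python
def kwargs_only(fn):
    """Mimic flytekit's rule that tasks must be called with keyword arguments."""
    def wrapper(*args, **kwargs):
        if args:
            # Positional arguments are rejected, just as flytekit rejects df_expect(df).
            raise TypeError(f"{fn.__name__} must be called with keyword arguments")
        return fn(**kwargs)
    return wrapper

@kwargs_only
def df_expect_sketch(df):
    return f"validating {df}"
```

Here df_expect_sketch(df="careplans") succeeds while df_expect_sketch("careplans") raises TypeError, mirroring the df_expect(df=df) fix.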
refined-dawn-96621
03/22/2022, 4:21 PM
thankful-minister-83577
refined-dawn-96621
03/22/2022, 4:33 PM
tall-lock-23197
Can you share the df_expect() task code?
tall-lock-23197
Call open() on your spark dataframe within the df_expect() code, something like df.open(pyspark.DataFrame).all()
refined-dawn-96621
03/23/2022, 5:43 AM
def df_expect(df: my_schema) -> str:
    batch = SparkDFDataset(df)
    ID_Null = batch.expect_column_values_to_not_be_null("ID")
    ID_res = print("Null values not found" if ID_Null.success else "Null values found")
    return ID_res
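(Editor's note: a side issue in the snippet above, which comes up again later in the thread: print writes to stdout and returns None, so ID_res is always None. A minimal sketch of returning the message instead; null_check_message is a hypothetical helper, not part of the thread's code:)

```python
def null_check_message(success: bool) -> str:
    """Build the status string, log it, and return it (print alone returns None)."""
    msg = "Null values not found" if success else "Null values found"
    print(msg)   # still visible in task logs
    return msg   # the caller receives an actual value

# Contrast: assigning print(...) captures None, not the printed text.
captured = print("Null values not found")
```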
tall-lock-23197
Can you replace batch = SparkDFDataset(df) with batch = SparkDFDataset(df.open(pyspark.DataFrame).all())?
refined-dawn-96621
03/23/2022, 6:06 AM
tall-lock-23197
pyspark.sql.DataFrame, @User?
refined-dawn-96621
03/23/2022, 6:23 AM
tall-lock-23197
Have you set task_config=Spark() for your task? Example:
@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache_version="1",
)
def create_spark_df() -> my_schema:
    ...
refined-dawn-96621
03/23/2022, 6:27 AM
tall-lock-23197
refined-dawn-96621
03/23/2022, 6:43 AM
refined-dawn-96621
03/23/2022, 9:41 AM
tall-lock-23197
refined-dawn-96621
03/23/2022, 9:56 AM
03/23/2022, 9:58 AM
import datetime
from flytekit import task, kwtypes
from typing import Tuple
import pyspark as spark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from great_expectations.dataset import SparkDFDataset
from flytekitplugins.spark import Spark
from flytekit.types.schema import FlyteSchema

my_schema = FlyteSchema[kwtypes(ID=str, START_T=datetime.datetime, STOP=datetime.datetime, PATIENT=str, ENCOUNTER=str, CODE=str, DESCRIPTION=str, REASONCODE=int, REASONDESCRIPTION=str)]

@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache_version="1",
)
def spark_start() -> my_schema:
    spark = SparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()
    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("query", "SELECT * FROM CAREPLANS") \
        .load()
    return df

@task
def df_expect(df: my_schema) -> str:
    batch = SparkDFDataset(df.open(spark.sql.DataFrame).all())
    ID_Null = batch.expect_column_values_to_not_be_null("ID")
    ID_res = print("Null values not found" if ID_Null.success else "Null values found")
    CODE_Null = batch.expect_column_values_to_not_be_null("CODE")
    CODE_res = print("Null values not found" if CODE_Null.success else "Null values found")
    DESCRIPTION_Null = batch.expect_column_values_to_not_be_null("DESCRIPTION")
    DESCRIPTION_res = print("Null values not found" if DESCRIPTION_Null.success else "Null values found")
    ID_Unique = batch.expect_column_values_to_be_unique("ID")
    Unique_res = print("Column has unique values" if ID_Unique.success else "Duplicate values found")
    return ID_res, CODE_res, DESCRIPTION_res, Unique_res

if __name__ == "__main__":
    df = spark_start()
    df_expect(df=df)
tall-lock-23197
In your spark_start task, can you replace SparkSession with sess:
sess = flytekit.current_context().spark_session
refined-dawn-96621
03/24/2022, 9:26 AM
def spark_start() -> my_schema:
    spark = flytekit.current_context().sparkSession.builder \
        .master("local") \
        .appName("snowflake-test") \
        .config('spark.jars', 'jar/snowflake-jdbc-3.13.15.jar,jar/spark-snowflake_2.12-2.10.0-spark_3.2.jar') \
        .getOrCreate()
refined-dawn-96621
03/24/2022, 9:27 AM
tall-lock-23197
sess = flytekit.current_context().spark_session
spark = sess.builder...
refined-dawn-96621
03/24/2022, 10:46 AM
tall-lock-23197
refined-dawn-96621
03/24/2022, 11:00 AM
tall-lock-23197
refined-dawn-96621
03/24/2022, 11:33 AM
03/24/2022, 11:33 AM
tall-lock-23197
In your df_expect task, the output datatype is str, when it has to be a tuple with four values, like Tuple[str, ..., ..., ...]. You can use NamedTuple to assign names to the outputs (https://docs.flyte.org/projects/cookbook/en/latest/auto/core/flyte_basics/named_outputs.html#sphx-glr-auto-core-flyte-basics-named-outputs-py).
Also, rather than returning print statements, you’ll have to return the actual value, because print doesn’t return anything.
refined-dawn-96621
03/24/2022, 3:53 PM
refined-dawn-96621
03/24/2022, 3:54 PM
wf_outputs = typing.NamedTuple("op2", ID_res=str, CODE_res=str, DESCRIPTION_res=str, Unique_res=str)

@workflow
def my_wf() -> wf_outputs:
    return wf_outputs(spark_start)

if __name__ == "__main__":
    print(my_wf)
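(Editor's note: a plain-Python illustration of the NamedTuple advice, with no flytekit involved; the field names come from the thread, the values are made up. The functional typing.NamedTuple form gives each output a name you can access by attribute:)

```python
import typing

# Same shape as the workflow's declared outputs.
wf_outputs = typing.NamedTuple(
    "op2",
    [("ID_res", str), ("CODE_res", str), ("DESCRIPTION_res", str), ("Unique_res", str)],
)

# Placeholder values standing in for the task's real results.
outs = wf_outputs("Null values not found", "Null values not found",
                  "Null values not found", "Column has unique values")
```

outs.ID_res then reads "Null values not found", and outs also unpacks positionally like any other tuple, which is what lets a workflow both name and forward its outputs.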
tall-lock-23197
Can you call the spark_start task like return wf_outputs(spark_start())? Also, you can unbox the outputs, like:
def my_wf() -> wf_outputs:
    task_exec = spark_start()
    return wf_outputs(task_exec.ID_res, task_exec.CODE_res)
considering your task is returning the same NamedTuple.
refined-dawn-96621
03/25/2022, 8:52 AM