Adedeji Ayinde
02/01/2023, 8:15 AM
flytekit.exceptions.scopes.FlyteScopedUserException: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 3) (localhost executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
INFO:py4j.clientserver:Closing down clientserver connection
Samhita Alla
Adedeji Ayinde
02/01/2023, 3:54 PM
from typing import Any, List

import flytekit
from flytekit import Resources, task
from flytekitplugins.spark import Spark
from pyspark.sql.functions import udf

# Config, read_data, and fit_model are defined elsewhere in the module (not shown in this post).

@udf
def extract(row):
    return tuple(row) + tuple(row["features"].toArray().tolist())
def transform_data(fitted_model, spark_df, ohe_cols: List):  # -> ps.sql.DataFrame
    """
    @param fitted_model: fitted preprocessing pipeline whose first stage exposes labelsArray
    @param spark_df: input Spark DataFrame
    @param ohe_cols: columns that were one-hot encoded
    @return: DataFrame with the "features" vector flattened into one column per label
    """
    original_cols = spark_df.columns
    fixed_col = [col for col in original_cols if col not in ohe_cols]  # pass-through cols
    transformed_cols = []
    for col, label_array in zip(ohe_cols, fitted_model.stages[0].labelsArray):
        col_x = list(map(lambda label: col + "_" + label, label_array))
        transformed_cols.extend(col_x)
    final_cols = fixed_col + transformed_cols

    spark_df = fitted_model.transform(spark_df)
    spark_df = spark_df.select(fixed_col + ["features"])
    spark_df = spark_df.rdd.map(extract).toDF()
    vector_col = list(filter(lambda x: x[1] == "vector", spark_df.dtypes))[0][0]
    spark_df = spark_df.drop(vector_col).toDF(*final_cols)
    return spark_df
@task(
    # container_image="613630599026.dkr.ecr.us-east-1.amazonaws.com/flyte-pyspark:latest",
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "6000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "2",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            # The following settings are needed only when running the Spark task on a developer's local machine.
            # Also set locally: export SPARK_LOCAL_IP="127.0.0.1"
            "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.2",
            "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
            "spark.hadoop.fs.s3a.access.key": "",
            "spark.hadoop.fs.s3a.secret.key": "",
            "spark.hadoop.fs.s3a.session.token": "",
        },
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def estimate(config: Config) -> list[Any]:
    spark = flytekit.current_context().spark_session
    # read train_data
    train_df = read_data(spark,
                         path=config.train_data_path,
                         filetype=config.train_filetype)
    # fit the OHE pipeline model on train_data, then transform train_data
    fitted_model = fit_model(train_df, config)
    transformed_train_df = transform_data(fitted_model,
                                          train_df,
                                          config.ohe_cols)
    return transformed_train_df.toPandas()
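(A possible alternative, sketched here as a note and not taken from the thread: the "features" vector can be flattened with pyspark.ml.functions.vector_to_array instead of the rdd.map(extract) round-trip, which keeps the work in the DataFrame API and avoids the Python workers entirely. The names flatten_features, fixed_col, and transformed_cols below reuse the names from transform_data, and the vector layout is assumed to match transformed_cols.)

from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F

def flatten_features(spark_df, fixed_col, transformed_cols):
    # Convert the ML vector column to a plain array column, then pull out
    # one element per expected output column, all within the DataFrame API.
    arr_df = spark_df.withColumn("features_arr", vector_to_array("features"))
    return arr_df.select(
        *fixed_col,
        *[F.col("features_arr")[i].alias(name) for i, name in enumerate(transformed_cols)],
    )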
Samhita Alla
docker exec -it <container-id> sh
kubectl get pods -n flytesnacks-development
kubectl logs <pod-name> -n flytesnacks-development
The output type of the estimate task is list[Any], which is generic and gets pickled during serialization. Could you specify the exact type? You can use typing.Union.
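(A minimal sketch of that suggestion, with an illustrative name and the Spark config and Config parameter stripped out: since estimate already calls .toPandas(), the task could declare pandas.DataFrame as its return type, which flytekit serializes with its built-in DataFrame support instead of pickling a generic list[Any].)

import pandas as pd
from flytekit import task

@task
def estimate_typed() -> pd.DataFrame:
    # Spark work elided; the point is the concrete return annotation,
    # which flytekit can serialize natively rather than pickling list[Any].
    return pd.DataFrame({"example": [1, 2, 3]})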
Adedeji Ayinde
02/02/2023, 6:26 AM
Samhita Alla
Adedeji Ayinde
02/02/2023, 7:09 AM
Samhita Alla
Adedeji Ayinde
02/02/2023, 7:25 AM
Samhita Alla
Adedeji Ayinde
02/02/2023, 3:26 PM
Samhita Alla
Adedeji Ayinde
02/07/2023, 6:02 AM
Samhita Alla
Adedeji Ayinde
02/07/2023, 6:06 AM