# ask-the-community

Adedeji Ayinde

02/01/2023, 8:15 AM
Hello team, while applying a map function on a Spark RDD, the job failed unexpectedly. The Flyte Spark job is executed on my local machine. Any suggestions on how to resolve this failure?
Copy code
flytekit.exceptions.scopes.FlyteScopedUserException: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 3) (localhost executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)


INFO:py4j.clientserver:Closing down clientserver connection

Samhita Alla

02/01/2023, 11:00 AM
Could you share the code snippet you tried to run?

Adedeji Ayinde

02/01/2023, 3:54 PM
The transform_data function implements the main logic used in the Flyte task below:
Copy code
@udf
def extract(row):
    return tuple(row) + tuple(row["features"].toArray().tolist())

def transform_data(fitted_model, spark_df, ohe_cols: List): # -> ps.sql.DataFrame:
    """
    @param fitted_model:
    @param spark_df:
    @param ohe_cols:
    @return:
    """
    original_cols = spark_df.columns
    fixed_col = [col for col in original_cols if col not in ohe_cols]  # pass-through cols
    transformed_cols = []

    for col, label_array in zip(ohe_cols, fitted_model.stages[0].labelsArray):
        col_x = list(map(lambda label: col + "_" + label, label_array))
        transformed_cols.extend(col_x)

    final_cols = fixed_col + transformed_cols

    spark_df = fitted_model.transform(spark_df)
    spark_df = spark_df.select(fixed_col + ["features"])
    spark_df = spark_df.rdd.map(extract).toDF()
    
    vector_col = list(filter(lambda x: x[1] == "vector", spark_df.dtypes))[0][0]
    spark_df = spark_df.drop(vector_col).toDF(*final_cols)

    # return spark_df
    return spark_df
Copy code
@task(
    # container_image="613630599026.dkr.ecr.us-east-1.amazonaws.com/flyte-pyspark:latest",
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "6000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "2",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            # The following is needed only when running spark task in dev's local PC.
            # Also need to do this locally: export SPARK_LOCAL_IP="127.0.0.1"
            "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.2",
            "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
            "spark.hadoop.fs.s3a.access.key": "",
            "spark.hadoop.fs.s3a.secret.key": "",
            "spark.hadoop.fs.s3a.session.token": "",
        },
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def estimate(config: Config) -> list[Any]:
    spark = flytekit.current_context().spark_session

    # read train_data
    train_df = read_data(spark,
                         path=config.train_data_path,
                         filetype=config.train_filetype)

    # fit ohe_pipeline model on train_data
    # transform train_data
    fitted_model = fit_model(train_df, config)
    transformed_train_df = transform_data(fitted_model,
                                          train_df,
                                          config.ohe_cols
                                          )
    return transformed_train_df.toPandas()

Samhita Alla

02/02/2023, 5:58 AM
Could you fetch logs from the pod in question? If you're testing on a demo cluster, run:
Copy code
docker exec -it <container-id> sh
kubectl get pods -n flytesnacks-development
kubectl logs <pod-name> -n flytesnacks-development
The return type of the estimate task is list[Any], which is generic and gets pickled during serialization. Could you specify the exact type? You can use typing.Union.
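For instance, if the task is ultimately returning the pandas DataFrame from toPandas(), something like this could work (a minimal sketch with a placeholder task; estimate_typed and the toy columns are made up, not your actual code):
Copy code
import pandas as pd
from flytekit import task, workflow


@task
def estimate_typed(n_rows: int) -> pd.DataFrame:  # concrete type instead of list[Any]
    # Stand-in for the real Spark logic: flytekit handles pandas DataFrames
    # natively, so the output isn't pickled as a generic object.
    return pd.DataFrame({"feature_a": list(range(n_rows)), "feature_b": [0.5] * n_rows})


@workflow
def wf(n_rows: int = 3) -> pd.DataFrame:
    return estimate_typed(n_rows=n_rows)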

Adedeji Ayinde

02/02/2023, 6:26 AM
Thanks, Samhita, for your response. I fixed the return type and got the workflow to run successfully locally. However, when I tried to run it on a remote Flyte cluster (company hosted) with a custom image, it returned a ModuleNotFoundError. The referenced module is the enclosing directory.

Samhita Alla

02/02/2023, 6:44 AM
Could you send me the spark-submit command? You can find that in the task details.

Adedeji Ayinde

02/02/2023, 7:09 AM
I couldn't find the spark-submit command in the task details. Am I missing something?

Samhita Alla

02/02/2023, 7:11 AM
Oh. My bad. You should be able to fetch it from the pod log.

Adedeji Ayinde

02/02/2023, 7:25 AM
logs-from-spark-kubernetes-driver-in-fada3aec4d86042ab9f8-n0-0-driver.txt
I just shared the pod log with you.

Samhita Alla

02/02/2023, 8:54 AM
Are you trying to import code from other modules?
In this case, the dai-mle-workflow module?

Adedeji Ayinde

02/02/2023, 3:26 PM
No, I was not trying to import code from another module. My workflow script is in the dai-mle-workflow directory. Path to my workflow: .../dai-mle-workflow/ml_project_1/onehotencoder_engine.wf

Samhita Alla

02/07/2023, 5:32 AM
Sorry I missed your message. Can you try running the https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_spark/pyspark_pi.html example? I want to verify whether the configuration is wrong or there's a problem with the code.
I tried running the linked example on my demo cluster a couple of weeks ago and it worked fine.
Also, how are you registering your code on the Flyte backend?

Adedeji Ayinde

02/07/2023, 6:02 AM
Thanks, Samhita, for your help. The ModuleNotFoundError in my workflow was caused by the udf function (a scope-related issue). I was able to get the workflow to run after refactoring my script.

Samhita Alla

02/07/2023, 6:02 AM
Good to know that you got it running.
What was the refactor you did?

Adedeji Ayinde

02/07/2023, 6:06 AM
I moved the extract function (udf), which was originally outside the transform_data function, into transform_data. Specifically, I passed it as a lambda into spark_df.rdd.map().
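Roughly like this (a simplified sketch of the refactor; the column renaming and the rest of the original logic are omitted):
Copy code
from typing import List


def transform_data(fitted_model, spark_df, ohe_cols: List[str]):
    original_cols = spark_df.columns
    fixed_col = [c for c in original_cols if c not in ohe_cols]

    spark_df = fitted_model.transform(spark_df)
    spark_df = spark_df.select(fixed_col + ["features"])

    # The row-flattening logic now lives inside transform_data as a lambda instead of
    # a module-level @udf, which seems to avoid the Spark workers having to import
    # the enclosing workflow module.
    spark_df = spark_df.rdd.map(
        lambda row: tuple(row) + tuple(row["features"].toArray().tolist())
    ).toDF()
    return spark_df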