# ask-the-community
a
Hi there, I am getting the following type error while trying to perform a one-hot encoding Spark operation. The operation executes successfully in my local Spark env but fails when I try to run it with Flyte:
TypeError: Failed to convert return value for var o0 for function ml_project_1.ohe.estimator with error <class 'pyarrow.lib.ArrowInvalid'>: ('Could not convert SparseVector(5, {2: 1.0}) with type SparseVector: did not recognize Python value type when inferring an Arrow data type', 'Conversion failed for column encoded_col_feat1 with type object')
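For context, the error is raised while Flyte converts the returned pandas DataFrame to Arrow: toPandas() leaves pyspark.ml vectors as opaque Python objects in an object-dtype column, and pyarrow has no Arrow type for them. A minimal reproduction of the failure, independent of Flyte:

import pandas as pd
import pyarrow as pa
from pyspark.ml.linalg import SparseVector

# pyarrow cannot infer an Arrow type for a pyspark.ml SparseVector,
# so the pandas -> Arrow conversion fails exactly as in the traceback.
df = pd.DataFrame({"encoded_col_feat1": [SparseVector(5, {2: 1.0})]})
pa.Table.from_pandas(df)  # raises pyarrow.lib.ArrowInvalid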
s
Could you send the ml_project_1.ohe.estimator task code?
I'm guessing you're specifying columns in a StructuredDataset, and one of them is set to a type that Flyte doesn't understand.
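For reference, "specifying columns" here means annotating the StructuredDataset with a column-to-type map; only types Flyte can serialize (e.g. float, int, str) are accepted, so a column holding Spark vectors would not fit. A minimal sketch with hypothetical column names feat1/feat2:

from typing import Annotated

import pandas as pd
from flytekit import kwtypes, task
from flytekit.types.structured import StructuredDataset

# Hypothetical schema: feat1/feat2 are placeholder column names.
EncodedDataset = Annotated[StructuredDataset, kwtypes(feat1=float, feat2=float)]

@task
def produce() -> EncodedDataset:
    return StructuredDataset(dataframe=pd.DataFrame({"feat1": [0.0], "feat2": [1.0]}))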
a
Hi Samhita, thanks for reaching out. Here is the code snippet.
import logging
from typing import Dict, List, Union

import flytekit
import pandas as pd
from flytekit import Resources, task
from flytekitplugins.spark import Spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler


def extract(row):
    # Flatten a Row into its original fields plus one scalar per vector entry.
    return tuple(row) + tuple(row["features"].toArray().tolist())

def fit_model(spark_df, ohe_cols: List[str]):

    # Index the string columns, then one-hot encode the indexed versions.
    indexed_cols = ["indexed_" + col for col in ohe_cols]
    encoded_cols = ["encoded_" + col for col in ohe_cols]

    indexer = StringIndexer(inputCols=ohe_cols,
                            outputCols=indexed_cols,
                            handleInvalid="keep"
                            )

    encoder = OneHotEncoder(inputCols=indexed_cols,
                            outputCols=encoded_cols,
                            dropLast=True
                            )

    assembler = VectorAssembler(inputCols=encoded_cols, outputCol="features")
    fitted_model = Pipeline(stages=[indexer, encoder, assembler]).fit(spark_df)

    return fitted_model


def transform_data(fitted_model, spark_df, ohe_cols):
    original_cols = spark_df.columns
    fixed_col = [col for col in original_cols if col not in ohe_cols]  # pass-through cols
    transformed_cols = []

    for col, label_array in zip(ohe_cols, fitted_model.stages[0].labelsArray):
        col_x = list(map(lambda label: col + "_" + label, label_array))
        transformed_cols.extend(col_x)

    final_cols = fixed_col + transformed_cols

    spark_df = fitted_model.transform(spark_df)  # transform spark_df using fitted_model
    spark_df = spark_df.select(fixed_col + ["features"])
    logging.info(f"datatype = {type(spark_df)}")

    # spark_df = spark_df.rdd.map(extract).toDF()
    
    # vector_col = list(filter(lambda x: x[1] == "vector", spark_df.dtypes))[0][0]
    # spark_df = spark_df.drop(vector_col).toDF(*final_cols)

    return spark_df


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "2000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "4",
            "spark.executor.instances": "1",
            "spark.driver.cores": "1",
        },
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def estimator(config: Dict[str, Union[str, List[str]]]) -> pd.DataFrame:

    spark = flytekit.current_context().spark_session
    input_df = read_data(spark,
                         path=config["input_data_path"],
                         datatype=config["input_datatype"])

    fitted_model = fit_model(input_df, ohe_cols=config["ohe_cols"])
    transformed_data = transform_data(fitted_model, input_df, config['ohe_cols'])

    return transformed_data.toPandas()
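One way to sidestep the Arrow error, sketched under the assumption of Spark >= 3.0: expand the features vector into plain double columns before toPandas(), so every returned column has an Arrow-representable type. expand_features is a hypothetical helper; fixed_col and transformed_cols are the names already computed in transform_data, and the sketch assumes transformed_cols matches the assembled vector's length:

from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F

def expand_features(spark_df, fixed_col, transformed_cols):
    # Convert the ML vector to array<double>, then pull out one scalar
    # column per one-hot category so no column holds vector objects.
    arr = vector_to_array(F.col("features"))
    return spark_df.select(
        *fixed_col,
        *[arr[i].alias(name) for i, name in enumerate(transformed_cols)],
    )

This would be called on the transformed DataFrame just before toPandas().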
Also, I get a ModuleNotFoundError when I uncomment the following lines in the snippet above:
# spark_df = spark_df.rdd.map(extract).toDF()

# vector_col = list(filter(lambda x: x[1] == "vector", spark_df.dtypes))[0][0]
# spark_df = spark_df.drop(vector_col).toDF(*final_cols)
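A likely cause of that ModuleNotFoundError: rdd.map(extract) ships a pickled closure to the Spark executors, and unpickling it there re-imports the module that defines extract; if that module is only available in the driver image or working directory and not on the executors' PYTHONPATH, the import fails. Staying in the DataFrame API, as in the vector_to_array sketch above, avoids shipping Python closures altogether; alternatively, the module can be distributed to the executors (e.g. via the spark.submit.pyFiles conf or by baking it into the executor image).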
Hi Samhita, I have been able to resolve the problem. Thanks!
s
Awesome! What was the issue?