Adedeji Ayinde
01/26/2023, 6:42 PMTypeError: Failed to convert return value for var o0 for function ml_project_1.ohe.estimator with error <class 'pyarrow.lib.ArrowInvalid'>: ('Could not convert SparseVector(5, {2: 1.0}) with type SparseVector: did not recognize Python value type when inferring an Arrow data type', 'Conversion failed for column encoded_col_feat1 with type object')
Samhita Alla
01/27/2023, 6:38 AMml_project_1.ohe.estimator
task code?Adedeji Ayinde
01/30/2023, 7:48 AMdef extract(row):
return tuple(row) + tuple(row["features"].toArray().tolist())
def fit_model(spark_df, ohe_cols: List):
    """Fit a StringIndexer -> OneHotEncoder -> VectorAssembler pipeline.

    Each column in *ohe_cols* is string-indexed (unknowns kept as an extra
    index) and one-hot encoded; the encoded vectors are assembled into a
    single "features" column. Returns the fitted PipelineModel.
    """
    indexed_cols = ["indexed_" + c for c in ohe_cols]
    encoded_cols = ["encoded_" + c for c in ohe_cols]

    stages = [
        StringIndexer(
            inputCols=ohe_cols,
            outputCols=indexed_cols,
            handleInvalid="keep",
        ),
        OneHotEncoder(
            inputCols=indexed_cols,
            outputCols=encoded_cols,
            dropLast=True,
        ),
        VectorAssembler(inputCols=encoded_cols, outputCol="features"),
    ]
    return Pipeline(stages=stages).fit(spark_df)
def transform_data(fitted_model, spark_df, ohe_cols):
    """Apply the fitted OHE pipeline to *spark_df*.

    Returns a DataFrame holding the pass-through columns (everything not in
    *ohe_cols*) plus the assembled "features" vector column.
    """
    original_cols = spark_df.columns
    fixed_col = [col for col in original_cols if col not in ohe_cols]  # pass-through cols

    # Expanded per-label column names, used only by the commented-out vector
    # expansion below. NOTE(review): with dropLast=True the encoder emits
    # len(labels) - 1 slots per input column, so these names would not line
    # up one-to-one with the vector entries — confirm before re-enabling.
    transformed_cols = []
    for col, label_array in zip(ohe_cols, fitted_model.stages[0].labelsArray):
        transformed_cols.extend(col + "_" + label for label in label_array)
    final_cols = fixed_col + transformed_cols

    spark_df = fitted_model.transform(spark_df)  # transform spark_df using fitted_model
    spark_df = spark_df.select(fixed_col + ["features"])
    # Fixed: the original line was a Slack-mangled link
    # "<http://logging.info|logging.info>(...)", which is a syntax error.
    logging.info(f'datatype = {type(spark_df)}')
    # spark_df = spark_df.rdd.map(extract).toDF()
    # vector_col = list(filter(lambda x: x[1] == "vector", spark_df.dtypes))[0][0]
    # spark_df = spark_df.drop(vector_col).toDF(*final_cols)
    return spark_df
@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "2000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "4",
            "spark.executor.instances": "1",
            "spark.driver.cores": "1",
        },
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def estimator(config: Dict[str, Union[str, List[str]]]) -> pd.DataFrame:
    """Flyte Spark task: read the input data, fit the one-hot-encoding
    pipeline, transform the data, and return it as a pandas DataFrame.

    *config* supplies "input_data_path", "input_datatype", and "ohe_cols".
    """
    spark = flytekit.current_context().spark_session
    ohe_cols = config["ohe_cols"]
    input_df = read_data(
        spark,
        path=config["input_data_path"],
        datatype=config["input_datatype"],
    )
    model = fit_model(input_df, ohe_cols=ohe_cols)
    transformed = transform_data(model, input_df, ohe_cols)
    # NOTE(review): the "features" column holds Spark vector objects;
    # toPandas() appears to be the source of the ArrowInvalid failure
    # reported above (Arrow cannot infer a type for SparseVector) — confirm.
    return transformed.toPandas()
# spark_df = spark_df.rdd.map(extract).toDF()
# vector_col = list(filter(lambda x: x[1] == "vector", spark_df.dtypes))[0][0]
# spark_df = spark_df.drop(vector_col).toDF(*final_cols)
Samhita Alla
01/31/2023, 4:48 AM