Fabio Grätz (08/30/2022, 4:24 PM)
I'm trying to use a StructuredDataset
to pass a Spark Dataframe around between tasks.
I’m following this guide and get this error:
{
    "asctime": "2022-08-30 16:15:09,048",
    "name": "flytekit",
    "levelname": "ERROR",
    "message": "Failed to convert return value for var o0 with error <class 'ValueError'>: Failed to find a handler for <class 'pyspark.sql.dataframe.DataFrame'>, protocol gs, fmt parquet"
}
I added this to my spark config, but it doesn't solve the problem:

spark-config-default:
  - "spark.jars.packages": "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2"
  - "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
  - "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
  - "spark.hadoop.google.cloud.auth.service.account.enable": "true"
Would be great to get some pointers in case somebody has seen this error before, thanks!

Yee
from flytekitplugins.spark import Spark
somewhere in your code right? or something that imports from that module.
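i.e. the module that defines the task just needs to import something from that package, since the import registers the pyspark StructuredDataset handlers as a side effect. A minimal sketch (illustrative names, not your code):

import flytekit
from flytekit import kwtypes, task
from flytekit.types.structured import StructuredDataset
from flytekitplugins.spark import Spark  # side effect: registers the pyspark StructuredDataset handlers
from typing_extensions import Annotated

cols = kwtypes(a=int, b=float)

@task(task_config=Spark(spark_conf={"spark.executor.instances": "1"}))
def make_df() -> Annotated[StructuredDataset, cols]:
    sess = flytekit.current_context().spark_session
    # Build a tiny DataFrame; returning it as a StructuredDataset exercises the handler lookup
    df = sess.createDataFrame([(1, 2.0), (3, 4.0)], ["a", "b"])
    return StructuredDataset(dataframe=df)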
Fabio Grätz (08/31/2022, 10:48 AM)
I do have
from flytekitplugins.spark import Spark
but still see the same error.
Fabio Grätz (09/05/2022, 1:07 PM)

import logging
from datetime import timedelta
from typing import Tuple

import pyspark.sql
import pyspark.sql.functions as F
from omegaconf import DictConfig, OmegaConf
from typing_extensions import Annotated

import flytekit
import flytekitplugins.hydra  # noqa: F401  # internal package with a type transformer for omegaconf DictConfig objects
from flytekit import Resources, task, workflow, kwtypes
from flytekitplugins.spark import Spark
from flytekit.types.structured import StructuredDataset

urm_columns = kwtypes(user_id=int, item_id=int, rating=float)
id_mapping_columns = kwtypes(user_id=int, user_pseudo_id=int, ...)
@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "5000M",
            "spark.executor.memory": "5000M",
            "spark.executor.cores": "4",
            "spark.executor.instances": "3",
            "spark.driver.cores": "2",
        }
    ),
    retries=1,
    timeout=timedelta(hours=1),
    cache=True,
    cache_version="1.8",
    requests=Resources(mem="1Gi", cpu="1"),
    container_image="{{.images.spark.fqn}}:{{.images.default.version}}",
)
def access_and_validate_data(
    cfg: DictConfig,
) -> Tuple[
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, id_mapping_columns],
]:
    """
    Args:
        cfg (DictConfig): data accessor hydra configuration.

    Returns:
        ...
    """
    sess = flytekit.current_context().spark_session  # noqa: F841
    logging.info(OmegaConf.to_yaml(cfg))

    online_df = bq_to_spark(...)
    offline_df = bq_to_spark(...)

    # Data validation with great expectations
    # Feature engineering
    # Train test split

    ds_online_train = StructuredDataset(dataframe=online_df_train, uri=df_online_train_uri)
    ds_online_test = StructuredDataset(dataframe=online_df_test, uri=df_online_test_uri)
    ds_offline_train = StructuredDataset(dataframe=offline_df_train, uri=df_offline_train_uri)
    ds_offline_test = StructuredDataset(dataframe=offline_df_test, uri=df_offline_test_uri)
    ds_id_mapping = StructuredDataset(dataframe=id_mapping_df, uri=id_mapping_df_uri)

    return (
        ds_online_train,
        ds_online_test,
        ds_offline_train,
        ds_offline_test,
        ds_id_mapping,
    )
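A quick way to see which handlers actually got registered (a sketch; the ENCODERS attribute and its layout may differ across flytekit versions):

from flytekit.types.structured.structured_dataset import StructuredDatasetTransformerEngine
import pyspark.sql

# Maps python type -> storage protocol -> format -> encoder; if there is no
# entry for pyspark.sql.DataFrame under "gs", the error above is expected.
print(StructuredDatasetTransformerEngine.ENCODERS.get(pyspark.sql.DataFrame))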
Yee
FLYTE_SDK_LOGGING_LEVEL=10
?
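(10 is logging.DEBUG.) A sketch of turning it on locally; flytekit typically reads this env var when it configures its logger, so set it before flytekit is imported, or put it in the task pod's environment:

import os

# FLYTE_SDK_LOGGING_LEVEL takes a Python logging level; 10 == logging.DEBUG.
os.environ["FLYTE_SDK_LOGGING_LEVEL"] = "10"

import flytekit  # noqa: E402  # imported after setting the env var on purpose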