# announcements
f
Hey everyone 🙂 I'm having a problem configuring a
StructuredDataset
to pass a Spark DataFrame between tasks. I'm following this guide and get this error:
{
  "asctime": "2022-08-30 16:15:09,048",
  "name": "flytekit",
  "levelname": "ERROR",
  "message": "Failed to convert return value for var o0 with error <class 'ValueError'>: Failed to find a handler for <class 'pyspark.sql.dataframe.DataFrame'>, protocol gs, fmt parquet"
}
I added this to my spark config but it doesn’t solve the problem:
spark-config-default:
          - "spark.jars.packages": "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2"
          - "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
          - "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
          - "spark.hadoop.google.cloud.auth.service.account.enable": "true"
Would be great to get some pointers in case somebody has seen this error before, thanks!
y
which version are you on?
f
v1.1.0
y
do you think you could bump to 1.1.1 please?
we noticed an issue with the transformer on gcp. should be updated in 1.1.1 to read the protocol from the raw output prefix instead.
also you have
from flytekitplugins.spark import Spark
somewhere in your code right? or something that imports from that module.
(the init file there triggers the loading of the transformer)
f
Updated to 1.2.0-b1 as the 1.1.1 chart appears not to exist. I am importing
from flytekitplugins.spark import Spark
but still see the same error.
I have solved my problem now by simply writing the DataFrame to a Parquet file using the GCS connector. Using structured datasets would have been nicer, I feel, but the workaround works well, so no more problem here.
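For reference, the workaround looks roughly like this (a sketch, not the real job; the bucket path and variable names are placeholders):
# write the DataFrame out as Parquet through the GCS connector and pass the URI along
output_uri = "gs://my-bucket/urm/online_train"  # hypothetical path
online_df_train.write.mode("overwrite").parquet(output_uri)
# a downstream Spark task can read it back with the same connector
df = flytekit.current_context().spark_session.read.parquet(output_uri)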
s
@Yee, is this a bug, since the latest flytekit version doesn't resolve the issue?
y
sorry, didn’t see this. yeah using the structured dataset pattern and just returning the dataframe would’ve been nice. we will look into this.
would you mind copy/pasting a bit of redacted code? primarily interested in the task signature and return statement.
f
import logging
from datetime import timedelta
from typing import Tuple

import pyspark.sql
import flytekit
import flytekitplugins.hydra  # noqa F401  # This is an internal package with a type transformer for omegaconf DictConfig objects

from flytekit import Resources, task, workflow, kwtypes
from flytekitplugins.spark import Spark
from flytekit.types.structured import StructuredDataset
from omegaconf import DictConfig, OmegaConf
from typing_extensions import Annotated
import pyspark.sql.functions as F

urm_columns = kwtypes(user_id=int, item_id=int, rating=float)
id_mapping_columns = kwtypes(user_id=int, user_pseudo_id=int, ...)


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "5000M",
            "spark.executor.memory": "5000M",
            "spark.executor.cores": "4",
            "spark.executor.instances": "3",
            "spark.driver.cores": "2",
        }
    ),
    retries=1,
    timeout=timedelta(hours=1),
    cache=True,
    cache_version="1.8",
    requests=Resources(mem="1Gi", cpu="1"),
    container_image="{{.images.spark.fqn}}:{{.images.default.version}}",
)
def access_and_validate_data(
    cfg: DictConfig,
) -> Tuple[
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, id_mapping_columns],
]:
    """
    Args:
        cfg (DictConfig): data accessor hydra configuration.

    Returns:
       ...
    """
    sess = flytekit.current_context().spark_session  # noqa F841
    logging.info(OmegaConf.to_yaml(cfg))

    online_df = bq_to_spark(...)
    offline_df = bq_to_spark(...)

    # Data validation with great expectations
    # Feature engineering
    # Train test split

    ds_online_train = StructuredDataset(dataframe=online_df_train, uri=df_online_train_uri)
    ds_online_test = StructuredDataset(dataframe=online_df_test, uri=df_online_test_uri)
    ds_offline_train = StructuredDataset(dataframe=offline_df_train, uri=df_offline_train_uri)
    ds_offline_test = StructuredDataset(dataframe=offline_df_test, uri=df_offline_test_uri)
    ds_id_mapping = StructuredDataset(dataframe=id_mapping_df, uri=id_mapping_df_uri)
    
    return (
        ds_online_train,
        ds_online_test,
        ds_offline_train,
        ds_offline_test,
        ds_id_mapping,
    )
Does this help? (I had to redact quite a few things, but am happy to answer any questions you might have)
y
can you run it again with
FLYTE_SDK_LOGGING_LEVEL=10
?
you can add it to the task definition
flytekit has logging that should display what’s happening
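something like this on the existing decorator should do it (a sketch; I believe the environment kwarg on @task forwards env vars to the task container, other args unchanged):
@task(
    task_config=Spark(spark_conf={"spark.driver.memory": "5000M"}),  # existing Spark conf elided
    environment={"FLYTE_SDK_LOGGING_LEVEL": "10"},  # DEBUG-level flytekit logging inside the task pod
    container_image="{{.images.spark.fqn}}:{{.images.default.version}}",
)
def access_and_validate_data(cfg: DictConfig):  # signature and return annotation as before
    ...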
i’m wondering if maybe the problem is that the data persistence layer isn’t picking up gcs as an option