import logging
from datetime import timedelta
from typing import Annotated, Tuple

import flytekit
import flytekitplugins.hydra # noqa F401 # This is an internal package with a type transformer for omegaconf DictConfig objects
import pyspark.sql
import pyspark.sql.functions as F
from flytekit import Resources, task, workflow, kwtypes
from flytekit.types.structured import StructuredDataset
from flytekitplugins.spark import Spark
from omegaconf import DictConfig, OmegaConf
# Column schema (name -> Python type) used to annotate the four train/test
# StructuredDataset outputs of access_and_validate_data.
urm_columns = kwtypes(user_id=int, item_id=int, rating=float)
# Column schema used to annotate the id-mapping StructuredDataset output.
# NOTE(review): the trailing `...` looks like a placeholder for columns elided
# from this snippet. A positional argument after keyword arguments is not valid
# Python, so it must be replaced with the real column definitions — confirm.
id_mapping_columns = kwtypes(user_id=int, user_pseudo_id=int, ...)
@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "5000M",
            "spark.executor.memory": "5000M",
            "spark.executor.cores": "4",
            "spark.executor.instances": "3",
            "spark.driver.cores": "2",
        }
    ),
    retries=1,
    timeout=timedelta(hours=1),
    cache=True,
    cache_version="1.8",
    requests=Resources(mem="1Gi", cpu="1"),
    container_image="{{.images.spark.fqn}}:{{.images.default.version}}",
)
def access_and_validate_data(
    cfg: DictConfig,
) -> Tuple[
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, id_mapping_columns],
]:
    """Load the online and offline data and return it as structured datasets.

    Runs as a Flyte Spark task. The configuration is logged as YAML, two
    dataframes are loaded through ``bq_to_spark``, and the resulting train/test
    splits plus an id mapping are wrapped in ``StructuredDataset`` objects.

    Args:
        cfg (DictConfig): data accessor hydra configuration.

    Returns:
        A 5-tuple of ``StructuredDataset`` objects, in this order:
        online train, online test, offline train, offline test (each
        annotated with ``urm_columns``) and the id mapping (annotated with
        ``id_mapping_columns``).
    """
    # The session is fetched from the Flyte context but not used directly in
    # the visible code; the noqa keeps linters quiet about the unused local.
    sess = flytekit.current_context().spark_session  # noqa F841
    logging.info(OmegaConf.to_yaml(cfg))

    online_df = bq_to_spark(...)
    offline_df = bq_to_spark(...)

    # Data validation with great expectations
    # Feature engineering
    # Train test split
    # NOTE(review): the three steps above are elided in this snippet. They are
    # expected to define the *_train / *_test dataframes, id_mapping_df and the
    # corresponding *_uri values used below — confirm against the full source.

    ds_online_train = StructuredDataset(dataframe=online_df_train, uri=df_online_train_uri)
    ds_online_test = StructuredDataset(dataframe=online_df_test, uri=df_online_test_uri)
    ds_offline_train = StructuredDataset(dataframe=offline_df_train, uri=df_offline_train_uri)
    ds_offline_test = StructuredDataset(dataframe=offline_df_test, uri=df_offline_test_uri)
    ds_id_mapping = StructuredDataset(dataframe=id_mapping_df, uri=id_mapping_df_uri)

    return (
        ds_online_train,
        ds_online_test,
        ds_offline_train,
        ds_offline_test,
        ds_id_mapping,
    )