from flytekit import task, workflow, dynamic impor...
# ask-the-community
import os
from itertools import combinations
from typing import Dict, List

import pandas as pd
from flytekit import dynamic, task, workflow


@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    """Read the CSV at ``file_path``, keeping only ``selected_columns``.

    Args:
        file_path: Path of the CSV file to load.
        selected_columns: Column names passed to ``pandas.read_csv`` as
            ``usecols``.

    Returns:
        A DataFrame containing only the requested columns.
    """
    return pd.read_csv(file_path, usecols=selected_columns)


@task
def merge_dataframes(
    left: pd.DataFrame, right: pd.DataFrame, on: str, how: str = "left"
) -> pd.DataFrame:
    """Join two DataFrames on a shared key column.

    This lives in a task because a workflow body only sees promises of task
    outputs, so ``pd.merge`` cannot be called there directly.

    Args:
        left: Left-hand DataFrame.
        right: Right-hand DataFrame.
        on: Name of the key column present in both frames.
        how: Join type forwarded to ``pandas.merge``.

    Returns:
        The merged DataFrame.
    """
    return pd.merge(left, right, on=on, how=how)


@task
def replace_empty_cells(
    df: pd.DataFrame, columns: List[str], replace_value: str = "EMPTY"
) -> pd.DataFrame:
    """Fill missing values in ``columns`` with ``replace_value``.

    Args:
        df: Input DataFrame; it is not modified.
        columns: Columns whose missing cells are filled.
        replace_value: Value written into missing cells.

    Returns:
        A copy of ``df`` with the listed columns filled.

    Raises:
        KeyError: If a name in ``columns`` is not a column of ``df``.
    """
    filled = df.copy()
    for col in columns:
        # Assign the result back instead of ``fillna(inplace=True)`` on a
        # column selection, which is chained assignment and has no effect
        # under pandas copy-on-write.
        filled[col] = filled[col].fillna(replace_value)
    return filled


@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
    """Return every non-empty combination of ``group_by_columns``.

    Combinations are ordered by size (1 up to ``len(group_by_columns)``) and,
    within a size, in the order produced by ``itertools.combinations``.

    Args:
        group_by_columns: Candidate group-by column names.

    Returns:
        A list of column-name lists; empty when ``group_by_columns`` is empty.
    """
    result: List[List[str]] = []
    for size in range(1, len(group_by_columns) + 1):
        # ``combinations`` yields tuples; convert them so the value matches
        # the declared ``List[List[str]]`` return type.
        result.extend(list(combo) for combo in combinations(group_by_columns, size))
    return result


@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    """Add the derived ``S_PRESENT`` and ``TRANSACTION_YEAR_MONTH`` columns.

    Args:
        df: DataFrame with an ``S_ID`` column and a string column ``T``.

    Returns:
        A copy of ``df`` with the two derived columns added.
    """
    out = df.copy()
    # 1 when S_ID is present, 0 when it is null.
    out['S_PRESENT'] = out['S_ID'].notnull().astype(int)
    # First seven characters of T.
    # NOTE(review): presumably T is an ISO "YYYY-MM-DD..." date string, which
    # makes this "YYYY-MM" - confirm against the data.
    out['TRANSACTION_YEAR_MONTH'] = out['T'].str.slice(0, 7)
    return out


@task
def aggregate_by_combinations(
    df: pd.DataFrame, group_by_combinations: List[List[str]]
) -> Dict[str, pd.DataFrame]:
    """Aggregate ``df`` once per group-by combination.

    For each combination the frame is grouped and aggregated to the sum and
    mean of ``AMT`` and the number of distinct ``A`` values. The loop lives in
    a task because a workflow body cannot iterate over a task output.

    Args:
        df: DataFrame containing ``AMT``, ``A`` and every group-by column.
        group_by_combinations: Lists of column names to group by.

    Returns:
        A dict mapping ``"result1"``, ``"result2"``, ... (in input order) to
        the aggregated DataFrame for that combination.
    """
    results: Dict[str, pd.DataFrame] = {}
    for idx, group_by_cols in enumerate(group_by_combinations, start=1):
        grouped = (
            df.groupby(list(group_by_cols))
            .agg({'AMT': ['sum', 'mean'], 'A': 'nunique'})
            .reset_index()
        )
        # Flatten the two-level column index. Group-by keys have an empty
        # second level, so skip empty parts to get "P" rather than "P_".
        grouped.columns = [
            '_'.join(part for part in col if part) for col in grouped.columns.values
        ]
        results[f"result{idx}"] = grouped
    return results


@workflow
def wf1(
    csv1_path: str = '/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',
    csv2_path: str = '/home/ubuntu/flyte/my_project/workflows/ods_customers.csv',
    csv1_columns: List[str] = ['A', 'P', 'T', 'CE', 'AMT'],
    csv2_columns: List[str] = ['A', 'S_ID'],
    group_by_columns: List[str] = ['P', 'S_PRESENT', 'C', 'TRANSACTION_YEAR_MONTH'],
) -> Dict[str, pd.DataFrame]:
    """Load two CSVs, join them on ``A``, and aggregate per group-by combination.

    Every step is a task call: values in a workflow body are promises, so
    plain pandas calls and Python loops over task outputs are not possible
    here.

    Args:
        csv1_path: Path of the first CSV.
        csv2_path: Path of the second CSV.
        csv1_columns: Columns to read from the first CSV.
        csv2_columns: Columns to read from the second CSV.
        group_by_columns: Columns whose non-empty combinations are used as
            group-by keys.

    Returns:
        A dict mapping ``"result<N>"`` to the aggregated DataFrame for the
        N-th combination.
    """
    # NOTE(review): the default group_by_columns contains 'C', but the default
    # csv1_columns selects 'CE' and no step creates a 'C' column - confirm the
    # intended column name, otherwise the fill step raises KeyError.
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)

    merged_df = merge_dataframes(left=df1, right=df2, on='A', how='left')
    transformed_df = apply_sql_transformations(df=merged_df)

    # Fill missing group-by keys so those rows are kept by the aggregation.
    filled_df = replace_empty_cells(
        df=transformed_df, columns=group_by_columns, replace_value='EMPTY'
    )
    group_by_combinations = generate_group_by_combinations(
        group_by_columns=group_by_columns
    )
    return aggregate_by_combinations(
        df=filled_df, group_by_combinations=group_by_combinations
    )