from flytekit import task, workflow, dynamic import pandas as pd import os import pandas as pd from itertools import combinations from typing import List from flytekit import task, workflow, dynamic #Function to read CSV files and perform transformations @task def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame: df = pd.read_csv(file_path, usecols=selected_columns) return df # Function to replace empty cells in specified columns @task def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value='EMPTY') -> pd.DataFrame: for col in columns: df[col].fillna(replace_value, inplace=True) return df # Function to generate combinations of group by parameters @task def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]: group_by_combinations = [] for r in range(1, len(group_by_columns) + 1): group_by_combinations.extend(combinations(group_by_columns, r)) return group_by_combinations # Function to apply SQL-like transformations and aggregation @task def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame: # Add savings_present column df['S_PRESENT'] = df['S_ID'].notnull().astype(int) # Extract year and month from transaction_date df['TRANSACTION_YEAR_MONTH'] = df['T'].str.slice(0, 7) return df @workflow # Main function def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv', csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', csv1_columns: List[str]=['A', 'P', 'T', 'CE', 'AMT'], csv2_columns: List[str]=['A', 'S_ID'], group_by_columns: List[str]=['P', 'S_PRESENT', 'C', 'TRANSACTION_YEAR_MONTH']) -> dict: # Read and transform CSV files df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns) df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns) # Merge dataframes merged_df = pd.merge(df1, df2, on='A', how='left') # Apply SQL-like transformations transformed_df = apply_sql_transformations(df=merged_df) # Define group by columns #group_by_columns = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH'] # Replace empty cells with 'EMPTY' transformed_df = replace_empty_cells(df=transformed_df,columns=group_by_columns, replace_value='EMPTY') # Generate all possible combinations of group by parameters group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns ) # Dictionary to store result dataframes with their names result_dfs = {} # Perform group by and aggregation for idx, group_by_cols in enumerate(group_by_combinations): result_df = transformed_df.groupby(list(group_by_cols)).agg({ 'AMT': ['sum', 'mean'], 'A': 'nunique' }).reset_index() # Rename columns as needed result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values] # Name for the dataframe df_name = f"result{idx + 1}" # Store dataframe in the dictionary result_dfs[df_name] = result_df return result_dfs