from flytekit import task, workflow, dynamic
import pandas as pd
import os
# Import necessary libraries
import pandas as pd
from itertools import combinations
from typing import List
# from Tasks import read_and_transform_csv, apply_sql_transformations,replace_empty_cells,generate_group_by_combinations
from flytekit import task, workflow, dynamic
# Import necessary libraries
import pandas as pd
from itertools import combinations
from typing import List
#Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    """Read the CSV at ``file_path``, keeping only ``selected_columns``.

    This is the single ingestion point for both input files of the
    pipeline; no further transformation is applied here.
    """
    frame = pd.read_csv(file_path, usecols=selected_columns)
    return frame
# Function to replace empty cells in specified columns
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value: str = 'EMPTY') -> pd.DataFrame:
    """Return a copy of ``df`` with missing values in ``columns`` filled.

    Args:
        df: Input frame. Left untouched — the previous implementation
            mutated it in place, surprising callers that reuse it.
        columns: Column names whose NaN cells are replaced.
        replace_value: Fill value. Annotated ``str`` because flytekit
            requires a type annotation on every task parameter.

    Returns:
        A new DataFrame with NaN in the requested columns replaced.
    """
    filled = df.copy()
    # Single vectorised fillna instead of a per-column loop using
    # ``df[col].fillna(..., inplace=True)``, which operates on a column
    # view (chained-assignment pattern deprecated in pandas 2.x) and
    # silently mutated the caller's frame.
    filled[columns] = filled[columns].fillna(replace_value)
    return filled
# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
    """Return every non-empty combination of ``group_by_columns``.

    ``itertools.combinations`` yields tuples; each one is converted to a
    list so the return value actually matches the declared
    ``List[List[str]]`` — flytekit validates task outputs against the
    annotation, and the original returned ``List[Tuple[str, ...]]``.

    Combinations are ordered by size (all singles, then pairs, ...),
    preserving the input column order within each combination.
    """
    return [
        list(combo)
        for size in range(1, len(group_by_columns) + 1)
        for combo in combinations(group_by_columns, size)
    ]
# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    """Add the derived columns used downstream as group-by keys.

    * ``SAVINGS_PRESENT`` — 1 when ``SAVINGS_ACCOUNT_ID`` is non-null,
      else 0 (flags customers that have a savings account).
    * ``TRANSACTION_YEAR_MONTH`` — first 7 characters of
      ``TRANSACTION_DATE``, i.e. "YYYY-MM" assuming ISO-formatted dates.

    Works on a copy so the caller's frame is not mutated, and coerces
    ``TRANSACTION_DATE`` to ``str`` before slicing so the task also
    handles frames where that column was parsed as datetime (the original
    ``.str`` accessor would raise on non-string dtypes).
    """
    out = df.copy()
    out['SAVINGS_PRESENT'] = out['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
    out['TRANSACTION_YEAR_MONTH'] = out['TRANSACTION_DATE'].astype(str).str.slice(0, 7)
    return out
# Main workflow
# @workflow
# def main_workflow(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv', 
#                   csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
#                   csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'], 
#                   csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'], 
#                   group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
#     # Read and transform CSV files
#     df1 = read_and_transform_csv(file_path='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv', selected_columns=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'])
#     df2 = read_and_transform_csv(file_path='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', selected_columns=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'])
#     group_by_cols=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']
#     # Merge dataframes
#     merged_df = pd.merge(df1, df2, on='ACCOUNT_ID', how='left')
#     # Apply SQL-like transformations
#     transformed_df = apply_sql_transformations(df=merged_df)
#     # Replace empty cells with 'EMPTY'
#     transformed_df = replace_empty_cells(df=transformed_df, columns=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH'], replace_value='EMPTY')
#     # Generate all possible combinations of group by parameters
#     group_by_combinations = generate_group_by_combinations(group_by_columns=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH'])
#     # Dictionary to store result dataframes with their names
#     result_dfs = {}
#     # Perform group by and aggregation
#     for idx, group_by_cols in enumerate(group_by_combinations):
#         result_df = transformed_df.groupby(list(group_by_cols)).agg({
#             'AMOUNT': ['sum', 'mean'],
#             'ACCOUNT_ID': 'nunique'
#         }).reset_index()
#         # Calculate avg_amount_per_customer
#         result_df['avg_amount_per_customer'] = result_df[('AMOUNT', 'sum')] / result_df[('ACCOUNT_ID', 'nunique')]
#         # Rename columns as needed
#         result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
#         # Name for the dataframe
#         df_name = f"result{idx + 1}"
#         # Store dataframe in the dictionary
#         result_dfs[df_name] = result_df
#     # return result_dfs
#     print(result_dfs)
# # Provide the values directly to the main_workflow function
# # result_dataframes = main_workflow()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# # Provide the values directly to the main_workflow function
# result_dataframes = main_workflow()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# print(result_dataframes)
@dynamic
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv',
        csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'],
        csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'],
        group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
    """Main pipeline: join transactions to customers, derive columns, and
    aggregate AMOUNT over every combination of the group-by columns.

    Declared ``@dynamic`` rather than ``@workflow``: the body applies
    plain pandas operations (``pd.merge``, ``groupby``) to task outputs
    and iterates over the result of ``generate_group_by_combinations``.
    In a static ``@workflow`` those values are unevaluated promises, so
    this code would fail; ``@dynamic`` materializes task outputs at run
    time (the ``dynamic`` import at the top of the file was previously
    unused, which suggests this was the intent).

    NOTE(review): the mutable list defaults are kept byte-for-byte for
    interface compatibility with registered executions; they are never
    mutated here.

    Returns:
        dict mapping "result1", "result2", ... to one aggregated
        DataFrame per group-by combination.
    """
    # Ingest only the columns each file contributes to the join.
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)
    # Left join keeps every transaction row even without a customer match.
    merged_df = pd.merge(df1, df2, on='ACCOUNT_ID', how='left')
    # Derive SAVINGS_PRESENT and TRANSACTION_YEAR_MONTH.
    transformed_df = apply_sql_transformations(df=merged_df)
    # Group-by keys must not contain NaN, or groupby would drop those rows.
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')
    # All non-empty combinations of the group-by parameters.
    group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns)
    result_dfs = {}
    for idx, group_by_cols in enumerate(group_by_combinations):
        result_df = transformed_df.groupby(list(group_by_cols)).agg({
            'AMOUNT': ['sum', 'mean'],
            'ACCOUNT_ID': 'nunique'
        }).reset_index()
        # Flatten the (column, aggfunc) MultiIndex into single-level names
        # like AMOUNT_sum, AMOUNT_mean, ACCOUNT_ID_nunique.
        result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
        result_dfs[f"result{idx + 1}"] = result_df
    return result_dfs
# if __name__ == "__main__":
#      result_dataframes = main()
#      # Now you can access each dataframe by its name in the result_dataframes dictionary
# Provide the values directly to the main_workflow function
# result_dataframes = wf1()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# print(result_dataframes)