better-market-70367
05/02/2024, 2:50 PM
from flytekit import task, workflow, dynamic
import pandas as pd
from itertools import combinations
from typing import List
# Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    df = pd.read_csv(file_path, usecols=selected_columns)
    return df
# Function to replace empty cells in specified columns
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value='EMPTY') -> pd.DataFrame:
    for col in columns:
        df[col].fillna(replace_value, inplace=True)
    return df
# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
    group_by_combinations = []
    for r in range(1, len(group_by_columns) + 1):
        group_by_combinations.extend(combinations(group_by_columns, r))
    return group_by_combinations
# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    # Add savings_present column
    df['S_PRESENT'] = df['S_ID'].notnull().astype(int)
    # Extract year and month from transaction_date
    df['TRANSACTION_YEAR_MONTH'] = df['T'].str.slice(0, 7)
    return df
# Main workflow
@workflow
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',    
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
        csv1_columns: List[str]=['A', 'P', 'T', 'CE', 'AMT'], 
        csv2_columns: List[str]=['A', 'S_ID'], 
        group_by_columns: List[str]=['P', 'S_PRESENT', 'C', 'TRANSACTION_YEAR_MONTH']) -> dict:
    # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)
    # Merge dataframes
    merged_df = pd.merge(df1, df2, on='A', how='left')
    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)
    # Define group by columns
    #group_by_columns = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']
    # Replace empty cells with 'EMPTY'
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')
    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns)
    # Dictionary to store result dataframes with their names
    result_dfs = {}
    # Perform group by and aggregation
    for idx, group_by_cols in enumerate(group_by_combinations):
        result_df = transformed_df.groupby(list(group_by_cols)).agg({
            'AMT': ['sum', 'mean'],
            'A': 'nunique'
        }).reset_index()
        # Rename columns as needed
        result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
        # Name for the dataframe
        df_name = f"result{idx + 1}"
        # Store dataframe in the dictionary
        result_dfs[df_name] = result_df
    return result_dfs
proud-answer-87162
05/02/2024, 3:24 PM
# Perform group by and aggregation
wonderful-pizza-65944
05/02/2024, 4:39 PM
freezing-airport-6809
# Merge dataframes
    merged_df = pd.merge(df1, df2, on='A', how='left')

# Dictionary to store result dataframes with their names
    result_dfs = {}
    # Perform group by and aggregation
    for idx, group_by_cols in enumerate(group_by_combinations):
        result_df = transformed_df.groupby(list(group_by_cols)).agg({
            'AMT': ['sum', 'mean'],
            'A': 'nunique'
        }).reset_index()
        # Rename columns as needed
        result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
        # Name for the dataframe
        df_name = f"result{idx + 1}"
        # Store dataframe in the dictionary
        result_dfs[df_name] = result_df
better-market-70367
05/02/2024, 7:03 PM
from flytekit import task, workflow
import pandas as pd
from itertools import combinations
from typing import List, Union
# Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    df = pd.read_csv(file_path, usecols=selected_columns)
    # Perform transformations here if needed
    return df
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value: Union[str, int, float] = 'EMPTY') -> pd.DataFrame:
    for col in columns:
        df[col].fillna(replace_value, inplace=True)
    return df
# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(columns):
    group_by_combination = []
    for r in range(1, len(columns) + 1):
        group_by_combination.extend([list(comb) for comb in combinations(columns, r)])
    return group_by_combination
# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    # Add savings_present column
    df['SAVINGS_PRESENT'] = df['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
    # Extract year and month from transaction_date
    df['TRANSACTION_YEAR_MONTH'] = df['TRANSACTION_DATE'].str.slice(0, 7)
    return df
# Task to merge two dataframes
@task
def merge_dataframes(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return pd.merge(df1, df2, on='ACCOUNT_ID', how='left')
# Task to perform group by and aggregation for a single combination
@task
def group_by_and_aggregate_single(df: pd.DataFrame, group_by_cols: List[str]) -> pd.DataFrame:
    result_df = df.groupby(group_by_cols).agg({
        'AMOUNT': ['sum', 'mean'],
        'ACCOUNT_ID': 'nunique'
    }).reset_index()
    result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
    return result_df
# Task to iterate over all group by combinations and aggregate
@task
def iterate_and_aggregate(group_by_combinationss: List[List[str]], transformed_df: pd.DataFrame) -> dict:
    result_dfs = {}
    for idx, group_by_cols in enumerate(group_by_combinationss):
        grouped_df = group_by_and_aggregate_single(transformed_df, list(group_by_cols))
        result_dfs[f"result{idx + 1}"] = grouped_df
    return result_dfs
# Main workflow
@workflow
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',    
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
        csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'], 
        csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'], 
        group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
    # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)
    # Merge dataframes
    merged_df = merge_dataframes(df1=df1, df2=df2)
    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)
    # Replace empty cells with 'EMPTY'
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')
    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(columns=group_by_columns)
    # Perform group by and aggregation for all combinations
    result_dfs = iterate_and_aggregate(group_by_combinationss=group_by_combinations, transformed_df=transformed_df)
    return result_dfs
if __name__ == "__main__":
    result_dataframes = wf1()
    print(result_dataframes)
better-market-70367
05/02/2024, 7:03 PM
error:
AssertionError: Error encountered while executing 'wf1':
  Failed to Bind variable group_by_combinationss for function test7.iterate_and_aggregate.
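One likely culprit for that bind error: in this revision generate_group_by_combinations has no type annotations, so Flyte has no typed output to bind into iterate_and_aggregate's List[List[str]] input. A minimal sketch with the annotations restored from the first version (a guess at the cause, not a confirmed fix):

from itertools import combinations
from typing import List
from flytekit import task

@task
def generate_group_by_combinations(columns: List[str]) -> List[List[str]]:
    # With the input and output annotated, Flyte can type this task's output
    # and bind it to iterate_and_aggregate's List[List[str]] input.
    group_by_combination = []
    for r in range(1, len(columns) + 1):
        group_by_combination.extend([list(comb) for comb in combinations(columns, r)])
    return group_by_combination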
freezing-airport-6809
# Merge dataframes
    merged_df = merge_dataframes(df1=df1, df2=df2)
    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)
    # Replace empty cells with 'EMPTY'
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')
    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(columns=group_by_columns)
    # Perform group by and aggregation for all combinations
    result_dfs = iterate_and_aggregate(group_by_combinationss=group_by_combinations, transformed_df=transformed_df)
    return result_dfs
freezing-airport-6809
@workflow
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',    
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
        csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'], 
        csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'], 
        group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
    # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)
    return merge_and_aggregate(df1=df1, df2=df2, group_by_columns=group_by_columns)
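For reference, a minimal sketch of what that merge_and_aggregate task could look like, folding the merge, the SQL-like transformations, the empty-cell replacement, and the group-by loop from the earlier tasks into a single task so the workflow only passes promises between tasks (the task name comes from the snippet above; the body is illustrative, not code from the thread):

from itertools import combinations
from typing import Dict, List
import pandas as pd
from flytekit import task

@task
def merge_and_aggregate(df1: pd.DataFrame, df2: pd.DataFrame, group_by_columns: List[str]) -> Dict[str, pd.DataFrame]:
    # Merge the two inputs on the shared account key
    merged_df = pd.merge(df1, df2, on='ACCOUNT_ID', how='left')
    # SQL-like transformations (same logic as apply_sql_transformations)
    merged_df['SAVINGS_PRESENT'] = merged_df['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
    merged_df['TRANSACTION_YEAR_MONTH'] = merged_df['TRANSACTION_DATE'].str.slice(0, 7)
    # Replace empty cells in the group-by columns
    for col in group_by_columns:
        merged_df[col] = merged_df[col].fillna('EMPTY')
    # Group by every combination of the group-by columns and aggregate
    result_dfs = {}
    idx = 0
    for r in range(1, len(group_by_columns) + 1):
        for group_by_cols in combinations(group_by_columns, r):
            result_df = merged_df.groupby(list(group_by_cols)).agg({
                'AMOUNT': ['sum', 'mean'],
                'ACCOUNT_ID': 'nunique'
            }).reset_index()
            result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
            idx += 1
            result_dfs[f"result{idx}"] = result_df
    return result_dfs

If the workflow returns this dictionary directly, its own return annotation would likely need to be typing.Dict[str, pd.DataFrame] as well, rather than a bare dict.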
