Hi team, I'm getting this error with the code below. Can anyone suggest something?
# ask-the-community
Hi team, I'm getting this error with the code below. Can anyone suggest something?
Copy code
from itertools import combinations
from typing import Dict
from typing import List
from typing import Union

import pandas as pd
from flytekit import task, workflow

# Function to read CSV files and perform transformations
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    """Load only ``selected_columns`` from the CSV at ``file_path``.

    No per-file transformation is applied yet; this function is the place
    to add one if the raw columns ever need cleaning before the merge.

    Args:
        file_path: Path (or anything ``pd.read_csv`` accepts) of the CSV.
        selected_columns: Column names to keep; passed as ``usecols``.

    Returns:
        A DataFrame containing just the requested columns.
    """
    return pd.read_csv(file_path, usecols=selected_columns)

def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value: Union[str, int, float] = 'EMPTY') -> pd.DataFrame:
    """Fill missing values in the given ``columns`` of ``df`` with ``replace_value``.

    ``df`` itself is updated, and the same object is returned so the call
    can be used in an assignment or chain.

    Args:
        df: Frame to update.
        columns: Names of the columns whose missing values are filled.
        replace_value: Value written into the missing cells (default 'EMPTY').

    Returns:
        ``df``, with the listed columns filled.
    """
    for col in columns:
        # Assign the filled Series back rather than calling
        # ``df[col].fillna(..., inplace=True)``. That chained in-place call
        # works on an intermediate object, so pandas warns about it, and
        # under Copy-on-Write it no longer updates ``df`` at all.
        df[col] = df[col].fillna(replace_value)
    return df

# Function to generate combinations of group by parameters
def generate_group_by_combinations(columns: List[str]) -> List[List[str]]:
    """Return every non-empty combination of ``columns``, shortest first.

    Combinations are ordered by size (1, 2, ..., len(columns)). Within each
    size they follow ``itertools.combinations`` order, which preserves the
    input order of ``columns``. For n columns there are 2**n - 1 results.

    The inputs and output are fully annotated because flytekit tasks need
    type hints on both ("You need to type-hint the output of task").

    Args:
        columns: Candidate grouping column names.

    Returns:
        A list of column-name lists. Empty if ``columns`` is empty.
    """
    return [
        list(combo)
        for size in range(1, len(columns) + 1)
        for combo in combinations(columns, size)
    ]

# Function to add the SQL-style derived columns (SAVINGS_PRESENT, TRANSACTION_YEAR_MONTH)
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    """Add the derived SAVINGS_PRESENT and TRANSACTION_YEAR_MONTH columns.

    * SAVINGS_PRESENT is 1 where SAVINGS_ACCOUNT_ID is non-null, else 0.
    * TRANSACTION_YEAR_MONTH is the first seven characters of the
      TRANSACTION_DATE string. This gives 'YYYY-MM' only if dates are
      stored as 'YYYY-MM-DD...' strings (assumed; confirm against the data).

    Both columns are written onto ``df`` itself, which is also returned.
    """
    has_savings_account = df['SAVINGS_ACCOUNT_ID'].notna()
    df['SAVINGS_PRESENT'] = has_savings_account.astype(int)

    year_month_prefix = df['TRANSACTION_DATE'].str[:7]
    df['TRANSACTION_YEAR_MONTH'] = year_month_prefix

    return df

# Task to merge two dataframes
def merge_dataframes(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """Left-join ``df2`` onto ``df1`` using the ACCOUNT_ID column.

    Every row of ``df1`` is kept. ``df2`` columns are NaN where no account
    matches.

    NOTE(review): if ``df2`` has several rows for one ACCOUNT_ID, the
    matching ``df1`` rows are repeated once per match. That would inflate
    later AMOUNT sums; confirm ACCOUNT_ID is unique in ``df2``.
    """
    return df1.merge(df2, how='left', on='ACCOUNT_ID')

# Task to perform group by and aggregation for a single combination
def group_by_and_aggregate_single(df: pd.DataFrame, group_by_cols: List[str]) -> pd.DataFrame:
    """Aggregate ``df`` over one combination of grouping columns.

    For each group, computes the sum and mean of AMOUNT and the number of
    distinct ACCOUNT_IDs. The result is indexed by ``group_by_cols``. Its
    two-level column labels are flattened with '_' to give
    ``AMOUNT_sum``, ``AMOUNT_mean`` and ``ACCOUNT_ID_nunique``.

    pandas' default ``groupby`` drops rows whose grouping value is NaN. The
    workflow fills missing grouping values with 'EMPTY' before calling this.

    Args:
        df: Frame containing AMOUNT, ACCOUNT_ID and every column in
            ``group_by_cols``.
        group_by_cols: Non-empty list of grouping column names.

    Returns:
        One row per group, with the three aggregate columns.
    """
    result_df = df.groupby(group_by_cols).agg({
        'AMOUNT': ['sum', 'mean'],
        'ACCOUNT_ID': 'nunique',
    })
    # A list of functions for AMOUNT makes pandas emit MultiIndex columns,
    # e.g. ('AMOUNT', 'sum'); join each tuple into a flat name.
    result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
    return result_df

# Task to iterate over all group by combinations and aggregate
def iterate_and_aggregate(group_by_combinationss: List[List[str]], transformed_df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    """Aggregate ``transformed_df`` once for each grouping-column combination.

    The return type is spelled out as ``Dict[str, pd.DataFrame]`` rather than
    a bare ``dict``, so type-driven tooling (e.g. a flytekit task signature)
    knows the values are DataFrames.

    Args:
        group_by_combinationss: Lists (or other iterables) of grouping column
            names, e.g. from ``generate_group_by_combinations``. The name is
            kept as-is, spelling included, because callers pass it by keyword.
        transformed_df: Frame to aggregate.

    Returns:
        ``{"result1": ..., "result2": ..., ...}``: one aggregated frame per
        combination, numbered from 1 in input order. Empty input gives ``{}``.
    """
    return {
        f"result{number}": group_by_and_aggregate_single(transformed_df, list(group_by_cols))
        for number, group_by_cols in enumerate(group_by_combinationss, start=1)
    }

# Main workflow
def wf1(csv1_path: str = '/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',
        csv2_path: str = '/home/ubuntu/flyte/my_project/workflows/ods_customers.csv',
        csv1_columns: List[str] = ['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'],
        csv2_columns: List[str] = ['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'],
        group_by_columns: List[str] = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> Dict[str, pd.DataFrame]:
    """Run the transactions/customers aggregation pipeline end to end.

    Steps:
      1. Read the selected columns from both CSV files.
      2. Left-join the customers data onto the transactions on ACCOUNT_ID.
      3. Add SAVINGS_PRESENT and TRANSACTION_YEAR_MONTH.
      4. Fill missing values in the grouping columns with 'EMPTY'.
      5. Aggregate (AMOUNT sum/mean, distinct ACCOUNT_ID count) for every
         non-empty combination of ``group_by_columns``.

    The return type is ``Dict[str, pd.DataFrame]`` rather than a bare
    ``dict``, matching what the aggregation step produces.

    NOTE(review): the list defaults are single objects shared by every call.
    No step below mutates them, and that must stay true.

    Args:
        csv1_path: Transactions CSV.
        csv2_path: Customers CSV.
        csv1_columns: Columns to read from ``csv1_path``.
        csv2_columns: Columns to read from ``csv2_path``.
        group_by_columns: Columns whose combinations drive the grouping.

    Returns:
        ``{"result1": ..., "resultN": ...}`` with N = 2**len(group_by_columns) - 1
        (15 for the defaults).
    """
    # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)

    # Merge dataframes
    merged_df = merge_dataframes(df1=df1, df2=df2)

    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)

    # Replace empty cells with 'EMPTY' so NaN-keyed rows are not dropped by groupby
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')

    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(columns=group_by_columns)

    # Perform group by and aggregation for all combinations
    result_dfs = iterate_and_aggregate(group_by_combinationss=group_by_combinations, transformed_df=transformed_df)

    return result_dfs

if __name__ == "__main__":
    # Script entry point: run the whole pipeline with wf1's default paths and
    # column lists. The resulting dict of DataFrames is kept in
    # result_dataframes but is not printed or saved here.
    result_dataframes = wf1()
You need to type-hint the output of task