Hi Team for my below code am getting this error, c...
# ask-the-community
a
Hi Team for my below code am getting this error, can anyone suggest something
Copy code
from flytekit import task, workflow
import pandas as pd
from itertools import combinations
from typing import List
from typing import Union

# Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    df = pd.read_csv(file_path, usecols=selected_columns)
    # Perform transformations here if needed
    return df



@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value: Union[str, int, float] = 'EMPTY') -> pd.DataFrame:
    for col in columns:
        df[col].fillna(replace_value, inplace=True)
    return df


# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(columns):
    group_by_combination = []
    for r in range(1, len(columns) + 1):
        group_by_combination.extend([list(comb) for comb in combinations(columns, r)])
    return group_by_combination

# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    # Add savings_present column
    df['SAVINGS_PRESENT'] = df['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
    # Extract year and month from transaction_date
    df['TRANSACTION_YEAR_MONTH'] = df['TRANSACTION_DATE'].str.slice(0, 7)
    return df

# Task to merge two dataframes
@task
def merge_dataframes(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return pd.merge(df1, df2, on='ACCOUNT_ID', how='left')

# Task to perform group by and aggregation for a single combination
@task
def group_by_and_aggregate_single(df: pd.DataFrame, group_by_cols: List[str]) -> pd.DataFrame:
    result_df = df.groupby(group_by_cols).agg({
        'AMOUNT': ['sum', 'mean'],
        'ACCOUNT_ID': 'nunique'
    }).reset_index()
    result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
    return result_df

# Task to iterate over all group by combinations and aggregate
@task
def iterate_and_aggregate(group_by_combinationss: List[List[str]], transformed_df: pd.DataFrame) -> dict:
    result_dfs = {}
    for idx, group_by_cols in enumerate(group_by_combinationss):
        grouped_df = group_by_and_aggregate_single(transformed_df, list(group_by_cols))
        result_dfs[f"result{idx + 1}"] = grouped_df
    return result_dfs

# Main workflow
@workflow
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',    
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
        csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'], 
        csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'], 
        group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:

    # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)

    # Merge dataframes
    merged_df = merge_dataframes(df1=df1, df2=df2)

    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)

    # Replace empty cells with 'EMPTY'
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')

    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(columns=group_by_columns)

    # Perform group by and aggregation for all combinations
    result_dfs = iterate_and_aggregate(group_by_combinationss=group_by_combinations, transformed_df=transformed_df)

    return result_dfs

if __name__ == "__main__":
    result_dataframes = wf1()
    print(result_dataframes)
g
You need to type-hint the output of task
generate_group_by_combinations