Hi Team , the above code is giving the below error...
# flyte-support
b
Hi Team , the above code is giving the below error Error encountered while executing 'wf1': Cannot perform truth value testing, This is a limitation in python. For Logical
and\or
use
&\|
(bitwise) instead. Expr Comp(Promise(node:n0.o0.['_typ']) == <FlyteLiteral scalar { primitive { string_value: "dataframe" } }>)
a
as the error suggests it seems like your code is trying to use
and
or
directly in
Promise
objects, which is not supported by Flyte. This is also a Python limitation as indicated (see https://peps.python.org/pep-0335/) Instead, try using
&
|
b
David am not using and or in my code
a
got it. could you share here the latest code snippet you're trying to run? Also, according to our Slack guidelines, please keep a single topic on a single thread. I'm getting a bit lost on the multiple messages
b
Copy code
from flytekit import task, workflow, dynamic
import pandas as pd
import os
# Import necessary libraries
import pandas as pd
from itertools import combinations
from typing import List
# from Tasks import read_and_transform_csv, apply_sql_transformations,replace_empty_cells,generate_group_by_combinations
from flytekit import task, workflow, dynamic

# Import necessary libraries
import pandas as pd
from itertools import combinations
from typing import List

#Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    df = pd.read_csv(file_path, usecols=selected_columns)
    # Perform transformations here if needed
    return df

# Function to replace empty cells in specified columns
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value='EMPTY') -> pd.DataFrame:
    for col in columns:
        df[col].fillna(replace_value, inplace=True)
    return df

# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
    group_by_combinations = []
    for r in range(1, len(group_by_columns) + 1):
        group_by_combinations.extend(combinations(group_by_columns, r))
    return group_by_combinations

# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    # Add savings_present column
    df['SAVINGS_PRESENT'] = df['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
    # Extract year and month from transaction_date
    df['TRANSACTION_YEAR_MONTH'] = df['TRANSACTION_DATE'].str.slice(0, 7)
    return df

# Main workflow
# @workflow
# def main_workflow(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv', 
#                   csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
#                   csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'], 
#                   csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'], 
#                   group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
#     # Read and transform CSV files
#     df1 = read_and_transform_csv(file_path='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv', selected_columns=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'])
#     df2 = read_and_transform_csv(file_path='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', selected_columns=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'])

#     group_by_cols=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']
#     # Merge dataframes
#     merged_df = pd.merge(df1, df2, on='ACCOUNT_ID', how='left')


#     # Apply SQL-like transformations
#     transformed_df = apply_sql_transformations(df=merged_df)

#     # Replace empty cells with 'EMPTY'
#     transformed_df = replace_empty_cells(df=transformed_df, columns=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH'], replace_value='EMPTY')

#     # Generate all possible combinations of group by parameters
#     group_by_combinations = generate_group_by_combinations(group_by_columns=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH'])

#     # Dictionary to store result dataframes with their names
#     result_dfs = {}

#     # Perform group by and aggregation
#     for idx, group_by_cols in enumerate(group_by_combinations):
#         result_df = transformed_df.groupby(list(group_by_cols)).agg({
#             'AMOUNT': ['sum', 'mean'],
#             'ACCOUNT_ID': 'nunique'
#         }).reset_index()

#         # Calculate avg_amount_per_customer
#         result_df['avg_amount_per_customer'] = result_df[('AMOUNT', 'sum')] / result_df[('ACCOUNT_ID', 'nunique')]

#         # Rename columns as needed
#         result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]

#         # Name for the dataframe
#         df_name = f"result{idx + 1}"

#         # Store dataframe in the dictionary
#         result_dfs[df_name] = result_df

#     # return result_dfs
#     print(result_dfs)

# # Provide the values directly to the main_workflow function
# # result_dataframes = main_workflow()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# # Provide the values directly to the main_workflow function
# result_dataframes = main_workflow()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# print(result_dataframes)


@workflow
# Main function
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',    
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
        csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'], 
        csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'], 
        group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
    

    # Input CSV file paths and columns
    # csv1_path = '/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv'
    # csv2_path = '/home/ubuntu/flyte/my_project/workflows/ods_customers.csv'
    # csv1_columns = ['ACCOUNT_ID','PRODUCT_TYPE','TRANSACTION_DATE','CATEGORY_NAME','AMOUNT']
    # csv2_columns = ['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID']

     # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)

    # Merge dataframes
    merged_df = pd.merge(df1, df2, on='ACCOUNT_ID', how='left')

    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)

    # Define group by columns
    #group_by_columns = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']

    # Replace empty cells with 'EMPTY'
    transformed_df = replace_empty_cells(df=transformed_df,columns=group_by_columns, replace_value='EMPTY')

    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns )

    # Dictionary to store result dataframes with their names
    result_dfs = {}

    # Perform group by and aggregation
    for idx, group_by_cols in enumerate(group_by_combinations):
        result_df = transformed_df.groupby(list(group_by_cols)).agg({
            'AMOUNT': ['sum', 'mean'],
            'ACCOUNT_ID': 'nunique'
        }).reset_index()

        # Calculate avg_amount_per_customer
        #result_df['avg_amount_per_customer'] = result_df[('AMOUNT', 'sum')] / result_df[('ACCOUNT_ID', 'nunique')]

        # Rename columns as needed
        result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]

        # Name for the dataframe
        df_name = f"result{idx + 1}"

        # Store dataframe in the dictionary
        result_dfs[df_name] = result_df

    return result_dfs

# if __name__ == "__main__":
#      result_dataframes = main()
#      # Now you can access each dataframe by its name in the result_dataframes dictionary


# Provide the values directly to the main_workflow function
# result_dataframes = wf1()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# print(result_dataframes)
Sure David!