Hi Team, I am getting the below error while execu...
# ask-the-community
a
Hi Team, I am getting the below error while executing my flyte pipleine Error encountered while executing 'wf1': Cannot perform truth value testing, This is a limitation in python. For Logical
and\or
use
&\|
(bitwise) instead. Expr Comp(Promise(node:n0.o0.['_typ']) == <FlyteLiteral scalar { primitive { string_value: "dataframe" } }>) here is my code from flytekit import task, workflow, dynamic import pandas as pd import os import pandas as pd from itertools import combinations from typing import List from flytekit import task, workflow, dynamic #Function to read CSV files and perform transformations @task def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame: df = pd.read_csv(file_path, usecols=selected_columns) return df # Function to replace empty cells in specified columns @task def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value='EMPTY') -> pd.DataFrame: for col in columns: df[col].fillna(replace_value, inplace=True) return df # Function to generate combinations of group by parameters @task def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]: group_by_combinations = [] for r in range(1, len(group_by_columns) + 1): group_by_combinations.extend(combinations(group_by_columns, r)) return group_by_combinations # Function to apply SQL-like transformations and aggregation @task def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame: # Add savings_present column df['S_PRESENT'] = df['S_ID'].notnull().astype(int) # Extract year and month from transaction_date df['TRANSACTION_YEAR_MONTH'] = df['T'].str.slice(0, 7) return df @workflow # Main function def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv', csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', csv1_columns: List[str]=['A', 'P', 'T', 'CE', 'AMT'], csv2_columns: List[str]=['A', 'S_ID'], group_by_columns: List[str]=['P', 'S_PRESENT', 'C', 'TRANSACTION_YEAR_MONTH']) -> dict: # Read and transform CSV files df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns) df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns) # 
Merge dataframes merged_df = pd.merge(df1, df2, on='A', how='left') # Apply SQL-like transformations transformed_df = apply_sql_transformations(df=merged_df) # Define group by columns #group_by_columns = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH'] # Replace empty cells with 'EMPTY' transformed_df = replace_empty_cells(df=transformed_df,columns=group_by_columns, replace_value='EMPTY') # Generate all possible combinations of group by parameters group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns ) # Dictionary to store result dataframes with their names result_dfs = {} # Perform group by and aggregation for idx, group_by_cols in enumerate(group_by_combinations): result_df = transformed_df.groupby(list(group_by_cols)).agg({ 'AMT': ['sum', 'mean'], 'A': 'nunique' }).reset_index() # Rename columns as needed result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values] # Name for the dataframe df_name = f"result{idx + 1}" # Store dataframe in the dictionary result_dfs[df_name] = result_df return result_dfs Can anyone pls support on this
k
Copy code
for idx, group_by_cols in enumerate(group_by_combinations):
Enumerating a promise is not allowed inside a workflow body.
group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns )
generate_group_by_combinations won’t return a list there — called inside a workflow, it always returns a promise at compile time.
@Akash Dubey ^^^
a
Hi Kevin, can you suggest any changes to the code itself so that I can overcome the current issues?
k
something like
Copy code
@dynamic
def d1(group_by_combinations: List[int]) ->
    for idx, group_by_cols in enumerate(group_by_combinations):
a
Copy code
from itertools import combinations
from typing import Dict, List

import pandas as pd

from flytekit import task, workflow, dynamic

# Task: load a CSV file, keeping only the requested columns.
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    """Read *file_path* with pandas, restricted to *selected_columns*."""
    return pd.read_csv(file_path, usecols=selected_columns)

# Task: replace missing values in the given columns with a sentinel string.
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value: str = 'EMPTY') -> pd.DataFrame:
    """Fill NaNs in *columns* of *df* with *replace_value* and return *df*.

    Fixes vs. the original:
    - ``replace_value`` now carries a type annotation; Flyte rejects task
      parameters without one.
    - ``df[col].fillna(..., inplace=True)`` operated on a column selection;
      under pandas copy-on-write that chained pattern does not reliably write
      back to the parent frame, so we assign the filled column explicitly.
    """
    for col in columns:
        df[col] = df[col].fillna(replace_value)
    return df

# Task: enumerate every non-empty combination of the group-by columns.
@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
    """Return all combinations of *group_by_columns*, sizes 1..len, as lists.

    Fix vs. the original: ``itertools.combinations`` yields *tuples*, but the
    declared return type is ``List[List[str]]``; Flyte serializes outputs
    against the declared type, so each combination is converted to a list.
    """
    return [
        list(combo)
        for r in range(1, len(group_by_columns) + 1)
        for combo in combinations(group_by_columns, r)
    ]

# Task: derive the helper columns the downstream group-bys rely on.
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    """Add SAVINGS_PRESENT and TRANSACTION_YEAR_MONTH columns to *df*."""
    # 1 when SAVINGS_ACCOUNT_ID is non-null, else 0.
    df['SAVINGS_PRESENT'] = df['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
    # First seven characters of the date string — presumably "YYYY-MM";
    # depends on the date format in the source CSV (confirm with the data).
    df['TRANSACTION_YEAR_MONTH'] = df['TRANSACTION_DATE'].str[:7]
    return df

# Task: join transactions to customers. pd.merge cannot run inside the
# @workflow body — at compile time df1/df2 are Promises, not DataFrames,
# which is what raised "Cannot perform truth value testing".
@task
def merge_dataframes(df1: pd.DataFrame, df2: pd.DataFrame, join_column: str) -> pd.DataFrame:
    """Left-join *df2* onto *df1* on *join_column*."""
    return pd.merge(df1, df2, on=join_column, how='left')


# Task: one group-by/aggregation for a single combination of columns.
@task
def group_and_aggregate(df: pd.DataFrame, group_by_cols: List[str]) -> pd.DataFrame:
    """Group *df* by *group_by_cols*; aggregate AMOUNT (sum, mean) and
    count distinct ACCOUNT_ID, with flattened column names."""
    result = df.groupby(group_by_cols).agg({
        'AMOUNT': ['sum', 'mean'],
        'ACCOUNT_ID': 'nunique',
    }).reset_index()
    # .agg() with multiple functions yields MultiIndex columns; flatten them.
    result.columns = ['_'.join(col).strip() for col in result.columns.values]
    return result


# Dynamic workflow: the number of combinations is only known at run time, so
# the loop must live in a @dynamic task — a plain @workflow cannot enumerate
# a Promise. Defined at module level: nesting it inside wf1 (as before) still
# compiles against Promises and reproduces the same error.
@dynamic
def process_group_by_combinations(
        df: pd.DataFrame,
        group_by_combinations: List[List[str]]) -> Dict[str, pd.DataFrame]:
    """Run one aggregation task per combination; keys are result1..resultN."""
    result_dfs = {}
    for idx, group_by_cols in enumerate(group_by_combinations):
        result_dfs[f"result{idx + 1}"] = group_and_aggregate(df=df, group_by_cols=list(group_by_cols))
    return result_dfs


@workflow
def wf1(csv1_path: str = '/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',
        csv2_path: str = '/home/ubuntu/flyte/my_project/workflows/ods_customers.csv',
        csv1_columns: List[str] = ['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'],
        csv2_columns: List[str] = ['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'],
        group_by_columns: List[str] = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> Dict[str, pd.DataFrame]:
    """Read two CSVs, join them, derive helper columns, and aggregate over
    every combination of *group_by_columns*.

    The workflow body only wires tasks together — all pandas work happens
    inside tasks, where Promises have been materialized into DataFrames.
    The return annotation is ``Dict[str, pd.DataFrame]`` rather than bare
    ``dict``: Flyte needs a concrete type to serialize the output (Python
    callers still receive an ordinary dict).
    """
    # Read and column-filter both CSV files.
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)

    # Join customers onto transactions (inside a task, not the workflow body).
    merged_df = merge_dataframes(df1=df1, df2=df2, join_column='ACCOUNT_ID')

    # Derive SAVINGS_PRESENT / TRANSACTION_YEAR_MONTH.
    transformed_df = apply_sql_transformations(df=merged_df)

    # Make group-by keys NaN-free so groupby does not drop rows.
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')

    # All non-empty combinations of the group-by columns (a Promise here).
    group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns)

    # The @dynamic task unrolls the per-combination loop at run time.
    return process_group_by_combinations(df=transformed_df, group_by_combinations=group_by_combinations)

# Run the workflow locally. Guarded so that importing this module (e.g. when
# Flyte registers the workflow) does not execute it as a side effect.
if __name__ == "__main__":
    result_dataframes = wf1()
    # Each aggregated frame is available by its generated name: result1, result2, ...
    print(result_dataframes)
would this help?
It's giving the same error again.