Hi Team, Please help me out with below code and i...
# flyte-support
b
Hi Team, Please help me out with below code and its error
Copy code
from flytekit import task, workflow, dynamic
import pandas as pd
import os

import pandas as pd
from itertools import combinations
from typing import List
from flytekit import task, workflow, dynamic


#Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    df = pd.read_csv(file_path, usecols=selected_columns)
    return df

# Function to replace empty cells in specified columns
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value='EMPTY') -> pd.DataFrame:
    for col in columns:
        df[col].fillna(replace_value, inplace=True)
    return df

# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
    group_by_combinations = []
    for r in range(1, len(group_by_columns) + 1):
        group_by_combinations.extend(combinations(group_by_columns, r))
    return group_by_combinations

# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    # Add savings_present column
    df['S_PRESENT'] = df['S_ID'].notnull().astype(int)
    # Extract year and month from transaction_date
    df['TRANSACTION_YEAR_MONTH'] = df['T'].str.slice(0, 7)
    return df


@workflow
# Main function
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',    
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
        csv1_columns: List[str]=['A', 'P', 'T', 'CE', 'AMT'], 
        csv2_columns: List[str]=['A', 'S_ID'], 
        group_by_columns: List[str]=['P', 'S_PRESENT', 'C', 'TRANSACTION_YEAR_MONTH']) -> dict:

     # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)

    # Merge dataframes
    merged_df = pd.merge(df1, df2, on='A', how='left')

    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)

    # Define group by columns
    #group_by_columns = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']

    # Replace empty cells with 'EMPTY'
    transformed_df = replace_empty_cells(df=transformed_df,columns=group_by_columns, replace_value='EMPTY')

    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns )

    # Dictionary to store result dataframes with their names
    result_dfs = {}

    # Perform group by and aggregation
    for idx, group_by_cols in enumerate(group_by_combinations):
        result_df = transformed_df.groupby(list(group_by_cols)).agg({
            'AMT': ['sum', 'mean'],
            'A': 'nunique'
        }).reset_index()


        # Rename columns as needed
        result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]

        # Name for the dataframe
        df_name = f"result{idx + 1}"

        # Store dataframe in the dictionary
        result_dfs[df_name] = result_df

    return result_dfs
p
i'm not certain on the specific error, but you generally can't do operations on the task results (promises) within the workflow. i'd try breaking out the logic in
# Perform group by and aggregation
to a new task
👍 1
gratitude thank you 1
w
@average-finland-92144 @better-market-70367 @freezing-airport-6809 @glamorous-carpet-83516....here are the issues we are currently having....thanks
f
@wonderful-pizza-65944 / @better-market-70367 this is a challenge with Flyte's python DSL. It is a domain specific language, it looks like python, but not really full python. If you want to do any computation on inputs/outputs or work on data you have to do that in a task. For example in this,
Copy code
# Merge dataframes
    merged_df = pd.merge(df1, df2, on='A', how='left')
OR
Copy code
# Dictionary to store result dataframes with their names
    result_dfs = {}

    # Perform group by and aggregation
    for idx, group_by_cols in enumerate(group_by_combinations):
        result_df = transformed_df.groupby(list(group_by_cols)).agg({
            'AMT': ['sum', 'mean'],
            'A': 'nunique'
        }).reset_index()


        # Rename columns as needed
        result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]

        # Name for the dataframe
        df_name = f"result{idx + 1}"

        # Store dataframe in the dictionary
        result_dfs[df_name] = result_df
Should be moved to tasks.
b
Hi @freezing-airport-6809 @average-finland-92144 After your suggestions I updated my code and now on the last line of my workflow am getting this final error
Copy code
from flytekit import task, workflow
import pandas as pd
from itertools import combinations
from typing import List
from typing import Union

# Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
    df = pd.read_csv(file_path, usecols=selected_columns)
    # Perform transformations here if needed
    return df



@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value: Union[str, int, float] = 'EMPTY') -> pd.DataFrame:
    for col in columns:
        df[col].fillna(replace_value, inplace=True)
    return df


# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(columns):
    group_by_combination = []
    for r in range(1, len(columns) + 1):
        group_by_combination.extend([list(comb) for comb in combinations(columns, r)])
    return group_by_combination

# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
    # Add savings_present column
    df['SAVINGS_PRESENT'] = df['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
    # Extract year and month from transaction_date
    df['TRANSACTION_YEAR_MONTH'] = df['TRANSACTION_DATE'].str.slice(0, 7)
    return df

# Task to merge two dataframes
@task
def merge_dataframes(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    return pd.merge(df1, df2, on='ACCOUNT_ID', how='left')

# Task to perform group by and aggregation for a single combination
@task
def group_by_and_aggregate_single(df: pd.DataFrame, group_by_cols: List[str]) -> pd.DataFrame:
    result_df = df.groupby(group_by_cols).agg({
        'AMOUNT': ['sum', 'mean'],
        'ACCOUNT_ID': 'nunique'
    }).reset_index()
    result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
    return result_df

# Task to iterate over all group by combinations and aggregate
@task
def iterate_and_aggregate(group_by_combinationss: List[List[str]], transformed_df: pd.DataFrame) -> dict:
    result_dfs = {}
    for idx, group_by_cols in enumerate(group_by_combinationss):
        grouped_df = group_by_and_aggregate_single(transformed_df, list(group_by_cols))
        result_dfs[f"result{idx + 1}"] = grouped_df
    return result_dfs

# Main workflow
@workflow
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',    
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
        csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'], 
        csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'], 
        group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:

    # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)

    # Merge dataframes
    merged_df = merge_dataframes(df1=df1, df2=df2)

    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)

    # Replace empty cells with 'EMPTY'
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')

    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(columns=group_by_columns)

    # Perform group by and aggregation for all combinations
    result_dfs = iterate_and_aggregate(group_by_combinationss=group_by_combinations, transformed_df=transformed_df)

    return result_dfs

if __name__ == "__main__":
    result_dataframes = wf1()
    print(result_dataframes)
Copy code
errror
AssertionError: Error encountered while executing 'wf1':
  Failed to Bind variable group_by_combinationss for function test7.iterate_and_aggregate.
f
@better-market-70367 quick question, why split into so many tasks?
just make all of this into one task simply
Copy code
# Merge dataframes
    merged_df = merge_dataframes(df1=df1, df2=df2)

    # Apply SQL-like transformations
    transformed_df = apply_sql_transformations(df=merged_df)

    # Replace empty cells with 'EMPTY'
    transformed_df = replace_empty_cells(df=transformed_df, columns=group_by_columns, replace_value='EMPTY')

    # Generate all possible combinations of group by parameters
    group_by_combinations = generate_group_by_combinations(columns=group_by_columns)

    # Perform group by and aggregation for all combinations
    result_dfs = iterate_and_aggregate(group_by_combinationss=group_by_combinations, transformed_df=transformed_df)

    return result_dfs
this is good enough
Copy code
@workflow
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',    
        csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', 
        csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'], 
        csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'], 
        group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:

    # Read and transform CSV files
    df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
    df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)

    return merge_and_aggregate(df1=df1, df2=df2, group_by_columns=group_by_columns)
gratitude thank you 1
@better-market-70367 ^