better-market-70367
05/02/2024, 2:39 PMand\or
use &\|
(bitwise) instead. Expr Comp(Promise(node:n0.o0.['_typ']) == <FlyteLiteral scalar { primitive { string_value: "dataframe" } }>)average-finland-92144
05/02/2024, 5:18 PMand
or
directly in Promise
objects, which is not supported by Flyte. This is also a Python limitation as indicated (see https://peps.python.org/pep-0335/)
Instead, try using &
|
better-market-70367
05/02/2024, 5:19 PMaverage-finland-92144
05/02/2024, 5:21 PMbetter-market-70367
05/02/2024, 5:23 PMfrom flytekit import task, workflow, dynamic
import pandas as pd
import os
# Import necessary libraries
import pandas as pd
from itertools import combinations
from typing import List
# from Tasks import read_and_transform_csv, apply_sql_transformations,replace_empty_cells,generate_group_by_combinations
from flytekit import task, workflow, dynamic
# Import necessary libraries
import pandas as pd
from itertools import combinations
from typing import List
#Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
df = pd.read_csv(file_path, usecols=selected_columns)
# Perform transformations here if needed
return df
# Function to replace empty cells in specified columns
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value='EMPTY') -> pd.DataFrame:
for col in columns:
df[col].fillna(replace_value, inplace=True)
return df
# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
group_by_combinations = []
for r in range(1, len(group_by_columns) + 1):
group_by_combinations.extend(combinations(group_by_columns, r))
return group_by_combinations
# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
# Add savings_present column
df['SAVINGS_PRESENT'] = df['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
# Extract year and month from transaction_date
df['TRANSACTION_YEAR_MONTH'] = df['TRANSACTION_DATE'].str.slice(0, 7)
return df
# Main workflow
# @workflow
# def main_workflow(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',
# csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv',
# csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'],
# csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'],
# group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
# # Read and transform CSV files
# df1 = read_and_transform_csv(file_path='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv', selected_columns=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'])
# df2 = read_and_transform_csv(file_path='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv', selected_columns=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'])
# group_by_cols=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']
# # Merge dataframes
# merged_df = pd.merge(df1, df2, on='ACCOUNT_ID', how='left')
# # Apply SQL-like transformations
# transformed_df = apply_sql_transformations(df=merged_df)
# # Replace empty cells with 'EMPTY'
# transformed_df = replace_empty_cells(df=transformed_df, columns=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH'], replace_value='EMPTY')
# # Generate all possible combinations of group by parameters
# group_by_combinations = generate_group_by_combinations(group_by_columns=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH'])
# # Dictionary to store result dataframes with their names
# result_dfs = {}
# # Perform group by and aggregation
# for idx, group_by_cols in enumerate(group_by_combinations):
# result_df = transformed_df.groupby(list(group_by_cols)).agg({
# 'AMOUNT': ['sum', 'mean'],
# 'ACCOUNT_ID': 'nunique'
# }).reset_index()
# # Calculate avg_amount_per_customer
# result_df['avg_amount_per_customer'] = result_df[('AMOUNT', 'sum')] / result_df[('ACCOUNT_ID', 'nunique')]
# # Rename columns as needed
# result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
# # Name for the dataframe
# df_name = f"result{idx + 1}"
# # Store dataframe in the dictionary
# result_dfs[df_name] = result_df
# # return result_dfs
# print(result_dfs)
# # Provide the values directly to the main_workflow function
# # result_dataframes = main_workflow()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# # Provide the values directly to the main_workflow function
# result_dataframes = main_workflow()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# print(result_dataframes)
@workflow
# Main function
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',
csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv',
csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'],
csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'],
group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
# Input CSV file paths and columns
# csv1_path = '/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv'
# csv2_path = '/home/ubuntu/flyte/my_project/workflows/ods_customers.csv'
# csv1_columns = ['ACCOUNT_ID','PRODUCT_TYPE','TRANSACTION_DATE','CATEGORY_NAME','AMOUNT']
# csv2_columns = ['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID']
# Read and transform CSV files
df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)
# Merge dataframes
merged_df = pd.merge(df1, df2, on='ACCOUNT_ID', how='left')
# Apply SQL-like transformations
transformed_df = apply_sql_transformations(df=merged_df)
# Define group by columns
#group_by_columns = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']
# Replace empty cells with 'EMPTY'
transformed_df = replace_empty_cells(df=transformed_df,columns=group_by_columns, replace_value='EMPTY')
# Generate all possible combinations of group by parameters
group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns )
# Dictionary to store result dataframes with their names
result_dfs = {}
# Perform group by and aggregation
for idx, group_by_cols in enumerate(group_by_combinations):
result_df = transformed_df.groupby(list(group_by_cols)).agg({
'AMOUNT': ['sum', 'mean'],
'ACCOUNT_ID': 'nunique'
}).reset_index()
# Calculate avg_amount_per_customer
#result_df['avg_amount_per_customer'] = result_df[('AMOUNT', 'sum')] / result_df[('ACCOUNT_ID', 'nunique')]
# Rename columns as needed
result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
# Name for the dataframe
df_name = f"result{idx + 1}"
# Store dataframe in the dictionary
result_dfs[df_name] = result_df
return result_dfs
# if __name__ == "__main__":
# result_dataframes = main()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# Provide the values directly to the main_workflow function
# result_dataframes = wf1()
# # Now you can access each dataframe by its name in the result_dataframes dictionary
# print(result_dataframes)
better-market-70367
05/02/2024, 5:23 PM