better-market-70367
05/02/2024, 4:18 AMand\or
use &\|
(bitwise) instead. Expr Comp(Promise(node:n0.o0.['_typ']) == <FlyteLiteral scalar { primitive { string_value: "dataframe" } }>)
here is my code
from flytekit import task, workflow, dynamic
import pandas as pd
import os
import pandas as pd
from itertools import combinations
from typing import List
from flytekit import task, workflow, dynamic
#Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
df = pd.read_csv(file_path, usecols=selected_columns)
return df
# Function to replace empty cells in specified columns
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value='EMPTY') -> pd.DataFrame:
for col in columns:
df[col].fillna(replace_value, inplace=True)
return df
# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
group_by_combinations = []
for r in range(1, len(group_by_columns) + 1):
group_by_combinations.extend(combinations(group_by_columns, r))
return group_by_combinations
# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
# Add savings_present column
df['S_PRESENT'] = df['S_ID'].notnull().astype(int)
# Extract year and month from transaction_date
df['TRANSACTION_YEAR_MONTH'] = df['T'].str.slice(0, 7)
return df
@workflow
# Main function
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',
csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv',
csv1_columns: List[str]=['A', 'P', 'T', 'CE', 'AMT'],
csv2_columns: List[str]=['A', 'S_ID'],
group_by_columns: List[str]=['P', 'S_PRESENT', 'C', 'TRANSACTION_YEAR_MONTH']) -> dict:
# Read and transform CSV files
df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)
# Merge dataframes
merged_df = pd.merge(df1, df2, on='A', how='left')
# Apply SQL-like transformations
transformed_df = apply_sql_transformations(df=merged_df)
# Define group by columns
#group_by_columns = ['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']
# Replace empty cells with 'EMPTY'
transformed_df = replace_empty_cells(df=transformed_df,columns=group_by_columns, replace_value='EMPTY')
# Generate all possible combinations of group by parameters
group_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns )
# Dictionary to store result dataframes with their names
result_dfs = {}
# Perform group by and aggregation
for idx, group_by_cols in enumerate(group_by_combinations):
result_df = transformed_df.groupby(list(group_by_cols)).agg({
'AMT': ['sum', 'mean'],
'A': 'nunique'
}).reset_index()
# Rename columns as needed
result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
# Name for the dataframe
df_name = f"result{idx + 1}"
# Store dataframe in the dictionary
result_dfs[df_name] = result_df
return result_dfs
Can anyone pls support on thisglamorous-carpet-83516
05/02/2024, 3:01 PMfor idx, group_by_cols in enumerate(group_by_combinations):
enumerate a promise is not allowed in the workflowglamorous-carpet-83516
05/02/2024, 3:02 PMgroup_by_combinations = generate_group_by_combinations(group_by_columns=group_by_columns )generate_group_by_combinations won’t return a list, it always return a promise at compile time
glamorous-carpet-83516
05/02/2024, 3:03 PMglamorous-carpet-83516
05/02/2024, 3:03 PMbetter-market-70367
05/02/2024, 3:07 PMglamorous-carpet-83516
05/02/2024, 3:08 PM@dynamic
def d1(group_by_combinations: List[int]) ->
for idx, group_by_cols in enumerate(group_by_combinations):
better-market-70367
05/02/2024, 3:18 PMfrom flytekit import task, workflow, dynamic
import pandas as pd
from itertools import combinations
from typing import List
# Function to read CSV files and perform transformations
@task
def read_and_transform_csv(file_path: str, selected_columns: List[str]) -> pd.DataFrame:
df = pd.read_csv(file_path, usecols=selected_columns)
# Perform transformations here if needed
return df
# Function to replace empty cells in specified columns
@task
def replace_empty_cells(df: pd.DataFrame, columns: List[str], replace_value='EMPTY') -> pd.DataFrame:
for col in columns:
df[col].fillna(replace_value, inplace=True)
return df
# Function to generate combinations of group by parameters
@task
def generate_group_by_combinations(group_by_columns: List[str]) -> List[List[str]]:
group_by_combinations = []
for r in range(1, len(group_by_columns) + 1):
group_by_combinations.extend(combinations(group_by_columns, r))
return group_by_combinations
# Function to apply SQL-like transformations and aggregation
@task
def apply_sql_transformations(df: pd.DataFrame) -> pd.DataFrame:
# Add savings_present column
df['SAVINGS_PRESENT'] = df['SAVINGS_ACCOUNT_ID'].notnull().astype(int)
# Extract year and month from transaction_date
df['TRANSACTION_YEAR_MONTH'] = df['TRANSACTION_DATE'].str.slice(0, 7)
return df
@workflow
# Main function
def wf1(csv1_path: str='/home/ubuntu/flyte/my_project/workflows/ODS_TRANSACTIONS.csv',
csv2_path: str='/home/ubuntu/flyte/my_project/workflows/ods_customers.csv',
csv1_columns: List[str]=['ACCOUNT_ID', 'PRODUCT_TYPE', 'TRANSACTION_DATE', 'CATEGORY_NAME', 'AMOUNT'],
csv2_columns: List[str]=['ACCOUNT_ID', 'SAVINGS_ACCOUNT_ID'],
group_by_columns: List[str]=['PRODUCT_TYPE', 'SAVINGS_PRESENT', 'CATEGORY_NAME', 'TRANSACTION_YEAR_MONTH']) -> dict:
# Read and transform CSV files
df1 = read_and_transform_csv(file_path=csv1_path, selected_columns=csv1_columns)
df2 = read_and_transform_csv(file_path=csv2_path, selected_columns=csv2_columns)
# Merge dataframes
merged_df = pd.merge(df1, df2, on='ACCOUNT_ID', how='left')
# Apply SQL-like transformations
transformed_df = apply_sql_transformations(df=merged_df)
# Replace empty cells with 'EMPTY'
transformed_df = replace_empty_cells(df=transformed_df,columns=group_by_columns, replace_value='EMPTY')
# Generate all possible combinations of group by parameters
@dynamic
def process_group_by_combinations(group_by_combinations: List[List[str]]):
result_dfs = {}
for idx, group_by_cols in enumerate(group_by_combinations):
result_df = transformed_df.groupby(list(group_by_cols)).agg({
'AMOUNT': ['sum', 'mean'],
'ACCOUNT_ID': 'nunique'
}).reset_index()
# Rename columns as needed
result_df.columns = ['_'.join(col).strip() for col in result_df.columns.values]
# Name for the dataframe
df_name = f"result{idx + 1}"
# Store dataframe in the dictionary
result_dfs[df_name] = result_df
return result_dfs
return process_group_by_combinations(group_by_combinations=generate_group_by_combinations(group_by_columns))
# Provide the values directly to the main_workflow function
result_dataframes = wf1()
# Now you can access each dataframe by its name in the result_dataframes dictionary
print(result_dataframes)
better-market-70367
05/02/2024, 3:19 PMbetter-market-70367
05/02/2024, 3:19 PM