from moviepy.editor import VideoFileClip, AudioFileClip
import os
import pandas as pd
import datetime
import numpy as np
import typing
from flytekit import task, workflow, ImageSpec
# Container image for the Flyte tasks in this module: the flytekit base image
# plus the third-party packages this module imports (pandas, moviepy, numpy).
pandas_image = ImageSpec(
    name="numpy",  # docker image name.
    # The base image that flytekit will use to build your image. (This literal
    # was previously split across two lines, which is a syntax error.)
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.10.2",
    packages=["pandas", "moviepy", "numpy"],  # python packages to install.
    registry="sumarek",  # the registry your image will be pushed to.
    python_version="3.11",  # Optional if python is installed in the base image.
)
@task(container_image=pandas_image)
def add_audio_to_video(video_path: str, audio_path: str, output_path: str) -> int:
    """Mux the audio file at *audio_path* onto the video at *video_path*.

    If the audio is longer than the video, only its last ``video.duration``
    seconds are kept (``subclip(t_start)`` keeps ``[t_start, end]``), so the
    leading excess is dropped. The result is written to *output_path* as
    H.264 video with AAC audio.

    Both clips are closed even if reading, trimming or writing fails.

    Returns:
        0, a fixed status value.
    """
    video_clip = VideoFileClip(video_path)
    audio_clip = None
    try:
        audio_clip = AudioFileClip(audio_path)
        if audio_clip.duration > video_clip.duration:
            audio_clip = audio_clip.subclip(audio_clip.duration - video_clip.duration)
        video_clip = video_clip.set_audio(audio_clip)
        video_clip.write_videofile(output_path, codec='libx264', audio_codec='aac')
    finally:
        # Same close order as before (video, then audio), but now also on error.
        video_clip.close()
        if audio_clip is not None:
            audio_clip.close()
    return 0
def convert_epoch_time(epoch_time_ms):
    """Convert an epoch timestamp in milliseconds to a naive UTC datetime.

    The result is naive UTC so it compares correctly with the values produced
    by ``pd.to_datetime(..., unit='ms')`` (also naive UTC), which is how
    handle_missing_data converts the sensor timestamps it filters against.
    The previous ``datetime.fromtimestamp`` returned *local* time, which
    shifted that comparison by the host's UTC offset.

    Args:
        epoch_time_ms: milliseconds since 1970-01-01T00:00Z; anything accepted
            by ``int()`` (int, numpy integer, digit string).

    Returns:
        A naive ``datetime.datetime`` in UTC.
    """
    print('epoch time in ms', epoch_time_ms)
    # Integer-millisecond timedelta arithmetic is exact, whereas dividing by
    # 1000 and going through a float can round the sub-second part.
    return datetime.datetime(1970, 1, 1) + datetime.timedelta(milliseconds=int(epoch_time_ms))
def generate_empty_df(length, columns, index_number):
    """Return a ``length``-row placeholder frame for one packet.

    Every column in *columns* is float NaN. The 'index' column (added if not
    already in *columns*) is set to *index_number* on every row. A
    non-positive *length* yields an empty frame with those columns.

    Args:
        length: number of rows.
        columns: column labels for the frame.
        index_number: value written into the 'index' column.
    """
    # Build the NaN frame directly instead of the former
    # ``df[[columns]] = np.nan``: indexing with a list nested inside a list
    # only worked via a pandas-version-specific fallback.
    df = pd.DataFrame(np.nan, index=range(length), columns=columns)
    df['index'] = index_number
    return df
def get_grouped_df(df):
    """Split *df* into runs of consecutive rows sharing the same 'index'.

    Side effect: writes a 'group_id' column into *df* itself. The id starts
    at 1 and increments every time 'index' differs from the previous row.

    Returns:
        dict mapping each group_id to the sub-frame of that run, in order.
    """
    run_starts = df['index'].ne(df['index'].shift())
    df['group_id'] = run_starts.cumsum()
    return dict(iter(df.groupby('group_id')))
def insert_missing_rows(df, rows_per_packet):
    """Rebuild *df* so that every packet contributes ``rows_per_packet`` rows.

    *df* is treated as consecutive runs of equal values in its 'index' column
    (presumably a per-packet counter -- confirm against the device format).
    The runs are processed in order:

    * If a run's counter is not equal to the previous counter, not
      previous + 1, and not 0, placeholder packets are inserted first.
      When the counter moved forward, one placeholder is inserted for each
      skipped value (previous + 1 .. idx - 1). When it moved backward,
      placeholders for 0 .. idx - 1 are inserted.
    * A run with fewer than ``rows_per_packet`` rows is padded with
      placeholder rows carrying the run's counter.

    Placeholder rows are NaN in every column except 'index'. get_grouped_df
    adds a 'group_id' column to the caller's *df*, and that column is carried
    into the result (NaN on placeholder rows).

    Returns:
        A new frame with a fresh RangeIndex. An empty *df* yields an empty
        frame with *df*'s columns.
    """
    columns = df.columns.values.tolist()
    if len(df) == 0:
        # Previously an IndexError on df['index'].iloc[0].
        return generate_empty_df(0, columns, 0)
    prev = df['index'].iloc[0]
    # Collect pieces and concatenate once: concatenating inside the loop
    # re-copied the growing frame on every iteration (quadratic). The leading
    # empty frame keeps the same column order/dtypes as the original code.
    pieces = [generate_empty_df(0, columns, 0)]
    for group in get_grouped_df(df).values():
        idx = group['index'].iloc[0]
        if idx != prev and idx != prev + 1 and idx != 0:
            print('idx', idx, 'prev', prev)
            if idx > prev:
                print('idx-prev', idx - prev)
                skipped = range(prev + 1, idx)
            else:
                # Counter went backwards: fill 0 .. idx - 1.
                skipped = range(0, idx)
            pieces.extend(generate_empty_df(rows_per_packet, columns, i) for i in skipped)
        if len(group) != rows_per_packet:
            padding = generate_empty_df(rows_per_packet - len(group), columns, idx)
            group = pd.concat([group, padding], ignore_index=True)
        pieces.append(group)
        # The last row appended (real or padding) always carries counter idx.
        prev = idx
    return pd.concat(pieces, ignore_index=True)
def calculate_duration1(df):
    """Seconds between the first and last row's 'timestamp' (epoch ms values).

    Unlike calculate_duration, the column holds raw epoch milliseconds. Each
    endpoint is converted with convert_epoch_time (first row, then last row),
    and that function prints its input.
    """
    column = df['timestamp']
    begin, finish = (convert_epoch_time(column.iloc[pos]) for pos in (0, -1))
    return (finish - begin).total_seconds()
def calculate_duration(df):
    """Seconds between the first and last non-missing 'timestamp' of *df*.

    Rows whose timestamp is missing (NaT) are skipped. For example, the empty
    rows insert_missing_rows pads onto a short final packet become NaT after
    ``pd.to_datetime``. Previously such a trailing NaT made the duration NaN,
    and ``datetime.timedelta(seconds=nan)`` in the callers then raised.

    Raises:
        IndexError: if no row has a timestamp.
    """
    timestamps = df['timestamp'].dropna()
    return (timestamps.iloc[-1] - timestamps.iloc[0]).total_seconds()
def generate_timestamps1(df):
    """Overwrite 'timestamp' with evenly spaced values from the first one.

    The spacing is ``calculate_duration(df) / len(df)`` seconds, rounded to a
    ``datetime.timedelta``. Row ``i`` gets ``first + spacing * i``. *df* is
    modified in place and also returned.

    Returns:
        (df, interval): the same frame, and the spacing in seconds.
    """
    print('df received before generating toimestamps', df)
    duration = calculate_duration(df)
    interval = duration / len(df)
    print('interava', interval)
    interval_timedelta = datetime.timedelta(seconds=interval)
    start = df['timestamp'].iloc[0]
    # One whole-column assignment instead of ``df['timestamp'].iloc[row] = ...``:
    # that chained assignment does not write through to df under pandas
    # Copy-on-Write.
    df['timestamp'] = [start + interval_timedelta * row for row in range(len(df))]
    return df, interval
def generate_timestamps(df):
    """Overwrite 'timestamp' with evenly spaced values from the first one.

    The spacing is ``calculate_duration(df) / len(df)`` seconds. Row ``i``
    (by position) gets ``first + timedelta(seconds=spacing * i)``. *df* is
    modified in place and also returned.

    NOTE(review): dividing by ``len(df)`` rather than ``len(df) - 1`` places
    the last generated timestamp one spacing short of the measured span --
    confirm this is intended.

    Returns:
        (df, interval): the same frame, and the spacing in seconds.
    """
    duration = calculate_duration(df)
    interval = duration / len(df)
    start = df['timestamp'].iloc[0]
    # Assign the whole column positionally in one step. The former per-row
    # ``df.loc[row, 'timestamp'] = ...`` wrote by *label* (appending new rows
    # when the index is not 0..n-1) and cost one setitem per row.
    df['timestamp'] = [
        start + datetime.timedelta(seconds=interval * row) for row in range(len(df))
    ]
    return df, interval
def get_missing_tail(df, end, interval):
    """Append placeholder rows after the last timestamp until *end* is passed.

    Starting from the last timestamp ``t``, rows stamped ``t + k * interval``
    (k = 1, 2, ...) are appended. Appending stops after the first new stamp
    that is later than *end*. All other columns of the new rows are NaN. If
    the last timestamp is already after *end* (or is NaT), *df* is returned
    unchanged.

    Args:
        df: frame with a datetime 'timestamp' column, in ascending order.
        end: datetime to extend up to.
        interval: spacing in seconds.

    Raises:
        ValueError: if rows are needed but *interval* is not positive.
            Previously this case looped forever.
    """
    current_end = df['timestamp'].iloc[-1]
    if not current_end <= end:
        return df
    step = datetime.timedelta(seconds=interval)
    if step <= datetime.timedelta(0):
        raise ValueError(f'interval must be positive to extend the tail, got {interval!r}')
    new_times = []
    while current_end <= end:
        current_end = current_end + step
        new_times.append(current_end)
    # Build all new rows at once. Setting the stamp via
    # ``row['timestamp'].iloc[0] = ...`` is chained assignment and is lost
    # under pandas Copy-on-Write, and per-row concat was quadratic.
    tail = pd.DataFrame(np.nan, index=range(len(new_times)), columns=df.columns)
    tail['timestamp'] = new_times
    return pd.concat([df, tail], ignore_index=True)
def get_missing_head(df, start, interval):
    """Prepend placeholder rows before the first timestamp until *start* is reached.

    Starting from the first timestamp ``t``, rows stamped ``t - k * interval``
    (k = 1, 2, ...) are prepended. Prepending stops after the first new stamp
    at or before *start*. The result stays in ascending time order, and all
    other columns of the new rows are NaN. If the first timestamp is already
    at or before *start* (or is NaT), *df* is returned unchanged.

    Args:
        df: frame with a datetime 'timestamp' column, in ascending order.
        start: datetime to extend back to.
        interval: spacing in seconds.

    Raises:
        ValueError: if rows are needed but *interval* is not positive.
            Previously this case looped forever.
    """
    current_start = df['timestamp'].iloc[0]
    if not current_start > start:
        return df
    step = datetime.timedelta(seconds=interval)
    if step <= datetime.timedelta(0):
        raise ValueError(f'interval must be positive to extend the head, got {interval!r}')
    new_times = []
    while current_start > start:
        current_start = current_start - step
        new_times.append(current_start)
    # Build all new rows at once (chained assignment on a one-row frame is lost
    # under Copy-on-Write). new_times is newest-first, so reverse it.
    head = pd.DataFrame(np.nan, index=range(len(new_times)), columns=df.columns)
    head['timestamp'] = new_times[::-1]
    return pd.concat([head, df], ignore_index=True)
def handle_missing_data(filepath, rows_per_packet, duration_df):
    """Fill gaps in one sensor CSV and rewrite it clipped to the recording window.

    Steps:

    1. Read *filepath* and insert placeholder rows for lost or short packets
       (insert_missing_rows).
    2. Convert the epoch-ms 'timestamp' column to datetimes and respace it
       evenly (generate_timestamps).
    3. Extend with empty rows so the data covers the window's end and start.
    4. Linearly interpolate, then forward/backward fill remaining gaps.
    5. Keep rows inside ``[start, end]`` and overwrite *filepath*.

    Args:
        filepath: sensor CSV with at least 'index' and 'timestamp' (epoch ms)
            columns. It is overwritten in place.
        rows_per_packet: expected number of rows per packet.
        duration_df: recording metadata. Cell [0, 0] is used as the window
            start and cell [1, 0] as the end, both in epoch ms. (Presumably
            read with ``header=None`` as in preprocesss_sensor_data -- confirm.)
    """
    df = pd.read_csv(filepath, index_col=False)
    # Positional [row, col] access. The old ``iloc[0][0]`` looked up column
    # *label* 0, which only matched position 0 by accident of header=None.
    # NOTE(review): start/end must be on the same clock as the sensor
    # timestamps (pd.to_datetime(unit='ms') below gives naive UTC) or the
    # final mask is shifted -- confirm convert_epoch_time returns naive UTC.
    start_time = convert_epoch_time(duration_df.iloc[0, 0])
    end_time = convert_epoch_time(duration_df.iloc[1, 0])
    all_df = insert_missing_rows(df, rows_per_packet)
    # (This statement was previously split across two lines -- a syntax error.)
    all_df['timestamp'] = pd.to_datetime(all_df['timestamp'], unit='ms')
    new_df, interval = generate_timestamps(all_df)
    print('new ', new_df)
    if new_df['timestamp'].iloc[-1] < end_time:
        new_df = get_missing_tail(new_df, end_time, interval)
    if new_df['timestamp'].iloc[0] > start_time:
        new_df = get_missing_head(new_df, start_time, interval)
    final_df = new_df.interpolate(method='linear', axis=0).ffill().bfill()
    print('final df is theiss before filtering', final_df)
    print('start', start_time)
    print('startenf', end_time)
    mask = (final_df['timestamp'] >= start_time) & (final_df['timestamp'] <= end_time)
    filtered_df = final_df[mask]
    filtered_df.to_csv(filepath, index=False)
@task(container_image=pandas_image)
def preprocesss_sensor_data(session_path: str) -> int:
    """Load a session's recording metadata and return 0.

    Per-sensor gap filling with handle_missing_data is currently disabled and
    kept below as comments. Only recording_data.csv is actually read (without
    a header row), so a missing or malformed file still fails the task.
    """
    def session_file(file_name):
        return os.path.join(session_path, file_name)

    recording_path = session_file('recording_data.csv')
    # The sensor paths below are only used by the disabled calls.
    accelerometer_path = session_file('accelerometer_data.csv')
    emg_path = session_file('emg_data.csv')
    bioz_path = session_file('bioz_data.csv')
    temperature_path = session_file('temperature_data.csv')
    pulseox_path = session_file('oxygen_data.csv')

    recording_df = pd.read_csv(recording_path, header=None)
    print('success')
    # Disabled -- re-enable to gap-fill each sensor CSV in place:
    # handle_missing_data(temperature_path, temp_sample_count_in_packet, recording_df)
    # handle_missing_data(accelerometer_path, accel_sample_count_in_packet, recording_df)
    # handle_missing_data(emg_path, emg_sample_count_in_packet, recording_df)
    # handle_missing_data(bioz_path, bioz_sample_count_in_packet, recording_df)
    # handle_missing_data(pulseox_path, pulseox_sample_count_in_packet, recording_df)
    return 0
def get_packet_loss(duration, samples_count_in_packet, sampling_frequency, filepath):
    """Percentage of expected samples that are absent from the CSV at *filepath*.

    Expected samples are ``duration * sampling_frequency``. Received samples
    are the number of data rows ``pd.read_csv`` returns (the header row is not
    counted). *samples_count_in_packet* only affects the printed
    rows-per-second figure. The result is negative if the file has more rows
    than expected. A zero expected count (or zero *samples_count_in_packet*)
    may raise ZeroDivisionError.
    """
    print('rows per seconds', sampling_frequency / samples_count_in_packet)
    print('duration', duration)
    expected = duration * sampling_frequency
    print('expected', expected)
    received = pd.read_csv(filepath).shape[0]
    print('actual', received)
    missing = expected - received
    return missing / expected * 100
@workflow
def preprocess_data() -> int:
    """Walk <main_folder>/<id>/<session> and launch sensor preprocessing per session.

    A session qualifies when it is a directory holding both 'video-1.mp4' and
    'audio.m4a'. Each qualifying session gets a preprocesss_sensor_data call.
    The audio muxing and packet-loss steps are disabled (kept as comments).

    NOTE(review): with flytekit, a @workflow body presumably runs when the
    workflow is compiled, so os.listdir here would read the filesystem of the
    machine compiling/registering it rather than a task container -- confirm.
    """
    main_folder = "/mnt/hippastorage/temp"
    for participant in os.listdir(main_folder):
        participant_dir = os.path.join(main_folder, participant)
        print(participant_dir)
        if not os.path.isdir(participant_dir):
            continue
        for session in os.listdir(participant_dir):
            session_path = os.path.join(participant_dir, session)
            print(session_path)
            if not os.path.isdir(session_path):
                continue
            video_file = os.path.join(session_path, 'video-1.mp4')
            audio_file = os.path.join(session_path, 'audio.m4a')
            if not (os.path.exists(video_file) and os.path.exists(audio_file)):
                continue
            # Disabled steps:
            # output_file = os.path.join(session_path, 'added_video.mp4')
            # a = add_audio_to_video(video_path=video_file, audio_path=audio_file, output_path=output_file)
            # duration = AudioFileClip(audio_file).duration
            # accel_file_path = os.path.join(session_path, 'accelerometer_data.csv')
            # accel_packet_loss = get_packet_loss(duration, accel_sample_count_in_packet, accel_sampling_frequency, accel_file_path)
            # print('Accelerometer packet loss', accel_packet_loss, '%')
            preprocesss_sensor_data(session_path=session_path)
    return 0
# Per-sensor packet geometry used by the (currently disabled) gap-filling and
# packet-loss code: samples carried by one packet, and the sensor's sampling
# frequency (presumably Hz -- get_packet_loss multiplies it by a duration to
# get an expected sample count; confirm against the device spec).

# Accelerometer
accel_sample_count_in_packet, accel_sampling_frequency = 17, 25
# Temperature
temp_sample_count_in_packet, temp_sampling_frequency = 4, 2
# EMG
emg_sample_count_in_packet, emg_sampling_frequency = 120, 100
# BioZ
bioz_sample_count_in_packet, bioz_sampling_frequency = 4, 4
# PulseOx
pulseox_sample_count_in_packet, pulseox_sampling_frequency = 17, 25

# preprocess_data() takes no arguments (it scans a hard-coded main_folder);
# the old commented-out `preprocess_data(main_folder)` did not match it.