# ask-the-community
s
Hi all! I have just started using Flyte today. I have created a workflow that reads a CSV file and performs a preprocessing task on it. When I run it locally, I don't have any issues, but when I run it with the --remote flag, I get an error saying "No such file or directory". Any help would be greatly appreciated.
y
Could you share your workflow for more information?
s
```python
from moviepy.editor import VideoFileClip, AudioFileClip
import os
import pandas as pd
import datetime
import numpy as np
import typing
from flytekit import task, workflow, ImageSpec

pandas_image = ImageSpec(
    name="numpy",  # default docker image name.
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.10.2",  # the base image that flytekit will use to build your image.
    packages=["pandas", "moviepy", "numpy"],  # python packages to install.
    registry="sumarek",  # the registry your image will be pushed to.
    python_version="3.11",  # Optional if python is installed in the base image.
)


# Function to add audio to video
@task(container_image=pandas_image)
def add_audio_to_video(video_path: str, audio_path: str, output_path: str) -> int:
    video_clip = VideoFileClip(video_path)
    audio_clip = AudioFileClip(audio_path)
    if audio_clip.duration > video_clip.duration:
        audio_clip = audio_clip.subclip(audio_clip.duration - video_clip.duration)
    video_clip = video_clip.set_audio(audio_clip)
    video_clip.write_videofile(output_path, codec='libx264', audio_codec='aac')
    # Close the clips
    video_clip.close()
    audio_clip.close()
    return 0


# Convert an epoch timestamp in milliseconds to a datetime.
def convert_epoch_time(epoch_time_ms):
    print('epoch time in ms', epoch_time_ms)
    epoch_time_s = int(epoch_time_ms) / 1000
    return datetime.datetime.fromtimestamp(epoch_time_s)


def generate_empty_df(length, columns, index_number):
    df = pd.DataFrame(index=range(length), columns=columns)
    df[[columns]] = np.nan
    df[['index']] = index_number
    return df


def get_grouped_df(df):
    df['group_id'] = (df['index'] != df['index'].shift()).cumsum()
    grouped_dataframes = {group_id: group_df for group_id, group_df in df.groupby('group_id')}
    return grouped_dataframes


# Fill gaps in the packet index sequence with empty placeholder rows.
def insert_missing_rows(df, rows_per_packet):
    columns = df.columns.values.tolist()
    prev = df['index'].iloc[0]
    final_df = generate_empty_df(0, columns, 0)
    grouped_dataframes = get_grouped_df(df)
    for id, group in grouped_dataframes.items():
        idx = group['index'].iloc[0]
        if (idx != prev and idx != prev + 1 and idx != 0):
            print('idx', idx, 'prev', prev)
            if (idx > prev):
                print('idx-prev', idx - prev)
                for i in range(1, idx - prev):
                    new_df = generate_empty_df(rows_per_packet, columns, prev + i)
                    final_df = pd.concat([final_df, new_df], ignore_index=True)
            else:
                for i in range(0, idx):
                    new_df = generate_empty_df(rows_per_packet, columns, i)
                    final_df = pd.concat([final_df, new_df], ignore_index=True)
        if (len(group) != rows_per_packet):
            new_df = generate_empty_df(rows_per_packet - len(group), columns, idx)
            group = pd.concat([group, new_df], ignore_index=True)
        final_df = pd.concat([final_df, group], ignore_index=True)
        prev = final_df['index'].iloc[-1]
    return final_df


def calculate_duration1(df):
    start = convert_epoch_time(df['timestamp'].iloc[0])
    end = convert_epoch_time(df['timestamp'].iloc[-1])
    return (end - start).total_seconds()


def calculate_duration(df):
    start = df['timestamp'].iloc[0]
    end = df['timestamp'].iloc[-1]
    return (end - start).total_seconds()


def generate_timestamps1(df):
    print('df received before generating timestamps', df)
    duration = calculate_duration(df)
    interval = duration / len(df)
    print('interval', interval)
    interval_timedelta = datetime.timedelta(seconds=interval)
    for row in range(len(df)):
        df['timestamp'].iloc[row] = df['timestamp'].iloc[0] + (interval_timedelta * row)
    return df, interval


def generate_timestamps(df):
    duration = calculate_duration(df)
    interval = duration / len(df)
    for row in range(len(df)):
        df.loc[row, 'timestamp'] = df['timestamp'].iloc[0] + datetime.timedelta(seconds=interval * row)
    return df, interval


def get_missing_tail(df, end, interval):
    current_end = df['timestamp'].iloc[-1]
    cols = df.columns.values.tolist()
    while (current_end <= end):
        row = pd.DataFrame(index=range(1), columns=df.columns.values.tolist())
        row['timestamp'].iloc[0] = current_end + datetime.timedelta(seconds=interval)
        df = pd.concat([df, row], ignore_index=True)
        current_end = current_end + datetime.timedelta(seconds=interval)
    return df


def get_missing_head(df, start, interval):
    current_start = df['timestamp'].iloc[0]
    while (current_start > start):
        row = pd.DataFrame(index=range(1), columns=df.columns.values.tolist())
        row['timestamp'].iloc[0] = current_start - datetime.timedelta(seconds=interval)
        df = pd.concat([row, df], ignore_index=True)
        current_start = current_start - datetime.timedelta(seconds=interval)
    return df


# Regenerate timestamps, pad head/tail rows, interpolate gaps, and overwrite the CSV in place.
def handle_missing_data(filepath, rows_per_packet, duration_df):
    df = pd.read_csv(filepath, index_col=False)
    start_time = convert_epoch_time(duration_df.iloc[0][0])
    end_time = convert_epoch_time(duration_df.iloc[1][0])
    all_df = insert_missing_rows(df, rows_per_packet)
    all_df['timestamp'] = pd.to_datetime(all_df['timestamp'], unit='ms')
    new_df, interval = generate_timestamps(all_df)
    print('new ', new_df)
    if (new_df['timestamp'].iloc[-1] < end_time):
        new_df = get_missing_tail(new_df, end_time, interval)
    if (new_df['timestamp'].iloc[0] > start_time):
        new_df = get_missing_head(new_df, start_time, interval)
    final_df = new_df.interpolate(method='linear', axis=0).ffill().bfill()
    print('final df before filtering', final_df)
    print('start', start_time)
    print('end', end_time)
    mask = (final_df['timestamp'] >= start_time) & (final_df['timestamp'] <= end_time)
    filtered_df = final_df[mask]
    filtered_df.to_csv(filepath, index=False)


@task(container_image=pandas_image)
def preprocesss_sensor_data(session_path: str) -> int:
    recording_file = 'recording_data.csv'
    accelerometer_file = 'accelerometer_data.csv'
    emg_file = 'emg_data.csv'
    bioz_file = 'bioz_data.csv'
    temperature_file = 'temperature_data.csv'
    pulseox_file = 'oxygen_data.csv'
    recording_path = os.path.join(session_path, recording_file)
    accelerometer_path = os.path.join(session_path, accelerometer_file)
    emg_path = os.path.join(session_path, emg_file)
    bioz_path = os.path.join(session_path, bioz_file)
    temperature_path = os.path.join(session_path, temperature_file)
    pulseox_path = os.path.join(session_path, pulseox_file)
    recording_df = pd.read_csv(recording_path, header=None)
    print('success')
    '''handle_missing_data(temperature_path, temp_sample_count_in_packet, recording_df)
    handle_missing_data(accelerometer_path, accel_sample_count_in_packet, recording_df)
    handle_missing_data(emg_path, emg_sample_count_in_packet, recording_df)
    handle_missing_data(bioz_path, bioz_sample_count_in_packet, recording_df)
    handle_missing_data(pulseox_path, pulseox_sample_count_in_packet, recording_df)
    '''
    return 0


def get_packet_loss(duration, samples_count_in_packet, sampling_frequency, filepath):
    rows_per_second = sampling_frequency / samples_count_in_packet
    print('rows per second', rows_per_second)
    print('duration', duration)
    expected_samples = duration * sampling_frequency  # rows_per_second
    print('expected', expected_samples)
    df = pd.read_csv(filepath)
    actual_samples = len(df)
    print('actual', actual_samples)
    packet_loss = ((expected_samples - actual_samples) / expected_samples) * 100
    return packet_loss


@workflow
def preprocess_data() -> int:
    # Iterate through each folder
    main_folder = "/mnt/hippastorage/temp"
    for ids in os.listdir(main_folder):
        id_folder = os.path.join(main_folder, ids)
        print(id_folder)
        if os.path.isdir(id_folder):
            for session in os.listdir(id_folder):
                session_path = os.path.join(id_folder, session)
                print(session_path)
                if os.path.isdir(session_path):
                    video_file = os.path.join(session_path, 'video-1.mp4')
                    audio_file = os.path.join(session_path, 'audio.m4a')
                    if os.path.exists(video_file) and os.path.exists(audio_file):
                        output_file = os.path.join(session_path, 'added_video.mp4')
                        # a = add_audio_to_video(video_path=video_file, audio_path=audio_file, output_path=output_file)
                        '''duration = AudioFileClip(audio_file).duration
                        accel_file_path = os.path.join(session_path, 'accelerometer_data.csv')
                        accel_packet_loss = get_packet_loss(duration, accel_sample_count_in_packet, accel_sampling_frequency, accel_file_path)
                        print('Accelerometer packet loss', accel_packet_loss, '%')'''
                        b = preprocesss_sensor_data(session_path=session_path)
    return 0


# Directory containing folders with video and audio files

# Accelerometer Data
accel_sample_count_in_packet = 17
accel_sampling_frequency = 25

# Temperature Data
temp_sample_count_in_packet = 4
temp_sampling_frequency = 2

# EMG Data
emg_sample_count_in_packet = 120
emg_sampling_frequency = 100

# BioZ Data
bioz_sample_count_in_packet = 4
bioz_sampling_frequency = 4

# PulseOx Data
pulseox_sample_count_in_packet = 17
pulseox_sampling_frequency = 25

# preprocess_data(main_folder)
```
Thank you so much for responding 🙏. Is there any way to have volumes in ImageSpec? This is the ImageSpec I'm using; the folder I'm trying to access is outside my project.

```python
pandas_image = ImageSpec(
    name="numpy",  # default docker image name.
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.10.2",  # the base image that flytekit will use to build your image.
    packages=["pandas", "moviepy", "numpy"],  # python packages to install.
    registry="sumarek",  # the registry your image will be pushed to.
    python_version="3.11",  # Optional if python is installed in the base image.
)
```
y
cc @Kevin Su
k
session_path is a local path; you should upload the file to S3.
You can also change the data type of the task's input to FlyteFile; Flyte will automatically download the file while the task is running. https://docs.flyte.org/en/latest/user_guide/data_types_and_io/flytefile.html
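For example, a minimal sketch of that pattern (the task/workflow names and the bucket URI below are illustrative, not from your code):

```python
import pandas as pd
from flytekit import task, workflow
from flytekit.types.file import FlyteFile


@task
def read_session_csv(csv_file: FlyteFile) -> int:
    # Flyte downloads the backing object (s3://, minio, ...) to a local
    # path inside the task's container before the task body uses it.
    local_path = csv_file.download()
    df = pd.read_csv(local_path)
    print(df.head())
    return len(df)


@workflow
def read_session_csv_wf(csv_file: FlyteFile) -> int:
    return read_session_csv(csv_file=csv_file)
```

Locally you can pass a plain file path as the input; on the cluster you'd pass a URI your Flyte deployment can reach, e.g. (hypothetical bucket) `pyflyte run --remote example.py read_session_csv_wf --csv_file s3://my-bucket/recording_data.csv`.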
s
Thank you for your response. I cannot upload the data to S3 directly because the data is on the server where I'm hosting Flyte. When I tried to upload data to the MinIO buckets through the upload button in the UI, it only showed files on my local system. Due to security restrictions, I cannot download the data from the server to my local machine. How can I work around this situation?