mysterious-painter-66441
10/03/2024, 7:59 PM
@task()
def fetch_and_upload_data(load_type: str) -> FlyteFile:
    """Write a synthetic PV/temperature CSV locally and return it as a FlyteFile.

    Despite the name, nothing is fetched: the function generates 96
    fifteen-minute samples starting at 2024-01-01 with a fixed random seed,
    writes them to a CSV under the task's working directory, and returns a
    ``FlyteFile`` whose ``remote_path`` targets the ``jielian-dev`` bucket.

    Args:
        load_type: Label embedded in the CSV file name.

    Returns:
        A ``FlyteFile`` pointing at the local CSV, with ``remote_path`` set to
        the matching ``s3://`` location for this execution.
    """
    # "15min" is the non-deprecated spelling of the former "15T" alias.
    timestamps = pd.date_range(start="2024-01-01", periods=96, freq="15min")

    # Sample data for site PV and temperature; seeded so every run produces
    # the same values.
    np.random.seed(0)
    pv_data = np.random.uniform(low=0, high=100, size=len(timestamps))  # PV generation in kW
    temperature_data = np.random.uniform(low=-10, high=35, size=len(timestamps))  # Temperature in Celsius

    data = pd.DataFrame({
        'Timestamp': timestamps,
        'site': pv_data,
        'temperature': temperature_data,
    })

    # Look the context up once and reuse it for both the execution id and
    # the working directory.
    ctx = flytekit.current_context()
    execution_id = ctx.execution_id.name

    # Local and remote locations share the same file name.
    file_name = f"raw_data_{load_type}_2022-08-31T17:00:00Z_2023-03-03T11:00:00Z.csv"

    # The URI must be a bare s3:// string; the surrounding "<...>" seen in the
    # original paste is not part of a valid S3 location.
    raw_data_path_remote = f"s3://jielian-dev/{execution_id}/{file_name}"

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    directory = Path(ctx.working_directory) / f"{execution_id}"
    directory.mkdir(parents=True, exist_ok=True)

    # write to local path
    raw_data_path_local = directory / file_name
    data.to_csv(raw_data_path_local)

    return FlyteFile(
        path=str(raw_data_path_local),
        remote_path=raw_data_path_remote,
    )
damp-lion-88352
10/04/2024, 2:51 AM
s3 path
directly
damp-lion-88352
10/04/2024, 2:51 AM
damp-lion-88352
10/04/2024, 2:51 AM
str
works
damp-lion-88352
10/04/2024, 2:52 AM
damp-lion-88352
10/04/2024, 2:58 AM
@task(enable_deck=True, container_image=custom_image)
def t_deck() -> FlyteFile:
    """Return the S3 location of an example text file as the task output.

    Returns:
        The URI ``s3://my-s3-bucket/example.txt`` as a plain string, used as
        the value of the ``FlyteFile``-annotated output.
    """
    # The original snippet had a second, unreachable return showing the
    # explicit form ``FlyteFile("s3://my-s3-bucket/example.txt")``; only the
    # reachable plain-string return is kept. The "<...>" wrapper around the
    # URI was dropped because it is not part of a valid S3 location.
    return "s3://my-s3-bucket/example.txt"
@task(enable_deck=True, container_image=custom_image)
def t_read_file(file: FlyteFile) -> str:
    """Open the given file in text mode and return its entire contents.

    Args:
        file: The file to read; it is passed straight to ``open``.

    Returns:
        Everything read from the file, as a single string.
    """
    with open(file, "r") as stream:
        contents = stream.read()
    return contents
@workflow
def wf() -> str:
    """Chain ``t_deck`` into ``t_read_file`` and return the text that was read."""
    # The file produced by t_deck is handed directly to t_read_file.
    return t_read_file(file=t_deck())
damp-lion-88352
10/04/2024, 3:04 AM
damp-lion-88352
10/04/2024, 3:06 AM