https://flyte.org logo
#ask-the-community
Title
# ask-the-community
e

Eric Peter Wairagala

03/21/2024, 9:23 AM
I get this error
tar: experiments/__init__.py: Cannot open: File exists
tar: experiments/workflows/__init__.py: Cannot open: File exists
tar: experiments/workflows/workflow.py: Cannot open: File exists
tar: experiments/workflows: Cannot utime: Operation not permitted
tar: experiments: Cannot utime: Operation not permitted
tar: Exiting with failure status due to previous errors
╭───────────────────── Traceback (most recent call last) ──────────────────────╮
│ /usr/local/bin/pyflyte-fast-execute:8 in <module>                            │
│                                                                              │
│ ❱ 8 │   sys.exit(fast_execute_task_cmd())                                    │
│                                                                              │
│ /usr/local/lib/python3.8/site-packages/click/core.py:1130 in __call__        │
│                                                                              │
│ ❱ 1130 │   │   return self.main(*args, **kwargs)                             │
│                                                                              │
│ /usr/local/lib/python3.8/site-packages/click/core.py:1055 in main            │
│                                                                              │
│ ❱ 1055 │   │   │   │   │   rv = self.invoke(ctx)                             │
│                                                                              │
│ /usr/local/lib/python3.8/site-packages/click/core.py:1404 in invoke          │
│                                                                              │
│ ❱ 1404 │   │   │   return ctx.invoke(self.callback, **ctx.params)            │
│                                                                              │
│ /usr/local/lib/python3.8/site-packages/click/core.py:760 in invoke           │
│                                                                              │
│ ❱  760 │   │   │   │   return __callback(*args, **kwargs)                    │
│                                                                              │
│ /usr/local/lib/python3.8/site-packages/flytekit/bin/entrypoint.py:497 in     │
│ fast_execute_task_cmd                                                        │
│                                                                              │
│ ❱ 497 │   │   _download_distribution(additional_distribution, dest_dir)      │
│                                                                              │
│ /usr/local/lib/python3.8/site-packages/flytekit/core/utils.py:295 in wrapper │
│ │ when i run my workflow remote .
l

L godlike

03/21/2024, 9:25 AM
Can you provide the whole code?
e

Eric Peter Wairagala

03/21/2024, 9:27 AM
ok let me share here
from flytekit import task, workflow,ImageSpec
import logging
import os
import requests
from typing import Any, Dict, Optional,Tuple
import os
from experiments.utils.loading import load_data
from experiments.configs.config import settings
from huggingface_hub import HfApi
from transformers import AutoTokenizer
from experiments.utils.training import train
from datasets import load_dataset,Dataset,DatasetDict
import wandb
# Setting up logging
logger = logging.getLogger(__name__)
logger.setLevel(<http://logging.INFO|logging.INFO>)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
#Setting up the custom image for running this task on Flyte
custom_image = ImageSpec(
name="flytekit",
base_image="<http://ghcr.io/flyteorg/flytekit:py3.8-1.6.0|ghcr.io/flyteorg/flytekit:py3.8-1.6.0>",
registry="<http://us-east1-docker.pkg.dev/|us-east1-docker.pkg.dev/><project>/flyte",
platform = "linux/amd64",
source_root=".",
packages=['flytekit', 'pydantic-settings',
'datasets', 'huggingface-hub',
'wandb', 'flytekitplugins-envd',
'flytekitplugins-pod', 'flytekitplugins-deck-standard',
'transformers', 'matplotlib','torch','accelerate','scikit-learn']
)
# Instantiate the HfApi class
hf_api = HfApi(token=settings.HF_API_KEY)
wandb.login(key=settings.WANDB_API_KEY)
# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained(settings.MODEL_CHECKPOINTS)
# Preprocess function to tokenize data and return a dictionary with input_ids and attention_masks as keys
def preprocess_function(examples):
return tokenizer(examples["text"], truncation=True, padding=True, max_length=512)
# Convert textual labels to integers
def convert_label(label) -> int:
"""Convert textual labels to integers."""
label_map = {'positive': 1, 'negative': 0, 'neutral': 2}
return label_map[label]
#Task to load or the datasets from the Hugging Face Hub
@task(cache=True, cache_version="1.0", retries=1,container_image=custom_image)  # Assuming this is a decorator from a specific library you are using
def loading_data(dataset_name:str)->Any:
dataset = load_data(dataset_name=dataset_name, logger=logger)
<http://logger.info|logger.info>("Dataset loaded and labels converted successfully.")
return dataset
@task(cache=True, cache_version="1", retries=3, container_image=custom_image)
def preprocess(dataset: Dataset) -> Any:
"""
Preprocesses the data for training.
Returns:
The dataset object.
"""
# Function to convert labels for a batch of examples
def convert_labels_batch(examples):
return {'labels': [convert_label(label) for label in examples['final_label']]}
# Convert labels in the train, test, and validation sets and preprocess
train = dataset['train'].map(convert_labels_batch, batched=True)
test = dataset['test'].map(convert_labels_batch, batched=True)
dev = dataset['validation'].map(convert_labels_batch, batched=True)
tokenized_train = train.map(preprocess_function, batched=True)
tokenized_test = test.map(preprocess_function, batched=True)
# tokenized_dev = dev.map(preprocess_function, batched=True)
response = {"train": tokenized_train, "dev": tokenized_test}
<http://logger.info|logger.info>(tokenized_train)
return response
@task(cache=True, cache_version="1", retries=3, container_image=custom_image)
def sentiment_analysis(tokenized_train: Any,
tokenized_dev: Any,
settings: Any,
tokenizer: AutoTokenizer) -> str:
'''
Trains the model and returns the path to the saved model.
Args:
settings: The settings configuration.
`The dataset configuration object from
preprocess_data
.`
Returns:
model_path (str): Path to the saved model.
'''
wandb.init(project=settings.WANDB_PROJECT)
trainer = train(tokenizer=tokenizer,
tokenized_train=tokenized_train,
tokenized_test=tokenized_dev,
num_labels=settings.NUM_LABELS,
model_name=settings.MODEL_CHECKPOINTS,
repo_name=settings.REPO_NAME,
learning_rate=settings.LEARNING_RATE,
per_device_train_batch_size=settings.PER_DEVICE_TRAIN_BATCH_SIZE,
weight_decay=settings.WEIGHT_DECAY,
per_device_eval_batch_size=settings.PER_DEVICE_EVAL_BATCH_SIZE,
num_train_epochs=settings.EPOCHS)
# Save the model and return the path
model_path = os.path.join(settings.REPO_NAME, "saved_model")
trainer.save_model(model_path)
wandb.finish()
return model_path
@task(cache=True, cache_version="1", retries=3,container_image=custom_image)
def post_training_tasks(discord_webhook_url:str,model:str) -> None:
"""
Performs post-training tasks.
Args:
settings: The settings configuration.
model_path (str): Path to the trained model.
"""
# Example post-training task: sending a notification
url = ""
task = ""
<http://requests.post|requests.post>(discord_webhook_url, json={"content":
f":tada: *Training Complete!* :rocket:\n\n:white_check_mark: {task} Model training has successfully finished.\n:bar_chart:
Check out the results and performance metrics in the dashboard {url}."})
@workflow
def sentiment_workflow(dataset:str=settings.DATA_NAME):
"""
Executes the training workflow.
Args:
settings: The settings configuration.
"""
loaded_data = loading_data(dataset_name=dataset)
processed_data = preprocess(dataset=loaded_data)
sentiment = sentiment_analysis(tokenized_train=processed_data['train'],
tokenized_dev=processed_data['dev'],
settings=settings,
tokenizer=tokenizer)
post_training_tasks(discord_webhook_url=settings.DISCORD_WEBHOOK_URL,model=sentiment)
if __name__ == "__main__":
<http://logger.info|logger.info>(f"Running {__file__} main...")
<http://logger.info|logger.info>(sentiment_workflow())
and this is how am running it
poetry run pyflyte run --remote  -d development experiments/workflows/workflow.py sentiment_workflow
Is there something wrong with the code
l

L godlike

03/21/2024, 9:47 AM
working on other issues, will take a look when available
thx!
e

Eric Peter Wairagala

03/21/2024, 9:48 AM
ok cool
Hi anything
l

L godlike

03/25/2024, 2:33 PM
I'm not pretty sure what's going on, maybe the data is not correct?
4 Views