stale-megabyte-99751
01/10/2024, 2:42 PM
cannot unpack non-iterable VoidPromise object
import logging
import os
from typing import Tuple
from flytekit import current_context, task, Resources, workflow
from datasets import load_dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from transformers.integrations import FlyteCallback
import numpy as np
import evaluate

logger = logging.getLogger(__file__)
model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
metric = evaluate.load("accuracy")

def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True)

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

@task
def get_and_tokenize_datasets():
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    return small_train_dataset, small_eval_dataset

@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset, eval_dataset):
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=5)
    cp = current_context().checkpoint
    training_args = TrainingArguments(
        "trainer",
        save_strategy="epoch",
        num_train_epochs=5,
        evaluation_strategy="epoch",
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )
    trainer = Trainer(model, training_args, callbacks=[FlyteCallback()])
    trainer.train(resume_from_checkpoint=cp.restore())
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return path

@workflow
def wf() -> Tuple[str, int]:
    train_dataset, eval_dataset = get_and_tokenize_datasets()
    path = train_hf_transformer(train_dataset=train_dataset, eval_dataset=eval_dataset)
    return path

if __name__ == "__main__":
    wf()
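(A minimal sketch of where this error usually comes from, assuming the code above is what was registered: flytekit derives a task's outputs from its return type annotation, and get_and_tokenize_datasets has none, so the workflow node yields a VoidPromise that cannot be tuple-unpacked. Names below are illustrative, not from the thread.)

from typing import Tuple
from flytekit import task, workflow

@task
def untyped():  # no return annotation, so flytekit sees zero outputs
    return 1, 2

@task
def typed() -> Tuple[int, int]:  # annotation tells flytekit there are two outputs
    return 1, 2

@workflow
def demo() -> int:
    a, b = typed()      # fine: two promises to unpack
    # a, b = untyped()  # fails at workflow compile time:
    #                   # "cannot unpack non-iterable VoidPromise object"
    return a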
tall-lock-23197
stale-megabyte-99751
01/10/2024, 8:13 PM
stale-megabyte-99751
01/11/2024, 11:29 AM
Traceback (most recent call last):

  File "/usr/local/lib/python3.10/dist-packages/flytekit/exceptions/scopes.py", line 219, in user_entry_point
    return wrapped(*args, **kwargs)
  File "/root/workflows/hf_example.py", line 58, in wf
    train_dataset, eval_dataset = get_and_tokenize_datasets()

Message:

    cannot unpack non-iterable VoidPromise object

User error.
Any pointers, @tall-lock-23197?
import logging
import os
from typing import Tuple, Dict, List
from flytekit import current_context, task, Resources, workflow
from datasets import load_dataset, Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from transformers.integrations import FlyteCallback
import numpy as np
import evaluate

logger = logging.getLogger(__file__)
model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
metric = evaluate.load("accuracy")

def tokenize_function(examples: Dict[str, List[int]]) -> Dict[str, List[List[int]]]:
    return tokenizer(examples["text"], padding="max_length", truncation=True)

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

@task
def get_and_tokenize_datasets() -> Tuple[Dataset, Dataset]:
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    return small_train_dataset, small_eval_dataset

@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset: Dataset, eval_dataset: Dataset) -> str:
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=5)
    cp = current_context().checkpoint
    training_args = TrainingArguments(
        "trainer",
        save_strategy="epoch",
        num_train_epochs=5,
        evaluation_strategy="epoch",
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )
    trainer = Trainer(model, training_args, callbacks=[FlyteCallback()])
    trainer.train(resume_from_checkpoint=cp.restore())
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return path

@workflow
def wf() -> str:
    train_dataset, eval_dataset = get_and_tokenize_datasets()
    path = train_hf_transformer(train_dataset=train_dataset, eval_dataset=eval_dataset)
    return path

if __name__ == "__main__":
    wf()
tall-lock-23197
from flytekit import task, workflow
from typing import Tuple
from datasets import Dataset, load_dataset
from transformers import AutoTokenizer

model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tokenize_function(examples: dict[str, list[int]]) -> dict[str, list[list[int]]]:
    return tokenizer(examples["text"], padding="max_length", truncation=True)

@task
def get_and_tokenize_datasets() -> Tuple[Dataset, Dataset]:
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    return small_train_dataset, small_eval_dataset

@workflow
def wf() -> Tuple[Dataset, Dataset]:
    return get_and_tokenize_datasets()
and it didn't result in any error. what's the flytekit version you installed and how are you executing the code?
stale-megabyte-99751
01/11/2024, 2:13 PMpyflyte register
and ran with
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
)
hf_wf = remote.fetch_task(name="workflows.hf_example.wf")
execution = remote.execute(hf_wf, inputs={})
stale-megabyte-99751
01/11/2024, 2:22 PM
stale-megabyte-99751
01/11/2024, 2:35 PM
FROM nvcr.io/nvidia/pytorch:23.10-py3
WORKDIR /root
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
COPY requirements.txt /root
RUN pip install -r /root/requirements.txt
COPY . /root
# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
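(The requirements.txt referenced in this Dockerfile isn't shown in the thread; for the tasks above to run in this image it would presumably need at least something like the following. The exact package set and lack of version pins are assumptions, not from the thread.)

# hypothetical requirements.txt for the Dockerfile above
flytekit
flytekitplugins-huggingface   # avoids pickling datasets.Dataset (suggested later in the thread)
transformers
datasets
evaluate
numpy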
stale-megabyte-99751
01/11/2024, 2:43 PM
pyflyte --pkgs workflows package with flytectl register files gives the same error.
stale-megabyte-99751
01/11/2024, 2:45 PM
WARNING {"asctime": "2024-01-11 16:41:05,397", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <class 'datasets.arrow_dataset.Dataset'> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
However, I would assume the tasks run with the same Python version since they use the same image
freezing-airport-6809
tall-lock-23197
can you use pyflyte run <your-python-file> <workflow> to run the workflow i shared with you? does that work?
tall-lock-23197
import logging
import os
from typing import Tuple, Dict, List
from flytekit import current_context, task, Resources, workflow
from datasets import load_dataset, Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from transformers.integrations import FlyteCallback
import numpy as np
import evaluate

logger = logging.getLogger(__file__)
model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
metric = evaluate.load("accuracy")

def tokenize_function(examples: Dict[str, List[int]]) -> Dict[str, List[List[int]]]:
    return tokenizer(examples["text"], padding="max_length", truncation=True)

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

@task
def get_and_tokenize_datasets() -> Tuple[Dataset, Dataset]:
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    return small_train_dataset, small_eval_dataset

@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset: Dataset, eval_dataset: Dataset) -> str:
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=5)
    cp = current_context().checkpoint
    training_args = TrainingArguments(
        "trainer",
        save_strategy="epoch",
        num_train_epochs=5,
        evaluation_strategy="epoch",
    )
    trainer = Trainer(
        model,
        training_args,
        callbacks=[FlyteCallback()],
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )
    trainer.train(resume_from_checkpoint=cp.restore())
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return path

@workflow
def wf() -> str:
    train_dataset, eval_dataset = get_and_tokenize_datasets()
    path = train_hf_transformer(train_dataset=train_dataset, eval_dataset=eval_dataset)
    return path

if __name__ == "__main__":
    wf()
works for me. i don't see any void promise error.
stale-megabyte-99751
01/11/2024, 7:06 PM
pyflyte run hf_example.py wf
gives Running Execution on local and seems to run fine, but I need to run the workload on our kubernetes cluster. If I run the following with pyflyte run, it gives the same VoidPromise error:
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
)
hf_wf = remote.fetch_task(name="workflows.hf_example.wf")
execution = remote.execute(hf_wf, inputs={})
tall-lock-23197
can you run the pyflyte run --remote hf_example.py wf command and check if that's working? the first task runs successfully when i trigger the execution. we'll look at flyte remote later. also, install flytekitplugins-huggingface so that dataset is not pickled.
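(A sketch of what installing flytekitplugins-huggingface buys you, assuming it's added to the image's requirements.txt: flytekit should pick up the plugin's type transformer automatically, so the existing datasets.Dataset type hints are enough and the data moves between tasks as structured data instead of a PickleFile. The task below is illustrative only.)

# requirements.txt addition: flytekitplugins-huggingface
from datasets import Dataset
from flytekit import task

@task
def make_dataset() -> Dataset:
    # with the plugin installed, this should no longer trigger the PickleFile warning above
    return Dataset.from_dict({"text": ["hello", "world"], "label": [0, 1]})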
stale-megabyte-99751
01/12/2024, 9:36 AM
stale-megabyte-99751
01/12/2024, 9:36 AM
tall-lock-23197
stale-megabyte-99751
01/12/2024, 9:40 AM
stale-megabyte-99751
01/12/2024, 9:41 AM
tall-lock-23197
remote.terminate(...)
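(For reference, a hedged sketch of stopping a run that's already executing on the cluster; the execution name below is made up.)

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(config=Config.auto(), default_project="foo", default_domain="development")
execution = remote.fetch_execution(name="f6a8b1c2d3e4", project="foo", domain="development")  # hypothetical execution id
remote.terminate(execution, cause="cancelled while debugging")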
stale-megabyte-99751
01/12/2024, 10:40 AM
stale-megabyte-99751
01/12/2024, 10:42 AM
pyflyte run --remote will finish successfully now. How can we debug the pyflyte register and remote.execute combo?
tall-lock-23197
can you use FlyteFile(path) as the return value and the type as FlyteFile?
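(Sketch of that change: returning a plain str just passes the path text around, while a FlyteFile return type makes flytekit upload the local file to blob storage as the task's output. The function name below is illustrative. If trainer.save_model writes a directory rather than a single file, FlyteDirectory would be the analogous type.)

from flytekit import task
from flytekit.types.file import FlyteFile

@task
def save_model_example() -> FlyteFile:
    path = "/tmp/example_hf_flyte_model"  # illustrative local path
    # ... trainer.save_model(path) ...
    return FlyteFile(path)  # uploaded by flytekit when the task completes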
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
)
hf_wf = remote.fetch_task(name="workflows.hf_example.wf")
execution = remote.execute(hf_wf, inputs={})
^^ you aren't fetching a task, correct? can you modify it to remote.fetch_launch_plan(name="...")
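(Sketch of that suggestion, assuming the workflow registered as workflows.hf_example.wf has a default launch plan of the same name.)

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
)
lp = remote.fetch_launch_plan(name="workflows.hf_example.wf")
execution = remote.execute(lp, inputs={})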
tall-lock-23197
stale-megabyte-99751
01/12/2024, 11:02 AM
@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset: Dataset, eval_dataset: Dataset) -> FlyteFile:
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=5)
    cp = current_context().checkpoint
    training_args = TrainingArguments(
        "trainer",
        save_strategy="epoch",
        num_train_epochs=5,
        evaluation_strategy="epoch",
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        resume_from_checkpoint=cp.restore(),
        output_dir=cp.checkpoint_dir,
        overwrite_output_dir=True,
    )
    trainer = Trainer(model, training_args, callbacks=[FlyteCallback()])
    trainer.train()
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return FlyteFile(path)
tall-lock-23197
stale-megabyte-99751
01/12/2024, 11:51 AM
'SyncCheckpoint' object has no attribute 'checkpoint_dir'. Will try current_context().working_directory as output_dir
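(A sketch of how the pieces discussed above could fit together, assuming FlyteCallback takes care of syncing Hugging Face checkpoints with Flyte: use the working directory for output_dir, keep the datasets and metrics on Trainer rather than TrainingArguments, and pass the restored checkpoint to trainer.train() as in the earlier working snippet. Not verified end to end.)

# inside train_hf_transformer(...), using the imports from the snippets above
cp = current_context().checkpoint
training_args = TrainingArguments(
    output_dir=current_context().working_directory,
    save_strategy="epoch",
    evaluation_strategy="epoch",
    num_train_epochs=5,
)
trainer = Trainer(
    model,
    training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
    callbacks=[FlyteCallback()],
)
trainer.train(resume_from_checkpoint=cp.restore())  # as in the working snippet above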
tall-lock-23197
current_context().checkpoint
stale-megabyte-99751
01/12/2024, 11:58 AM
output_dir=cp?
tall-lock-23197