Help appreciated with a basic Huggingface example....
# ask-the-community
a
Help appreciated with a basic Huggingface example. The following gives
cannot unpack non-iterable VoidPromise object
import logging
import os
from typing import Tuple
from flytekit import current_context, task, Resources, workflow
from datasets import load_dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from transformers.integrations import FlyteCallback
import numpy as np
import evaluate

logger = logging.getLogger(__file__)
model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
metric = evaluate.load("accuracy")


def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True)


def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)


@task
def get_and_tokenize_datasets():
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    return small_train_dataset, small_eval_dataset


@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset, eval_dataset):
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=5)
    cp = current_context().checkpoint
    training_args = TrainingArguments(
        "trainer",
        save_strategy="epoch",
        num_train_epochs=5,
        evaluation_strategy="epoch",
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )
    trainer = Trainer(model, training_args, callbacks=[FlyteCallback()])
    trainer.train(resume_from_checkpoint=cp.restore())
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return path


@workflow
def wf() -> Tuple[str, int]:
    train_dataset, eval_dataset = get_and_tokenize_datasets()
    path = train_hf_transformer(train_dataset=train_dataset, eval_dataset=eval_dataset)
    return path


if __name__ == "__main__":
    wf()
s
hey @avesuni, it seems like you haven't annotated the inputs and outputs of flyte tasks with types. have you reviewed the flyte fundamentals guide?
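for example, the annotated signatures would look roughly like this (a sketch only; annotating with the Hugging Face Dataset type is one option):

from typing import Tuple

from datasets import Dataset
from flytekit import task


@task
def get_and_tokenize_datasets() -> Tuple[Dataset, Dataset]:
    ...


@task
def train_hf_transformer(train_dataset: Dataset, eval_dataset: Dataset) -> str:
    ...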
a
Many thanks @Samhita Alla. That makes sense
Still getting the same error:
"Traceback (most recent call last):\n\n      File \"/usr/local/lib/python3.10/dist-packages/flytekit/exceptions/scopes.py\", line 219, in user_entry_point\n        return wrapped(*args, **kwargs)\n      File \"/root/workflows/hf_example.py\", line 58, in wf\n        train_dataset, eval_dataset = get_and_tokenize_datasets()\n\nMessage:\n\n    cannot unpack non-iterable VoidPromise object\n\nUser error."}
Any pointers @Samhita Alla ?
import logging
import os
from typing import Tuple, Dict, List
from flytekit import current_context, task, Resources, workflow
from datasets import load_dataset, Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from transformers.integrations import FlyteCallback
import numpy as np
import evaluate

logger = logging.getLogger(__file__)
model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
metric = evaluate.load("accuracy")


def tokenize_function(examples: Dict[str, List[int]]) -> Dict[str, List[List[int]]]:
    return tokenizer(examples["text"], padding="max_length", truncation=True)


def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)


@task
def get_and_tokenize_datasets() -> Tuple[Dataset, Dataset]:
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    return small_train_dataset, small_eval_dataset


@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset: Dataset, eval_dataset: Dataset) -> str:
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=5)
    cp = current_context().checkpoint
    training_args = TrainingArguments(
        "trainer",
        save_strategy="epoch",
        num_train_epochs=5,
        evaluation_strategy="epoch",
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )
    trainer = Trainer(model, training_args, callbacks=[FlyteCallback()])
    trainer.train(resume_from_checkpoint=cp.restore())
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return path


@workflow
def wf() -> str:
    train_dataset, eval_dataset = get_and_tokenize_datasets()
    path = train_hf_transformer(train_dataset=train_dataset, eval_dataset=eval_dataset)
    return path


if __name__ == "__main__":
    wf()
s
@avesuni i ran the following workflow:
Copy code
from flytekit import task, workflow
from typing import Tuple

from datasets import Dataset, load_dataset
from transformers import AutoTokenizer

model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)


def tokenize_function(examples: dict[str, list[int]]) -> dict[str, list[list[int]]]:
    return tokenizer(examples["text"], padding="max_length", truncation=True)


@task
def get_and_tokenize_datasets() -> Tuple[Dataset, Dataset]:
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    return small_train_dataset, small_eval_dataset


@workflow
def wf() -> Tuple[Dataset, Dataset]:
    return get_and_tokenize_datasets()
and it didn't result in any error. what's the flytekit version you installed and how are you executing the code?
a
I registered with
pyflyte register
and ran with
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
)
hf_wf = remote.fetch_task(name="workflows.hf_example.wf")
execution = remote.execute(hf_wf, inputs={})
flytekit version is 1.10.2. @Samhita Alla Do you mind trying the code without changes?
Trying your version of the script gives the exact same error. Here is my Dockerfile
FROM nvcr.io/nvidia/pytorch:23.10-py3
WORKDIR /root
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
COPY requirements.txt /root
RUN pip install -r /root/requirements.txt
COPY . /root
# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag
Also
pyflyte --pkgs workflows package
with
flytectl register files
gives the same error
I also see
WARNING  {"asctime": "2024-01-11 16:41:05,397", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <class 'datasets.arrow_dataset.Dataset'> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
However, I would assume the tasks run with the same python version since they use the same image
k
Cc @Niels Bantilan
s
you can ignore the warning. you need to install flytekitplugins-huggingface library to use the pre-defined dataset type transformer. can you try running the command:
pyflyte run <your-python-file> <workflow>
to run the workflow i shared with you? does that work?
Copy code
import logging
import os
from typing import Tuple, Dict, List
from flytekit import current_context, task, Resources, workflow
from datasets import load_dataset, Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from transformers.integrations import FlyteCallback
import numpy as np
import evaluate

logger = logging.getLogger(__file__)
model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
metric = evaluate.load("accuracy")


def tokenize_function(examples: Dict[str, List[int]]) -> Dict[str, List[List[int]]]:
    return tokenizer(examples["text"], padding="max_length", truncation=True)


def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)


@task
def get_and_tokenize_datasets() -> Tuple[Dataset, Dataset]:
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    return small_train_dataset, small_eval_dataset


@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset: Dataset, eval_dataset: Dataset) -> str:
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=5)
    cp = current_context().checkpoint
    training_args = TrainingArguments(
        "trainer",
        save_strategy="epoch",
        num_train_epochs=5,
        evaluation_strategy="epoch",
    )
    trainer = Trainer(
        model,
        training_args,
        callbacks=[FlyteCallback()],
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
    )
    trainer.train(resume_from_checkpoint=cp.restore())
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return path


@workflow
def wf() -> str:
    train_dataset, eval_dataset = get_and_tokenize_datasets()
    path = train_hf_transformer(train_dataset=train_dataset, eval_dataset=eval_dataset)
    return path


if __name__ == "__main__":
    wf()
works for me. i don't see any void promise error.
a
pyflyte run hf_example.py wf
gives
Running Execution on local
and seems to run fine, but I need to run the workload on our kubernetes cluster. If I run the following with
pyflyte run
it gives the same
VoidPromise error
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
)
hf_wf = remote.fetch_task(name="workflows.hf_example.wf")
execution = remote.execute(hf_wf, inputs={})
s
why don't you first trigger the execution on the flyte cluster using
pyflyte run --remote hf_example.py wf
command and check if that's working? the first task runs successfully when i trigger the execution. we'll look at flyte remote later. also, install flytekitplugins-huggingface so that dataset is not pickled.
a
The job seems to run, but doesn't get past the first step. I guess the GPU annotation is wrong for GCP and it gets stuck waiting for a GPU node
How do I delete or cancel a running workflow?
s
you should be able to terminate it on the UI
a
Thanks @Samhita Alla
Is that the only way? We have a blind developer on the team; the UI is not going to work for him.
s
you can use flytekit remote to terminate an execution: https://github.com/flyteorg/flytekit/blob/ced0daf2c970cadd3008311b48eb4b0bd9261b7e/flytekit/remote/remote.py#L1860-L1866; you just have to run
remote.terminate(...)
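something along these lines should work (a sketch; the execution name is just a placeholder for whatever id your run got):

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
)

# fetch the running execution by its name (shown when you launch it, or via flytectl),
# then terminate it with a reason string
execution = remote.fetch_execution(name="<execution-id>")
remote.terminate(execution, cause="cancelled from a script")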
a
Nice, thanks!
I have a good feeling that
pyflyte run --remote
will finish successfully now. How can we debug
pyflyte register
and
remote.execute
combo?
s
okay. just wanted to mention that you aren't writing to a checkpoint within a flyte task. you may want to do that, right? also, you may want to return a FlyteFile from the second task, so that the local file is uploaded to blob storage. you need to include FlyteFile(path) as the return value and annotate the return type as FlyteFile.
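roughly like this, reusing the names from your task (a sketch, not the full task body):

from flytekit.types.file import FlyteFile

@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset: Dataset, eval_dataset: Dataset) -> FlyteFile:
    ...
    # save the trained model locally, then hand the path to Flyte so it gets uploaded
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return FlyteFile(path)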
Copy code
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
    )

hf_wf = remote.fetch_task(name="workflows.hf_example.wf")
execution = remote.execute(hf_wf, inputs={})
^^ you aren't fetching a task, correct? can you modify it to
remote.fetch_launch_plan(name="...")
FYI: the name of the default launch plan is the same as the workflow's.
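put together, fetching and executing the default launch plan looks roughly like this (same project/domain as your snippet):

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="foo",
    default_domain="development",
)

# the default launch plan shares its name with the workflow
lp = remote.fetch_launch_plan(name="workflows.hf_example.wf")
execution = remote.execute(lp, inputs={})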
a
Thanks @Samhita Alla. Would the following work and write/resume to/from checkpoints?
@task(requests=Resources(gpu="1"), disable_deck=False, interruptible=False)
def train_hf_transformer(train_dataset: Dataset, eval_dataset: Dataset) -> FlyteFile:
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=5)
    cp = current_context().checkpoint
    training_args = TrainingArguments(
        "trainer",
        save_strategy="epoch",
        num_train_epochs=5,
        evaluation_strategy="epoch",
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        resume_from_checkpoint=cp.restore(),
        output_dir=cp.checkpoint_dir,
        overwrite_output_dir=True,
    )
    trainer = Trainer(model, training_args, callbacks=[FlyteCallback()])
    trainer.train()
    path = os.path.join(current_context().working_directory, "example_hf_flyte_model")
    trainer.save_model(path)
    return FlyteFile(path)
s
you need to save and restore: https://github.com/flyteorg/flytekit/blob/ced0daf2c970cadd3008311b48eb4b0bd9261b7e/flytekit/core/checkpointer.py#L92-L141. hopefully, the interface is clear enough; let me know if there's anything unclear to you. yet to document these methods, apologies. would you mind creating a docs issue for this?
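a rough sketch of the save/restore pattern inside the task (paths are illustrative):

import os
from flytekit import current_context

cp = current_context().checkpoint

# restore() pulls the previously saved checkpoint (if any) into a local directory
# and returns its path; it returns None when nothing has been saved yet
prev_checkpoint = cp.restore()

# ... run training, writing Trainer checkpoints into a local directory ...
local_ckpt_dir = os.path.join(current_context().working_directory, "hf_checkpoints")

# save() uploads a local file or directory as this attempt's checkpoint, so a
# retried or resumed attempt can pick it up via restore()
cp.save(local_ckpt_dir)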
a
I get
'SyncCheckpoint' object has no attribute 'checkpoint_dir'.
Will try
current_context().working_directory
as
output_dir
s
i don't think you need to initialize SyncCheckpoint, because that's what you get when you run current_context().checkpoint.
a
OK, do you mean I should simply use output_dir=cp?
s
looks like read and write should just work because they use the save and restore methods under the hood: https://docs.flyte.org/en/latest/flytesnacks/examples/advanced_composition/checkpoint.html#intratask-checkpoints you need to first check if the checkpoint exists, then read it and use it. you also need to write to it as the execution progresses. makes sense?
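for reference, the intratask checkpoint pattern from that page looks roughly like this (the epoch counter is only illustrative; for HF training you'd persist the Trainer checkpoint directory instead):

from flytekit import current_context, task

@task(retries=3)
def train_with_checkpoint(n_epochs: int) -> int:
    cp = current_context().checkpoint

    # read() returns the previously written bytes, or None if no checkpoint exists yet
    prev = cp.read()
    start_epoch = int(prev.decode()) if prev else 0

    for epoch in range(start_epoch, n_epochs):
        # ... one epoch of training ...
        cp.write(str(epoch + 1).encode())  # record progress as the execution advances

    return n_epochs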