Unverified Commit baf1daa5 authored by Sylvain Gugger, committed by GitHub

Migrate Trainer from `Repository` to `upload_folder` (#25095)



* First draft

* Deal with progress bars

* Update src/transformers/utils/hub.py
Co-authored-by: Lucain <lucainp@gmail.com>

* Address review comments

* Forgot one

* Pin hf_hub

* Add argument for push all and fix tests

* Fix tests

* Address review comments

---------
Co-authored-by: Lucain <lucainp@gmail.com>
parent c177606f
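
At a high level, this commit swaps the Trainer's git-clone based `Repository` workflow for direct HTTP uploads via `upload_folder`, which can run asynchronously. A minimal sketch of the new flow, assuming a placeholder repo id "user/my-model" and output path "outputs/" (neither comes from this diff):

from huggingface_hub import create_repo, upload_folder

# Create (or reuse) the Hub repo; no local git clone is needed anymore.
repo_url = create_repo("user/my-model", private=False, exist_ok=True)

# run_as_future=True (available from huggingface_hub 0.15, hence the new pin)
# returns a concurrent.futures.Future right away instead of blocking.
future = upload_folder(
    repo_id=repo_url.repo_id,
    folder_path="outputs/",
    commit_message="Training in progress",
    run_as_future=True,
)
future.result()  # block only when the upload actually needs to be finished
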
@@ -120,7 +120,7 @@ _deps = [
"fugashi>=1.0",
"GitPython<3.1.19",
"hf-doc-builder>=0.3.0",
"huggingface-hub>=0.14.1,<1.0",
"huggingface-hub>=0.15.1,<1.0",
"importlib_metadata",
"ipadic>=1.0.0,<2.0",
"isort>=5.5.4",
@@ -25,7 +25,7 @@ deps = {
"fugashi": "fugashi>=1.0",
"GitPython": "GitPython<3.1.19",
"hf-doc-builder": "hf-doc-builder>=0.3.0",
"huggingface-hub": "huggingface-hub>=0.14.1,<1.0",
"huggingface-hub": "huggingface-hub>=0.15.1,<1.0",
"importlib_metadata": "importlib_metadata",
"ipadic": "ipadic>=1.0.0,<2.0",
"isort": "isort>=5.5.4",
@@ -44,10 +44,11 @@ from .integrations import (
# isort: on
import huggingface_hub.utils as hf_hub_utils
import numpy as np
import torch
import torch.distributed as dist
from huggingface_hub import Repository, create_repo
from huggingface_hub import Repository, create_repo, upload_folder
from packaging import version
from torch import nn
from torch.utils.data import DataLoader, Dataset, RandomSampler, SequentialSampler
@@ -127,6 +128,7 @@ from .utils import (
SAFE_WEIGHTS_NAME,
WEIGHTS_INDEX_NAME,
WEIGHTS_NAME,
PushInProgress,
can_return_loss,
find_labels,
is_accelerate_available,
@@ -548,15 +550,10 @@ class Trainer:
# Will be set to True by `self._setup_loggers()` on first call to `self.log()`.
self._loggers_initialized = False
# Create clone of distant repo and output directory if needed
# Create distant repo and output directory if needed
self.hub_model_id = None
if self.args.push_to_hub:
self.init_git_repo(at_init=True)
# In case of pull, we need to make sure every process has the latest.
if is_torch_tpu_available():
xm.rendezvous("init git repo")
elif args.parallel_mode == ParallelMode.DISTRIBUTED:
dist.barrier()
self.init_hf_repo()
if self.args.should_save:
os.makedirs(self.args.output_dir, exist_ok=True)
@@ -1531,6 +1528,19 @@ class Trainer:
inner_training_loop = find_executable_batch_size(
self._inner_training_loop, self._train_batch_size, args.auto_find_batch_size
)
if args.push_to_hub:
try:
# Disable progress bars when uploading models during checkpoints to avoid polluting stdout
hf_hub_utils.disable_progress_bars()
return inner_training_loop(
args=args,
resume_from_checkpoint=resume_from_checkpoint,
trial=trial,
ignore_keys_for_eval=ignore_keys_for_eval,
)
finally:
hf_hub_utils.enable_progress_bars()
else:
return inner_training_loop(
args=args,
resume_from_checkpoint=resume_from_checkpoint,
@@ -1968,6 +1978,9 @@ class Trainer:
self.control = self.callback_handler.on_train_end(args, self.state, self.control)
# Wait for the checkpoint to be uploaded.
self._finish_current_push()
return TrainOutput(self.state.global_step, train_loss, metrics)
def _get_output_dir(self, trial):
@@ -3386,16 +3399,43 @@ class Trainer:
else:
return 0
def init_hf_repo(self):
"""
Initializes a repo on the Hugging Face Hub in `self.args.hub_model_id`.
"""
# Only on process zero
if not self.is_world_process_zero():
return
if self.args.hub_model_id is None:
repo_name = Path(self.args.output_dir).absolute().name
else:
repo_name = self.args.hub_model_id
repo_url = create_repo(repo_name, token=self.args.hub_token, private=self.args.hub_private_repo, exist_ok=True)
self.hub_model_id = repo_url.repo_id
self.push_in_progress = None
def init_git_repo(self, at_init: bool = False):
"""
Initializes a git repo in `self.args.hub_model_id`.
<Tip warning={true}>
This function is deprecated and will be removed in v4.34.0 of Transformers.
</Tip>
Args:
at_init (`bool`, *optional*, defaults to `False`):
Whether this function is called before any training or not. If `self.args.overwrite_output_dir` is
`True` and `at_init` is `True`, the path to the repo (which is `self.args.output_dir`) might be wiped
out.
"""
warnings.warn(
"`Trainer.init_git_repo` is deprecated and will be removed in v4.34.0 of Transformers. Use "
"`Trainer.init_hf_repo` instead."
)
if not self.is_world_process_zero():
return
@@ -3493,8 +3533,8 @@ class Trainer:
# Only push from one node.
if not self.is_world_process_zero() or self.args.hub_strategy == HubStrategy.END:
return
# If we haven't finished the last push, we don't do this one.
if self.push_in_progress is not None and not self.push_in_progress.is_done:
# If we haven't finished the last push, we don't do this one unless args.hub_always_push=True.
if not self.args.hub_always_push and self.push_in_progress is not None and not self.push_in_progress.is_done():
return
output_dir = self.args.output_dir
@@ -3511,34 +3551,51 @@ class Trainer:
# Same for the training arguments
torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))
try:
if self.args.hub_strategy == HubStrategy.CHECKPOINT:
# Temporarily move the checkpoint just saved for the push
tmp_checkpoint = os.path.join(output_dir, "last-checkpoint")
# We have to remove the "last-checkpoint" dir if it exists, otherwise the checkpoint is moved as a
# subfolder.
if os.path.isdir(tmp_checkpoint):
shutil.rmtree(tmp_checkpoint)
shutil.move(checkpoint_folder, tmp_checkpoint)
if self.args.save_strategy == IntervalStrategy.STEPS:
commit_message = f"Training in progress, step {self.state.global_step}"
else:
commit_message = f"Training in progress, epoch {int(self.state.epoch)}"
push_work = self.repo.push_to_hub(commit_message=commit_message, blocking=False, auto_lfs_prune=True)
# Return type of `Repository.push_to_hub` is either None or a tuple.
if push_work is not None:
self.push_in_progress = push_work[1]
except Exception as e:
logger.error(f"Error when pushing to hub: {e}")
finally:
if self.args.hub_strategy == HubStrategy.CHECKPOINT:
# Move back the checkpoint to its place
shutil.move(tmp_checkpoint, checkpoint_folder)
model_push_job = upload_folder(
repo_id=self.hub_model_id,
folder_path=output_dir,
commit_message=commit_message,
token=self.args.hub_token,
run_as_future=True,
ignore_patterns=["_*", "**/*"],
)
push_jobs = [model_push_job]
if self.args.hub_strategy in [HubStrategy.CHECKPOINT, HubStrategy.ALL_CHECKPOINTS]:
path_in_repo = (
"last-checkpoint" if self.args.hub_strategy == HubStrategy.CHECKPOINT else Path(checkpoint_folder).name
)
checkpoint_push = upload_folder(
repo_id=self.hub_model_id,
folder_path=checkpoint_folder,
path_in_repo=path_in_repo,
commit_message=commit_message + ", checkpoint",
token=self.args.hub_token,
run_as_future=True,
)
push_jobs.append(checkpoint_push)
if self.push_in_progress is None or self.push_in_progress.is_done():
self.push_in_progress = PushInProgress(push_jobs)
else:
self.push_in_progress.jobs.extend(push_jobs)
def _finish_current_push(self):
if not hasattr(self, "push_in_progress"):
return
if self.push_in_progress is not None and not self.push_in_progress.is_done():
logger.info("Waiting for the current checkpoint push to be finished, this might take a couple of minutes.")
self.push_in_progress.wait_until_done()
def push_to_hub(self, commit_message: Optional[str] = "End of training", blocking: bool = True, **kwargs) -> str:
"""
Upload *self.model* and *self.tokenizer* to the 🤗 model hub on the repo *self.args.hub_model_id*.
Upload `self.model` and `self.tokenizer` to the 🤗 model hub on the repo `self.args.hub_model_id`.
Parameters:
commit_message (`str`, *optional*, defaults to `"End of training"`):
@@ -3549,14 +3606,9 @@ class Trainer:
Additional keyword arguments passed along to [`~Trainer.create_model_card`].
Returns:
The url of the commit of your model in the given repository if `blocking=False`, a tuple with the url of
the commit and an object to track the progress of the commit if `blocking=True`
The URL of the repository where the model was pushed if `blocking=True`, or a `Future` object tracking the
progress of the commit if `blocking=False`.
"""
# If a user calls manually `push_to_hub` with `self.args.push_to_hub = False`, we try to create the repo but
# it might fail.
if not hasattr(self, "repo"):
self.init_git_repo()
model_name = kwargs.pop("model_name", None)
if model_name is None and self.args.should_save:
if self.args.hub_model_id is None:
@@ -3564,6 +3616,10 @@ class Trainer:
else:
model_name = self.args.hub_model_id.split("/")[-1]
# In case the user calls this method with args.push_to_hub = False
if self.hub_model_id is None:
self.init_hf_repo()
# Needs to be executed on all processes for TPU training, but will only save on the process determined by
# self.args.should_save.
self.save_model(_internal_call=True)
@@ -3572,25 +3628,19 @@ class Trainer:
if not self.is_world_process_zero():
return
# Cancel any async push in progress if blocking=True. The commits will all be pushed together.
if blocking and self.push_in_progress is not None and not self.push_in_progress.is_done:
self.push_in_progress._process.kill()
self.push_in_progress = None
git_head_commit_url = self.repo.push_to_hub(
commit_message=commit_message, blocking=blocking, auto_lfs_prune=True
)
# push the model card separately to be independent from the rest of the model
if self.args.should_save:
self.create_model_card(model_name=model_name, **kwargs)
try:
self.repo.push_to_hub(
commit_message="update model card README.md", blocking=blocking, auto_lfs_prune=True
)
except EnvironmentError as exc:
logger.error(f"Error pushing update to the model card. Please read logs and retry.\n${exc}")
return git_head_commit_url
# Wait for the current upload to be finished.
self._finish_current_push()
return upload_folder(
repo_id=self.hub_model_id,
folder_path=self.args.output_dir,
commit_message=commit_message,
token=self.args.hub_token,
run_as_future=not blocking,
ignore_patterns=["_*", "**/*"],
)
#
# Deprecated code
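
For reference, a sketch of how the new `_push_from_checkpoint` queues its two uploads (repo id, paths and step number are placeholders; the `ignore_patterns` and `path_in_repo` values mirror the diff above):

from huggingface_hub import upload_folder

# Root push: ignore_patterns=["_*", "**/*"] skips everything inside subfolders
# (e.g. checkpoint-500/) and files starting with "_", so only the latest model,
# config and tokenizer files at the root of output_dir get uploaded.
model_job = upload_folder(
    repo_id="user/my-model",            # placeholder
    folder_path="outputs",              # placeholder for args.output_dir
    commit_message="Training in progress, step 500",
    run_as_future=True,
    ignore_patterns=["_*", "**/*"],
)

# With hub_strategy="checkpoint", the checkpoint folder itself is mirrored to
# last-checkpoint/ in the repo so training can later resume from it.
checkpoint_job = upload_folder(
    repo_id="user/my-model",
    folder_path="outputs/checkpoint-500",
    path_in_repo="last-checkpoint",
    commit_message="Training in progress, step 500, checkpoint",
    run_as_future=True,
)

Both resulting futures are then tracked through the `PushInProgress` helper added by this commit.
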
@@ -568,6 +568,8 @@ class TrainingArguments:
`huggingface-cli login`.
hub_private_repo (`bool`, *optional*, defaults to `False`):
If True, the Hub repo will be set to private.
hub_always_push (`bool`, *optional*, defaults to `False`):
Unless this is `True`, the `Trainer` will skip pushing a checkpoint when the previous push is not finished.
gradient_checkpointing (`bool`, *optional*, defaults to `False`):
If True, use gradient checkpointing to save memory at the expense of slower backward pass.
include_inputs_for_metrics (`bool`, *optional*, defaults to `False`):
@@ -1110,6 +1112,10 @@ class TrainingArguments:
)
hub_token: Optional[str] = field(default=None, metadata={"help": "The token to use to push to the Model Hub."})
hub_private_repo: bool = field(default=False, metadata={"help": "Whether the model repository is private or not."})
hub_always_push: bool = field(
default=False,
metadata={"help": "Unless `True`, the Trainer will skip pushes if the previous one wasn't finished yet."},
)
gradient_checkpointing: bool = field(
default=False,
metadata={
@@ -2367,6 +2373,7 @@ class TrainingArguments:
strategy: Union[str, HubStrategy] = "every_save",
token: Optional[str] = None,
private_repo: bool = False,
always_push: bool = False,
):
"""
A method that groups together all arguments linked to synchronizing checkpoints with the Hub.
@@ -2407,6 +2414,9 @@ class TrainingArguments:
with `huggingface-cli login`.
private_repo (`bool`, *optional*, defaults to `False`):
If True, the Hub repo will be set to private.
always_push (`bool`, *optional*, defaults to `False`):
Unless this is `True`, the `Trainer` will skip pushing a checkpoint when the previous push is not
finished.
Example:
@@ -2424,6 +2434,7 @@ class TrainingArguments:
self.hub_strategy = HubStrategy(strategy)
self.hub_token = token
self.hub_private_repo = private_repo
self.hub_always_push = always_push
return self
def set_optimizer(
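
A short usage sketch for the new flag (output dir and model id are placeholders; `set_push_to_hub`'s first argument, the model id, is collapsed out of this hunk):

from transformers import TrainingArguments

# hub_always_push=True queues every checkpoint push even when the previous
# async push has not finished yet; the default (False) skips such pushes.
args = TrainingArguments(
    output_dir="outputs",       # placeholder
    push_to_hub=True,
    hub_strategy="every_save",
    hub_always_push=True,
)

# Equivalent via the grouped setter extended in this diff:
args = TrainingArguments(output_dir="outputs").set_push_to_hub(
    "user/my-model", strategy="every_save", always_push=True
)
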
@@ -70,6 +70,7 @@ from .hub import (
TRANSFORMERS_CACHE,
TRANSFORMERS_DYNAMIC_MODULE_NAME,
EntryNotFoundError,
PushInProgress,
PushToHubMixin,
RepositoryNotFoundError,
RevisionNotFoundError,
@@ -22,6 +22,7 @@ import sys
import tempfile
import traceback
import warnings
from concurrent import futures
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
@@ -1177,6 +1178,29 @@ def move_cache(cache_dir=None, new_cache_dir=None, token=None):
)
class PushInProgress:
"""
Internal class to keep track of a push in progress (which might contain multiple `Future` jobs).
"""
def __init__(self, jobs: Optional[List[futures.Future]] = None) -> None:
self.jobs = [] if jobs is None else jobs
def is_done(self):
return all(job.done() for job in self.jobs)
def wait_until_done(self):
futures.wait(self.jobs)
def cancel(self) -> None:
self.jobs = [
job
for job in self.jobs
# Cancel the job if it wasn't started yet and remove cancelled/done jobs from the list
if not (job.cancel() or job.done())
]
cache_version_file = os.path.join(TRANSFORMERS_CACHE, "version.txt")
if not os.path.isfile(cache_version_file):
cache_version = 0
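
`PushInProgress` is a thin wrapper over `concurrent.futures`; a standalone sketch of its behavior, using dummy futures in place of real `upload_folder` jobs and assuming the class above is in scope:

import time
from concurrent import futures

# Stand-ins for the futures returned by upload_folder(..., run_as_future=True).
pool = futures.ThreadPoolExecutor(max_workers=2)
jobs = [pool.submit(time.sleep, 0.2) for _ in range(3)]

push = PushInProgress(jobs)  # the class defined in the diff above
print(push.is_done())        # False while jobs are still in flight
push.wait_until_done()       # futures.wait() blocks until every job finishes
print(push.is_done())        # True
push.cancel()                # done jobs are simply dropped from the list
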
@@ -23,14 +23,13 @@ import re
import subprocess
import sys
import tempfile
import time
import unittest
from itertools import product
from pathlib import Path
from unittest.mock import Mock, patch
import numpy as np
from huggingface_hub import HfFolder, Repository, delete_repo
from huggingface_hub import HfFolder, delete_repo, list_repo_commits
from parameterized import parameterized
from requests.exceptions import HTTPError
@@ -2226,21 +2225,17 @@ class TrainerIntegrationWithHubTester(unittest.TestCase):
output_dir=os.path.join(tmp_dir, "test-trainer-epoch"),
push_to_hub=True,
hub_token=self._token,
# To avoid any flakiness if the training goes faster than the uploads.
hub_always_push=True,
save_strategy="epoch",
)
trainer.train()
# Wait for the async pushes to be finished
while trainer.push_in_progress is not None and not trainer.push_in_progress.is_done:
time.sleep(0.5)
with tempfile.TemporaryDirectory() as tmp_dir:
_ = Repository(tmp_dir, clone_from=f"{USER}/test-trainer-epoch", token=self._token)
commits = self.get_commit_history(tmp_dir)
commits = list_repo_commits(f"{USER}/test-trainer-epoch", token=self._token)
commits = [c.title for c in commits]
self.assertIn("initial commit", commits)
# We can't test that epoch 2 and 3 are in the commits without being flaky as those might be skipped if
# the push for epoch 1 wasn't finished at the time.
self.assertIn("Training in progress, epoch 1", commits)
for i in range(1, 4):
self.assertIn(f"Training in progress, epoch {i}", commits)
def test_push_to_hub_with_saves_each_n_steps(self):
num_gpus = max(1, get_gpu_count())
@@ -2252,22 +2247,21 @@ class TrainerIntegrationWithHubTester(unittest.TestCase):
output_dir=os.path.join(tmp_dir, "test-trainer-step"),
push_to_hub=True,
hub_token=self._token,
# To avoid any flakiness if the training goes faster than the uploads.
hub_always_push=True,
save_strategy="steps",
save_steps=5,
)
trainer.train()
# Wait for the async pushes to be finished
while trainer.push_in_progress is not None and not trainer.push_in_progress.is_done:
time.sleep(0.5)
with tempfile.TemporaryDirectory() as tmp_dir:
_ = Repository(tmp_dir, clone_from=f"{USER}/test-trainer-step", token=self._token)
commits = self.get_commit_history(tmp_dir)
commits = list_repo_commits(f"{USER}/test-trainer-step", token=self._token)
commits = [c.title for c in commits]
self.assertIn("initial commit", commits)
# We can't test that epoch 2 and 3 are in the commits without being flaky as those might be skipped if
# the push for epoch 1 wasn't finished at the time.
self.assertIn("Training in progress, step 5", commits)
# max_steps depends on the number of available GPUs
max_steps = math.ceil(trainer.args.num_train_epochs * len(trainer.get_train_dataloader()))
for i in range(5, max_steps, 5):
self.assertIn(f"Training in progress, step {i}", commits)
@require_torch
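
The updated tests query the commit history over the API instead of cloning the repo; a sketch of the call they rely on (the repo id is a placeholder):

from huggingface_hub import list_repo_commits

# Returns GitCommitInfo objects, newest first; .title is the first line of
# each commit message, which is what the assertions above compare against.
commits = list_repo_commits("user/test-trainer-epoch")
titles = [c.title for c in commits]
assert "initial commit" in titles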