Unverified commit 85125fcf authored by Rafał Jankowski, committed by GitHub

Neptune.ai integration improvements (#18934)



* NeptuneCallback improvements

* After review suggestions and deduplication of initial run

* Added volatile checkpoints support due to missing post-rebase commit

* Update README per review comments

  - Remove list formatting
  - Correct Neptune docs link

Co-authored-by: Sabine <sabine.nyholm@neptune.ai>
parent e6f221c8
...@@ -32,6 +32,7 @@ By default a [`Trainer`] will use the following callbacks:

- [`~integrations.WandbCallback`] if [wandb](https://www.wandb.com/) is installed.
- [`~integrations.CometCallback`] if [comet_ml](https://www.comet.ml/site/) is installed.
- [`~integrations.MLflowCallback`] if [mlflow](https://www.mlflow.org/) is installed.
- [`~integrations.NeptuneCallback`] if [neptune](https://neptune.ai/) is installed.
- [`~integrations.AzureMLCallback`] if [azureml-sdk](https://pypi.org/project/azureml-sdk/) is
  installed.
- [`~integrations.CodeCarbonCallback`] if [codecarbon](https://pypi.org/project/codecarbon/) is
...@@ -70,6 +71,8 @@ Here is the list of the available [`TrainerCallback`] in the library:

[[autodoc]] integrations.CodeCarbonCallback

[[autodoc]] integrations.NeptuneCallback

## TrainerCallback

[[autodoc]] TrainerCallback
...
...@@ -198,6 +198,7 @@ You can easily log and monitor your runs code. The following are currently supported:

* [TensorBoard](https://www.tensorflow.org/tensorboard)
* [Weights & Biases](https://docs.wandb.ai/integrations/huggingface)
* [Comet ML](https://www.comet.ml/docs/python-sdk/huggingface/)
* [Neptune](https://docs.neptune.ai/integrations-and-supported-tools/model-training/hugging-face)

### Weights & Biases
...@@ -251,3 +252,86 @@ or if in a Conda environment:

```bash
conda install -c comet_ml -c anaconda -c conda-forge comet_ml
```
### Neptune

First, install the Neptune client library. You can do it with either `pip` or `conda`:

`pip`:

```bash
pip install neptune-client
```

`conda`:

```bash
conda install -c conda-forge neptune-client
```

Next, in your model training script, import `NeptuneCallback`:

```python
from transformers.integrations import NeptuneCallback
```
To enable Neptune logging, in your `TrainingArguments`, set the `report_to` argument to `"neptune"`:

```python
training_args = TrainingArguments(
    "quick-training-distilbert-mrpc",
    evaluation_strategy="steps",
    eval_steps=20,
    report_to="neptune",
)

trainer = Trainer(
    model,
    training_args,
    ...
)
```
Alternatively, for more logging options, create a Neptune callback:

```python
neptune_callback = NeptuneCallback()
```

To add more detail to the tracked run, you can supply optional arguments to `NeptuneCallback`. Some examples:

```python
neptune_callback = NeptuneCallback(
    name="DistilBERT",
    description="DistilBERT fine-tuned on GLUE/MRPC",
    tags=["args-callback", "fine-tune", "MRPC"],  # tags help you manage runs in Neptune
    base_namespace="callback",  # the default is "finetuning"
    log_checkpoints="best",  # other options are "last", "same", and None
    capture_hardware_metrics=False,  # additional keyword arguments for a Neptune run
)
```
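With `log_checkpoints="best"`, the callback uploads a checkpoint only when the monitored metric improves on the best value seen so far. As a rough sketch of that decision (not the callback's API, just the comparison it performs on each evaluation):

```python
import numpy as np


def should_upload(metric_value, best_metric, greater_is_better=True):
    # Upload when no best metric exists yet, or when the new value beats it.
    operator = np.greater if greater_is_better else np.less
    return best_metric is None or bool(operator(metric_value, best_metric))


should_upload(0.91, 0.89)                           # accuracy improved -> True
should_upload(0.35, 0.30, greater_is_better=False)  # loss got worse -> False
```

The direction of the comparison follows the `greater_is_better` training argument, so the same rule covers both accuracy-like and loss-like metrics.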
Pass the callback to the Trainer (and set `report_to` to `"none"` so the metadata isn't logged twice):

```python
training_args = TrainingArguments(..., report_to="none")

trainer = Trainer(
    model,
    training_args,
    ...
    callbacks=[neptune_callback],
)
```
Now, when you start the training with `trainer.train()`, your metadata will be logged in Neptune.

**Note:** Although you can pass your **Neptune API token** and **project name** as arguments when creating the callback, the recommended way is to save them as environment variables:

| Environment variable | Value |
| :------------------- | :---- |
| `NEPTUNE_API_TOKEN`  | Your Neptune API token. To find and copy it, click your Neptune avatar and select **Get your API token**. |
| `NEPTUNE_PROJECT`    | The full name of your Neptune project (`workspace-name/project-name`). To find and copy it, head to **project settings** &rarr; **Properties**. |
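If exporting shell variables is inconvenient, the same values can be set from Python before the `Trainer` is created. A minimal sketch with placeholder values (substitute your own token and project name):

```python
import os

# Placeholder values - replace with your own credentials.
os.environ["NEPTUNE_API_TOKEN"] = "<your-api-token>"
os.environ["NEPTUNE_PROJECT"] = "my-workspace/my-project"
```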
For detailed instructions and examples, see the [Neptune docs](https://docs.neptune.ai/integrations-and-supported-tools/model-training/hugging-face).
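The callback also exposes a `NeptuneCallback.get_run(trainer)` classmethod that returns the active Neptune run, so you can log extra metadata alongside what the Trainer reports. The lookup it performs is simple; here is a self-contained sketch with stub classes standing in for the trainer and callback:

```python
# Stubs standing in for transformers.Trainer and NeptuneCallback.
class FakeNeptuneCallback:
    run = "neptune-run-handle"  # the real callback lazily creates and caches a run


class FakeTrainer:
    class callback_handler:
        callbacks = [object(), FakeNeptuneCallback()]


def get_run(trainer, callback_type=FakeNeptuneCallback):
    # Mirrors NeptuneCallback.get_run: scan the trainer's callbacks for the
    # Neptune one and return its run.
    for callback in trainer.callback_handler.callbacks:
        if isinstance(callback, callback_type):
            return callback.run
    raise RuntimeError("The trainer doesn't have a NeptuneCallback configured.")


print(get_run(FakeTrainer()))  # prints: neptune-run-handle
```

In real code you would call `NeptuneCallback.get_run(trainer)` directly and then log to the returned run, e.g. `run["custom/metadata"] = value`.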
...@@ -100,6 +100,7 @@ _import_structure = {
    "hf_argparser": ["HfArgumentParser"],
    "integrations": [
        "is_comet_available",
        "is_neptune_available",
        "is_optuna_available",
        "is_ray_available",
        "is_ray_tune_available",
...@@ -2986,6 +2987,7 @@ if TYPE_CHECKING:

    # Integrations
    from .integrations import (
        is_comet_available,
        is_neptune_available,
        is_optuna_available,
        is_ray_available,
        is_ray_tune_available,
...
...@@ -19,10 +19,15 @@ import importlib.util
import json
import numbers
import os
import shutil
import sys
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Optional

import numpy as np

from . import __version__ as version
from .utils import flatten_dict, is_datasets_available, logging
...@@ -44,6 +49,10 @@ if _has_comet:
    except (ImportError, ValueError):
        _has_comet = False

_has_neptune = importlib.util.find_spec("neptune") is not None
if TYPE_CHECKING and _has_neptune:
    from neptune.new.metadata_containers.run import Run

from .trainer_callback import ProgressCallback, TrainerCallback  # noqa: E402
from .trainer_utils import PREFIX_CHECKPOINT_DIR, BestRun, IntervalStrategy  # noqa: E402
from .utils import ENV_VARS_TRUE_VALUES, is_torch_tpu_available  # noqa: E402
...@@ -106,7 +115,7 @@ def is_fairscale_available():

def is_neptune_available():
    return _has_neptune


def is_codecarbon_available():
...@@ -449,6 +458,8 @@ def get_available_reporting_integrations():
        integrations.append("comet_ml")
    if is_mlflow_available():
        integrations.append("mlflow")
    if is_neptune_available():
        integrations.append("neptune")
    if is_tensorboard_available():
        integrations.append("tensorboard")
    if is_wandb_available():
...@@ -925,75 +936,276 @@ class MLflowCallback(TrainerCallback):
        self._ml_flow.end_run()


class NeptuneMissingConfiguration(Exception):
    def __init__(self):
        super().__init__(
            """
            ------ Unsupported ---- We were not able to create new runs. You provided a custom Neptune run to
            `NeptuneCallback` with the `run` argument. For the integration to work fully, provide your `api_token` and
            `project` by saving them as environment variables or passing them to the callback.
            """
        )


class NeptuneCallback(TrainerCallback):
    """TrainerCallback that sends the logs to [Neptune](https://neptune.ai).

    Args:
        api_token (`str`, optional):
            Neptune API token obtained upon registration. You can leave this argument out if you have saved your token
            to the `NEPTUNE_API_TOKEN` environment variable (strongly recommended). See full setup instructions in the
            [docs](https://docs.neptune.ai/getting-started/installation).
        project (`str`, optional):
            Name of an existing Neptune project, in the form "workspace-name/project-name". You can find and copy the
            name from the project Settings -> Properties in Neptune. If None (default), the value of the
            `NEPTUNE_PROJECT` environment variable will be used.
        name (`str`, optional):
            Custom name for the run.
        base_namespace (`str`, optional, defaults to "finetuning"):
            In the Neptune run, the root namespace that will contain all of the logged metadata.
        log_parameters (`bool`, optional, defaults to True):
            If True, logs all Trainer arguments and model parameters provided by the Trainer.
        log_checkpoints (`str`, optional, defaults to None):
            If "same", uploads checkpoints whenever they are saved by the Trainer. If "last", uploads only the most
            recently saved checkpoint. If "best", uploads the best checkpoint (among the ones saved by the Trainer). If
            None, does not upload checkpoints.
        run (`Run`, optional):
            Pass a Neptune run object if you want to continue logging to an existing run. Read more about resuming runs
            in the [docs](https://docs.neptune.ai/how-to-guides/neptune-api/resume-run).
        **neptune_run_kwargs (optional):
            Additional keyword arguments to be passed directly to the
            [neptune.init_run()](https://docs.neptune.ai/api-reference/neptune#.init_run) function when a new run is
            created.
    """

    integration_version_key = "source_code/integrations/transformers"
    model_parameters_key = "model_parameters"
    trial_name_key = "trial"
    trial_params_key = "trial_params"
    trainer_parameters_key = "trainer_parameters"
    flat_metrics = {"train/epoch"}

    def __init__(
        self,
        *,
        api_token: Optional[str] = None,
        project: Optional[str] = None,
        name: Optional[str] = None,
        base_namespace: str = "finetuning",
        run: Optional["Run"] = None,
        log_parameters: bool = True,
        log_checkpoints: Optional[str] = None,
        **neptune_run_kwargs
    ):
        if not is_neptune_available():
            raise ValueError(
                "NeptuneCallback requires the Neptune client library to be installed. "
                "To install the library, run `pip install neptune-client`."
            )

        from neptune.new.metadata_containers.run import Run

        try:
            from neptune.new.integrations.utils import verify_type
        except ImportError:
            from neptune.new.internal.utils import verify_type

        verify_type("api_token", api_token, (str, type(None)))
        verify_type("project", project, (str, type(None)))
        verify_type("name", name, (str, type(None)))
        verify_type("base_namespace", base_namespace, str)
        verify_type("run", run, (Run, type(None)))
        verify_type("log_parameters", log_parameters, bool)
        verify_type("log_checkpoints", log_checkpoints, (str, type(None)))

        self._base_namespace_path = base_namespace
        self._log_parameters = log_parameters
        self._log_checkpoints = log_checkpoints
        self._initial_run: Optional[Run] = run

        self._run = None
        self._is_monitoring_run = False
        self._run_id = None
        self._force_reset_monitoring_run = False
        self._init_run_kwargs = {"api_token": api_token, "project": project, "name": name, **neptune_run_kwargs}

        self._volatile_checkpoints_dir = None
        self._should_upload_checkpoint = self._log_checkpoints is not None
        self._recent_checkpoint_path = None

        if self._log_checkpoints in {"last", "best"}:
            self._target_checkpoints_namespace = f"checkpoints/{self._log_checkpoints}"
            self._should_clean_recently_uploaded_checkpoint = True
        else:
            self._target_checkpoints_namespace = "checkpoints"
            self._should_clean_recently_uploaded_checkpoint = False

    def _stop_run_if_exists(self):
        if self._run:
            self._run.stop()
            del self._run
            self._run = None

    def _initialize_run(self, **additional_neptune_kwargs):
        from neptune.new import init_run
        from neptune.new.exceptions import NeptuneMissingApiTokenException, NeptuneMissingProjectNameException

        self._stop_run_if_exists()

        try:
            self._run = init_run(**self._init_run_kwargs, **additional_neptune_kwargs)
            self._run_id = self._run["sys/id"].fetch()
        except (NeptuneMissingProjectNameException, NeptuneMissingApiTokenException) as e:
            raise NeptuneMissingConfiguration() from e

    def _use_initial_run(self):
        self._run = self._initial_run
        self._is_monitoring_run = True
        self._run_id = self._run["sys/id"].fetch()
        self._initial_run = None

    def _ensure_run_with_monitoring(self):
        if self._initial_run is not None:
            self._use_initial_run()
        else:
            if not self._force_reset_monitoring_run and self._is_monitoring_run:
                return

            if self._run and not self._is_monitoring_run and not self._force_reset_monitoring_run:
                self._initialize_run(run=self._run_id)
                self._is_monitoring_run = True
            else:
                self._initialize_run()
                self._force_reset_monitoring_run = False

    def _ensure_at_least_run_without_monitoring(self):
        if self._initial_run is not None:
            self._use_initial_run()
        else:
            if not self._run:
                self._initialize_run(
                    run=self._run_id,
                    capture_stdout=False,
                    capture_stderr=False,
                    capture_hardware_metrics=False,
                    capture_traceback=False,
                )
                self._is_monitoring_run = False

    @property
    def run(self):
        if self._run is None:
            self._ensure_at_least_run_without_monitoring()
        return self._run

    @property
    def _metadata_namespace(self):
        return self.run[self._base_namespace_path]

    def _log_integration_version(self):
        self.run[NeptuneCallback.integration_version_key] = version

    def _log_trainer_parameters(self, args):
        self._metadata_namespace[NeptuneCallback.trainer_parameters_key] = args.to_sanitized_dict()

    def _log_model_parameters(self, model):
        if model and hasattr(model, "config") and model.config is not None:
            self._metadata_namespace[NeptuneCallback.model_parameters_key] = model.config.to_dict()

    def _log_hyper_param_search_parameters(self, state):
        if state and hasattr(state, "trial_name"):
            self._metadata_namespace[NeptuneCallback.trial_name_key] = state.trial_name

        if state and hasattr(state, "trial_params") and state.trial_params is not None:
            self._metadata_namespace[NeptuneCallback.trial_params_key] = state.trial_params

    def _log_model_checkpoint(self, source_directory: str, checkpoint: str):
        target_path = relative_path = os.path.join(source_directory, checkpoint)

        if self._volatile_checkpoints_dir is not None:
            consistent_checkpoint_path = os.path.join(self._volatile_checkpoints_dir, checkpoint)
            try:
                shutil.copytree(relative_path, os.path.join(consistent_checkpoint_path, relative_path))
                target_path = consistent_checkpoint_path
            except IOError as e:
                logger.warning(
                    "NeptuneCallback was unable to make a copy of the checkpoint due to an I/O exception: '{}'. "
                    "The upload might fail.".format(e)
                )

        self._metadata_namespace[self._target_checkpoints_namespace].upload_files(target_path)

        if self._should_clean_recently_uploaded_checkpoint and self._recent_checkpoint_path is not None:
            self._metadata_namespace[self._target_checkpoints_namespace].delete_files(self._recent_checkpoint_path)

        self._recent_checkpoint_path = relative_path

    def on_init_end(self, args, state, control, **kwargs):
        self._volatile_checkpoints_dir = None
        if self._log_checkpoints and (args.overwrite_output_dir or args.save_total_limit is not None):
            self._volatile_checkpoints_dir = tempfile.TemporaryDirectory().name

        if self._log_checkpoints == "best" and not args.load_best_model_at_end:
            raise ValueError("To save the best model checkpoint, the load_best_model_at_end argument must be enabled.")

    def on_train_begin(self, args, state, control, model=None, **kwargs):
        if not state.is_world_process_zero:
            return

        self._ensure_run_with_monitoring()
        self._force_reset_monitoring_run = True

        self._log_integration_version()
        if self._log_parameters:
            self._log_trainer_parameters(args)
            self._log_model_parameters(model)

        if state.is_hyper_param_search:
            self._log_hyper_param_search_parameters(state)

    def on_train_end(self, args, state, control, **kwargs):
        self._stop_run_if_exists()

    def __del__(self):
        if self._volatile_checkpoints_dir is not None:
            shutil.rmtree(self._volatile_checkpoints_dir, ignore_errors=True)

        self._stop_run_if_exists()

    def on_save(self, args, state, control, **kwargs):
        if self._should_upload_checkpoint:
            self._log_model_checkpoint(args.output_dir, f"checkpoint-{state.global_step}")

    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        if self._log_checkpoints == "best":
            best_metric_name = args.metric_for_best_model
            if not best_metric_name.startswith("eval_"):
                best_metric_name = f"eval_{best_metric_name}"

            metric_value = metrics.get(best_metric_name)

            operator = np.greater if args.greater_is_better else np.less

            self._should_upload_checkpoint = state.best_metric is None or operator(metric_value, state.best_metric)

    @classmethod
    def get_run(cls, trainer):
        for callback in trainer.callback_handler.callbacks:
            if isinstance(callback, cls):
                return callback.run

        raise Exception("The trainer doesn't have a NeptuneCallback configured.")

    def on_log(self, args, state, control, logs: Optional[Dict[str, float]] = None, **kwargs):
        if not state.is_world_process_zero:
            return

        if logs is not None:
            for name, value in rewrite_logs(logs).items():
                if isinstance(value, (int, float)):
                    if name in NeptuneCallback.flat_metrics:
                        self._metadata_namespace[name] = value
                    else:
                        self._metadata_namespace[name].log(value, step=state.global_step)


class CodeCarbonCallback(TrainerCallback):
...
...@@ -414,8 +414,8 @@ class TrainingArguments:
            instance of `Dataset`.
        report_to (`str` or `List[str]`, *optional*, defaults to `"all"`):
            The list of integrations to report the results and logs to. Supported platforms are `"azure_ml"`,
            `"comet_ml"`, `"mlflow"`, `"neptune"`, `"tensorboard"` and `"wandb"`. Use `"all"` to report to all
            integrations installed, `"none"` for no integrations.
        ddp_find_unused_parameters (`bool`, *optional*):
            When using distributed training, the value of the flag `find_unused_parameters` passed to
            `DistributedDataParallel`. Will default to `False` if gradient checkpointing is used, `True` otherwise.
...