finetune.py 15.9 KB
Newer Older
1
2
3
4
5
import argparse
import glob
import logging
import os
import time
6
from collections import defaultdict
7
8
from pathlib import Path
from typing import Dict, List, Tuple
9

10
11
import numpy as np
import pytorch_lightning as pl
12
13
14
import torch
from torch.utils.data import DataLoader

15
from lightning_base import BaseTransformer, add_generic_args, generic_train
16
from transformers import MarianTokenizer, MBartTokenizer, T5ForConditionalGeneration
17
18
19


try:
20
    from .utils import (
21
        assert_all_frozen,
22
23
24
25
26
        use_task_specific_params,
        lmap,
        flatten_list,
        pickle_save,
        save_git_info,
27
        save_json,
28
29
30
31
        freeze_params,
        calculate_rouge,
        get_git_info,
        ROUGE_KEYS,
32
        calculate_bleu_score,
33
        Seq2SeqDataset,
34
        TranslationDataset,
35
        label_smoothed_nll_loss,
36
    )
37

38
    from .callbacks import Seq2SeqLoggingCallback, get_checkpoint_callback, get_early_stopping_callback
39
except ImportError:
40
    from utils import (
41
        Seq2SeqDataset,
42
        TranslationDataset,
43
        assert_all_frozen,
44
45
46
47
48
        use_task_specific_params,
        lmap,
        flatten_list,
        pickle_save,
        save_git_info,
49
        save_json,
50
51
52
53
        freeze_params,
        calculate_rouge,
        get_git_info,
        ROUGE_KEYS,
54
        calculate_bleu_score,
55
        label_smoothed_nll_loss,
56
    )
57
    from callbacks import Seq2SeqLoggingCallback, get_checkpoint_callback, get_early_stopping_callback
58
59
60
61

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


62
63
64
class SummarizationModule(BaseTransformer):
    mode = "summarization"
    loss_names = ["loss"]
65
66
    metric_names = ROUGE_KEYS
    val_metric = "rouge2"
67

68
69
70
71
    def __init__(self, hparams, **kwargs):
        """Build the module from parsed hyper-parameters.

        After delegating to the base-class constructor this method writes git
        info and the pickled hyper-parameters under the output directory,
        prepares per-split dataset settings, optionally freezes the embeddings
        and/or the encoder, and selects the dataset class from the tokenizer type.

        Args:
            hparams: hyper-parameter namespace. Attributes read here include
                ``data_dir``, ``output_dir``, ``max_source_length``, the three
                ``*max_target_length`` values, ``n_train``/``n_val``/``n_test``,
                ``freeze_embeds``, ``freeze_encoder``, ``num_workers`` and
                ``tgt_lang`` (the latter only for MBart tokenizers).
            **kwargs: forwarded unchanged to the base-class constructor.
        """
        super().__init__(hparams, num_labels=None, mode=self.mode, **kwargs)
        use_task_specific_params(self.model, "summarization")
        save_git_info(self.hparams.output_dir)

        # Bookkeeping files written under the output directory.
        out_dir = Path(self.output_dir)
        self.metrics_save_path = out_dir / "metrics.json"
        self.hparams_save_path = out_dir / "hparams.pkl"
        pickle_save(self.hparams, self.hparams_save_path)
        self.step_count = 0
        self.metrics = defaultdict(list)

        # Keyword arguments shared by every dataset split.
        self.dataset_kwargs: dict = {
            "data_dir": self.hparams.data_dir,
            "max_source_length": self.hparams.max_source_length,
            "prefix": self.model.config.prefix or "",
        }

        # A negative example count means "no limit" and is stored as None.
        requested_counts = (
            ("train", self.hparams.n_train),
            ("val", self.hparams.n_val),
            ("test", self.hparams.n_test),
        )
        self.n_obs = {split: (count if count >= 0 else None) for split, count in requested_counts}

        self.target_lens = {
            "train": self.hparams.max_target_length,
            "val": self.hparams.val_max_target_length,
            "test": self.hparams.test_max_target_length,
        }
        # The training target length may not exceed the evaluation target lengths.
        for eval_split in ("val", "test"):
            assert self.target_lens["train"] <= self.target_lens[eval_split], f"target_lens: {self.target_lens}"

        if self.hparams.freeze_embeds:
            self.freeze_embeds()
        if self.hparams.freeze_encoder:
            encoder = self.model.get_encoder()
            freeze_params(encoder)
            assert_all_frozen(encoder)

        self.hparams.git_sha = get_git_info()["repo_sha"]
        self.num_workers = hparams.num_workers

        # MBart: when the config has no decoder start token, use the target language code.
        self.decoder_start_token_id = None
        uses_mbart = isinstance(self.tokenizer, MBartTokenizer)
        if self.model.config.decoder_start_token_id is None and uses_mbart:
            self.decoder_start_token_id = self.tokenizer.lang_code_to_id[hparams.tgt_lang]
            self.model.config.decoder_start_token_id = self.decoder_start_token_id

        # MBart and Marian tokenizers get the translation dataset; everything else the generic one.
        needs_translation_dataset = uses_mbart or isinstance(self.tokenizer, MarianTokenizer)
        self.dataset_class = TranslationDataset if needs_translation_dataset else Seq2SeqDataset
114
115
116

    def freeze_embeds(self):
        """Freeze token embeddings and positional embeddings for bart, just token embeddings for t5.

        The nested ``self.model.model`` layout is tried first; if any attribute
        along that path is missing, the flat ``self.model`` layout is used instead.
        """
        try:
            inner = self.model.model
            freeze_params(inner.shared)
            for stack in (inner.encoder, inner.decoder):
                freeze_params(stack.embed_positions)
                freeze_params(stack.embed_tokens)
        except AttributeError:
            # Flat layout: only the shared and per-stack token embeddings are frozen.
            freeze_params(self.model.shared)
            for stack in (self.model.encoder, self.model.decoder):
                freeze_params(stack.embed_tokens)

    def forward(self, input_ids, **kwargs):
        """Call the wrapped model with ``input_ids`` and any extra keyword arguments."""
        model_outputs = self.model(input_ids, **kwargs)
        return model_outputs

    def ids_to_clean_text(self, generated_ids: List[int]):
        """Decode token ids with the tokenizer and strip whitespace from each decoded string.

        Special tokens are skipped and tokenization spaces are cleaned up during decoding.
        """
        decode_options = {"skip_special_tokens": True, "clean_up_tokenization_spaces": True}
        decoded = self.tokenizer.batch_decode(generated_ids, **decode_options)
        return lmap(str.strip, decoded)
135

136
    def _step(self, batch: dict) -> Tuple:
        """Run one forward pass on ``batch`` and compute the loss.

        Args:
            batch: mapping with ``"input_ids"``, ``"attention_mask"`` and
                ``"decoder_input_ids"`` entries (the target ids are sliced as a
                2-D tensor below).

        Returns:
            A one-element tuple ``(loss,)``.
        """
        ignore_id = self.tokenizer.pad_token_id
        src_ids = batch["input_ids"]
        src_mask = batch["attention_mask"]
        tgt_ids = batch["decoder_input_ids"]

        if isinstance(self.model, T5ForConditionalGeneration):
            # T5: decoder inputs come from the model's own right-shift; labels are the full targets.
            decoder_input_ids = self.model._shift_right(tgt_ids)
            labels = tgt_ids
        else:
            # The decoder is fed every target token except the last, and the labels are the
            # same sequence moved one position ahead, so each step predicts the next token.
            decoder_input_ids = tgt_ids[:, :-1].contiguous()
            # NOTE(review): clone() presumably yields a fresh tensor so that view(-1) below
            # works on the sliced labels - confirm.
            labels = tgt_ids[:, 1:].clone()

        outputs = self(src_ids, attention_mask=src_mask, decoder_input_ids=decoder_input_ids, use_cache=False)
        logits = outputs[0]

        if self.hparams.label_smoothing == 0:
            # Same behavior as modeling_bart.py
            vocab_dim = logits.shape[-1]
            assert vocab_dim == self.model.config.vocab_size
            criterion = torch.nn.CrossEntropyLoss(ignore_index=ignore_id)
            loss = criterion(logits.view(-1, vocab_dim), labels.view(-1))
        else:
            log_probs = torch.nn.functional.log_softmax(logits, dim=-1)
            # The helper also returns the unsmoothed NLL, which is not used here.
            loss, _nll_loss = label_smoothed_nll_loss(
                log_probs, labels, self.hparams.label_smoothing, ignore_index=ignore_id
            )
        return (loss,)

162
163
164
165
    @property
    def pad(self) -> int:
        """Padding token id taken from the tokenizer."""
        pad_id = self.tokenizer.pad_token_id
        return pad_id

166
167
    def training_step(self, batch, batch_idx) -> Dict:
        """Compute the loss for one training batch and the values to log.

        Returns:
            A dict with the first loss tensor under ``"loss"`` and, under ``"log"``,
            the named losses plus ``"tpb"`` (non-padding tokens in the batch).
        """
        loss_tensors = self._step(batch)
        logs = dict(zip(self.loss_names, loss_tensors))

        # tokens per batch: non-pad source tokens plus non-pad target tokens
        pad_id = self.pad
        n_source_tokens = batch["input_ids"].ne(pad_id).sum()
        n_target_tokens = batch["decoder_input_ids"].ne(pad_id).sum()
        logs["tpb"] = n_source_tokens + n_target_tokens

        return {"loss": loss_tensors[0], "log": logs}

    def validation_step(self, batch, batch_idx) -> Dict:
        """Evaluate one validation batch via ``_generative_step``; ``batch_idx`` is unused."""
        step_metrics = self._generative_step(batch)
        return step_metrics

177
    def validation_epoch_end(self, outputs, prefix="val") -> Dict:
        """Average the per-batch results of an evaluation pass and persist them.

        Args:
            outputs: list of dicts as returned by ``_generative_step``.
            prefix: label used in the metric keys and as the key under which the
                metrics are stored (``"val"`` by default).

        Returns:
            A dict with the averaged metrics under ``"log"``, the flattened
            predictions under ``"preds"``, the mean loss tensor under
            ``f"{prefix}_loss"`` and the validation metric as a tensor under
            ``f"{prefix}_{self.val_metric}"``.
        """
        self.step_count += 1

        # Losses are tensors: stack across batches and average.
        mean_losses = {name: torch.stack([out[name] for out in outputs]).mean() for name in self.loss_names}
        loss = mean_losses["loss"]

        # Generation metrics are plain numbers: average them with numpy.
        averaged_keys = self.metric_names + ["gen_time", "gen_len"]
        gen_means = {key: np.array([out[key] for out in outputs]).mean() for key in averaged_keys}
        val_metric_tensor: torch.FloatTensor = torch.tensor(gen_means[self.val_metric]).type_as(loss)

        # Merge both groups into one dict of scalars (losses converted via .item()).
        gen_means.update({name: tensor.item() for name, tensor in mean_losses.items()})
        mean_losses.update(gen_means)
        metrics = {f"{prefix}_avg_{name}": value for name, value in mean_losses.items()}
        metrics["step_count"] = self.step_count
        self.save_metrics(metrics, prefix)  # writes to self.metrics_save_path

        preds = flatten_list([out["preds"] for out in outputs])
        return {
            "log": metrics,
            "preds": preds,
            f"{prefix}_loss": loss,
            f"{prefix}_{self.val_metric}": val_metric_tensor,
        }

    def save_metrics(self, latest_metrics, type_path) -> None:
        """Append ``latest_metrics`` to the history kept under ``type_path`` and save all metrics as JSON."""
        history = self.metrics[type_path]
        history.append(latest_metrics)
        save_json(self.metrics, self.metrics_save_path)
194

195
196
    def calc_generative_metrics(self, preds, target) -> Dict:
        """Score ``preds`` against ``target`` with ``calculate_rouge``."""
        rouge_scores = calculate_rouge(preds, target)
        return rouge_scores
197

198
    def _generative_step(self, batch: dict) -> dict:
        """Generate predictions for ``batch`` and collect loss, timing and quality metrics.

        Returns:
            A dict holding the named losses, ``gen_time`` (generation seconds per
            example), ``gen_len`` (mean generated length), the decoded ``preds``
            and ``target`` strings, and the entries returned by
            ``calc_generative_metrics``.
        """
        source_ids = batch["input_ids"]

        started = time.time()
        generated_ids = self.model.generate(
            source_ids,
            attention_mask=batch["attention_mask"],
            use_cache=True,
            decoder_start_token_id=self.decoder_start_token_id,
        )
        # Wall-clock generation time divided by the number of examples in the batch.
        gen_time = (time.time() - started) / source_ids.shape[0]

        preds: List[str] = self.ids_to_clean_text(generated_ids)
        target: List[str] = self.ids_to_clean_text(batch["decoder_input_ids"])

        loss_tensors = self._step(batch)
        step_metrics = dict(zip(self.loss_names, loss_tensors))
        quality: Dict = self.calc_generative_metrics(preds, target)
        mean_gen_len = np.mean(lmap(len, generated_ids))
        step_metrics.update(gen_time=gen_time, gen_len=mean_gen_len, preds=preds, target=target, **quality)
        return step_metrics
215

216
217
    def test_step(self, batch, batch_idx):
        """Evaluate one test batch via ``_generative_step``; ``batch_idx`` is unused."""
        step_metrics = self._generative_step(batch)
        return step_metrics
218
219

    def test_epoch_end(self, outputs):
        """Aggregate test outputs with the validation logic, using the ``"test"`` prefix."""
        summary = self.validation_epoch_end(outputs, prefix="test")
        return summary
221

222
    def get_dataset(self, type_path) -> Seq2SeqDataset:
        """Instantiate ``self.dataset_class`` for the split named ``type_path``.

        The example limit and maximum target length are looked up per split;
        the remaining settings come from ``self.dataset_kwargs``.
        """
        return self.dataset_class(
            self.tokenizer,
            type_path=type_path,
            n_obs=self.n_obs[type_path],
            max_target_length=self.target_lens[type_path],
            **self.dataset_kwargs,
        )

234
    def get_dataloader(self, type_path: str, batch_size: int, shuffle: bool = False) -> DataLoader:
        """Build the ``DataLoader`` for one split.

        Args:
            type_path: split name passed to ``get_dataset``.
            batch_size: number of examples per batch.
            shuffle: whether to shuffle; forced to ``False`` when the sortish
                sampler is used.

        Returns:
            A ``DataLoader`` over the split that uses the dataset's own ``collate_fn``.
        """
        dataset = self.get_dataset(type_path)

        if self.hparams.sortish_sampler and type_path == "train":
            assert self.hparams.gpus <= 1  # TODO: assert earlier
            sampler = dataset.make_sortish_sampler(batch_size)
            # Shuffling is switched off whenever an explicit sampler is supplied.
            shuffle = False
        else:
            sampler = None

        return DataLoader(
            dataset,
            batch_size=batch_size,
            collate_fn=dataset.collate_fn,
            shuffle=shuffle,
            num_workers=self.num_workers,
            sampler=sampler,
        )

    def train_dataloader(self) -> DataLoader:
        """Return the training loader: train batch size, shuffling requested."""
        return self.get_dataloader("train", batch_size=self.hparams.train_batch_size, shuffle=True)

256
257
    def val_dataloader(self) -> DataLoader:
        """Return the validation loader, using the evaluation batch size."""
        eval_batch_size = self.hparams.eval_batch_size
        return self.get_dataloader("val", batch_size=eval_batch_size)
258

259
260
    def test_dataloader(self) -> DataLoader:
        """Return the test loader, using the evaluation batch size."""
        eval_batch_size = self.hparams.eval_batch_size
        return self.get_dataloader("test", batch_size=eval_batch_size)
261
262
263
264

    @staticmethod
    def add_model_specific_args(parser, root_dir):
        """Register the seq2seq fine-tuning options on ``parser``.

        The base-transformer and generic training options are added first, then
        the options specific to this script.

        Args:
            parser: ``argparse.ArgumentParser`` to extend in place.
            root_dir: passed through to ``BaseTransformer.add_model_specific_args``
                and ``add_generic_args``.

        Returns:
            The same ``parser`` object.
        """
        BaseTransformer.add_model_specific_args(parser, root_dir)
        add_generic_args(parser, root_dir)
        parser.add_argument(
            "--max_source_length",
            default=1024,
            type=int,
            help="The maximum total input sequence length after tokenization. Sequences longer "
            "than this will be truncated, sequences shorter will be padded.",
        )
        parser.add_argument(
            "--max_target_length",
            default=56,
            type=int,
            help="The maximum total target sequence length after tokenization for the train split. "
            "Sequences longer than this will be truncated, sequences shorter will be padded.",
        )
        parser.add_argument(
            "--val_max_target_length",
            default=142,  # these defaults are optimized for CNNDM. For xsum, see README.md.
            type=int,
            help="The maximum total target sequence length after tokenization for the val split. "
            "Sequences longer than this will be truncated, sequences shorter will be padded.",
        )
        parser.add_argument(
            "--test_max_target_length",
            default=142,
            type=int,
            help="The maximum total target sequence length after tokenization for the test split. "
            "Sequences longer than this will be truncated, sequences shorter will be padded.",
        )
        parser.add_argument("--freeze_encoder", action="store_true")
        parser.add_argument("--freeze_embeds", action="store_true")
        parser.add_argument("--sortish_sampler", action="store_true", default=False)
        parser.add_argument("--logger_name", type=str, choices=["default", "wandb", "wandb_shared"], default="default")
        parser.add_argument("--n_train", type=int, default=-1, required=False, help="# examples. -1 means use all.")
        parser.add_argument("--n_val", type=int, default=500, required=False, help="# examples. -1 means use all.")
        parser.add_argument("--n_test", type=int, default=-1, required=False, help="# examples. -1 means use all.")
        parser.add_argument(
            "--task",
            type=str,
            default="summarization",
            required=False,
            help="Task to fine-tune for. 'summarization' selects the summarization module; "
            "any other value selects the translation module.",
        )
        parser.add_argument("--label_smoothing", type=float, default=0.0, required=False)
        parser.add_argument("--src_lang", type=str, default="", required=False)
        parser.add_argument("--tgt_lang", type=str, default="", required=False)
        parser.add_argument(
            "--early_stopping_patience",
            type=int,
            default=-1,
            required=False,
            help="-1 means never early stop. early_stopping_patience is measured in validation checks, "
            "not epochs. So val_check_interval will affect it.",
        )
        return parser


317
318
319
320
321
322
class TranslationModule(SummarizationModule):
    """Variant of ``SummarizationModule`` for translation, scored with BLEU.

    It passes the source and target languages on to the dataset and replaces
    the generative metric with ``calculate_bleu_score``.
    """

    mode = "translation"
    loss_names = ["loss"]
    metric_names = ["bleu"]
    val_metric = "bleu"

    def __init__(self, hparams, **kwargs):
        """Initialise the parent module, then add the language pair to the dataset kwargs."""
        super().__init__(hparams, **kwargs)
        self.dataset_kwargs.update(src_lang=hparams.src_lang, tgt_lang=hparams.tgt_lang)

    def calc_generative_metrics(self, preds, target) -> dict:
        """Score ``preds`` against ``target`` with ``calculate_bleu_score``."""
        bleu_scores = calculate_bleu_score(preds, target)
        return bleu_scores


332
333
334
335
336
def main(args, model=None) -> SummarizationModule:
    """Fine-tune a summarization or translation module and optionally run the test pass.

    Args:
        args: parsed command-line namespace. Attributes read directly here are
            ``output_dir``, ``do_train``, ``task``, ``data_dir``, ``logger_name``,
            ``fast_dev_run``, ``early_stopping_patience`` and ``do_predict``; the
            whole namespace is also handed to the module constructor and to
            ``generic_train``.
        model: optional pre-built module. When ``None``, a ``SummarizationModule``
            is created if ``args.task == "summarization"``, otherwise a
            ``TranslationModule``.

    Returns:
        The module that was trained (and tested when ``args.do_predict`` is set).

    Raises:
        ValueError: if training is requested and ``output_dir`` already holds more
            than three entries, or if ``args.logger_name`` is not one of
            ``"default"``, ``"wandb"`` or ``"wandb_shared"``.
    """
    # parents=True so that a nested, not-yet-existing output directory can be used.
    Path(args.output_dir).mkdir(parents=True, exist_ok=True)
    if len(os.listdir(args.output_dir)) > 3 and args.do_train:
        raise ValueError(f"Output directory ({args.output_dir}) already exists and is not empty.")

    if model is None:
        module_cls = SummarizationModule if args.task == "summarization" else TranslationModule
        model = module_cls(args)

    dataset = Path(args.data_dir).name
    if (
        args.logger_name == "default"
        or args.fast_dev_run
        or str(args.output_dir).startswith(("/tmp", "/var"))
    ):
        trainer_logger = True  # don't pollute wandb logs unnecessarily
    elif args.logger_name in ("wandb", "wandb_shared"):
        from pytorch_lightning.loggers import WandbLogger

        if args.logger_name == "wandb":
            project = os.environ.get("WANDB_PROJECT", dataset)
        else:
            project = f"hf_{dataset}"
        trainer_logger = WandbLogger(name=model.output_dir.name, project=project)
    else:
        # Reachable only when main() is called with a namespace that bypassed argparse's
        # `choices`; fail with a clear message instead of an unbound-variable error later.
        raise ValueError(f"Unsupported logger_name: {args.logger_name!r}")

    if args.early_stopping_patience >= 0:
        es_callback = get_early_stopping_callback(model.val_metric, args.early_stopping_patience)
    else:
        es_callback = False

    trainer: pl.Trainer = generic_train(
        model,
        args,
        logging_callback=Seq2SeqLoggingCallback(),
        checkpoint_callback=get_checkpoint_callback(args.output_dir, model.val_metric),
        early_stopping_callback=es_callback,
        logger=trainer_logger,
        # TODO: early stopping callback seems messed up
    )
    pickle_save(model.hparams, model.output_dir / "hparams.pkl")
    if not args.do_predict:
        return model

    model.hparams.test_checkpoint = ""
    # Only checkpoints directly inside output_dir are considered (the pattern has no "**",
    # so a recursive flag would have no effect). The lexicographically last one is used.
    checkpoints = sorted(glob.glob(os.path.join(args.output_dir, "*.ckpt")))
    if checkpoints:
        model.hparams.test_checkpoint = checkpoints[-1]
        trainer.resume_from_checkpoint = checkpoints[-1]
    trainer.logger.log_hyperparams(model.hparams)

    # test() without a model tests using the best checkpoint automatically
    trainer.test()
    return model
388
389
390
391


if __name__ == "__main__":
    # Build the CLI: Lightning trainer flags first, then the options of this script.
    cli = pl.Trainer.add_argparse_args(argparse.ArgumentParser())
    cli = SummarizationModule.add_model_specific_args(cli, os.getcwd())

    main(cli.parse_args())