# realm_index.py
import itertools
import os
import pickle
import shutil

import numpy as np
import torch

from megatron import get_args
from megatron import mpu


def detach(tensor):
    """Return ``tensor`` as a NumPy array.

    The tensor is detached, moved to the CPU and then converted with ``.numpy()``.
    """
    host_tensor = tensor.detach().cpu()
    return host_tensor.numpy()


class BlockData(object):
    """Serializable data structure for holding data for blocks -- embeddings and necessary metadata for REALM"""

    def __init__(self, block_data_path=None, load_from_path=True, rank=None):
        """Create the container and optionally populate it from disk.

        :param block_data_path: path of the pickle file holding the merged block data.
            If None, both the path and the rank are read from ``get_args()``.
        :param load_from_path: if True, call ``load_from_file()`` during construction.
        :param rank: value used to name this process's shard file in ``save_shard()``.
            Replaced by ``args.rank`` when ``block_data_path`` is None.
        """
        # block id -> embedding (add_block_data stores these as float16)
        self.embed_data = dict()
        # block id -> metadata
        self.meta_data = dict()
        if block_data_path is None:
            args = get_args()
            block_data_path = args.block_data_path
            rank = args.rank
        self.block_data_path = block_data_path
        self.rank = rank

        # shards live in a directory named after the data file with its extension replaced by '_tmp'
        block_data_name = os.path.splitext(self.block_data_path)[0]
        self.temp_dir_name = block_data_name + '_tmp'

        if load_from_path:
            self.load_from_file()

    def state(self):
        """Return the dict that gets pickled: the embedding and metadata mappings."""
        return {
            'embed_data': self.embed_data,
            'meta_data': self.meta_data,
        }

    def clear(self):
        """Clear the embedding data structures to save memory.
        The metadata ends up getting used, and is also much smaller in dimensionality
        so it isn't really worth clearing.
        """
        self.embed_data = dict()

    def load_from_file(self):
        """Populate members from instance saved to file"""

        if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
            print("\n> Unpickling BlockData", flush=True)
        # NOTE: unpickling can execute arbitrary code; block_data_path must point at a trusted file.
        # The context manager closes the file handle even if unpickling fails.
        with open(self.block_data_path, 'rb') as data_file:
            state_dict = pickle.load(data_file)
        if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
            print(">> Finished unpickling BlockData\n", flush=True)

        self.embed_data = state_dict['embed_data']
        self.meta_data = state_dict['meta_data']

    def add_block_data(self, block_indices, block_embeds, block_metas, allow_overwrite=False):
        """Add data for set of blocks
        :param block_indices: 1D array of unique int ids for the blocks
        :param block_embeds: 2D array of embeddings of the blocks
        :param block_metas: 2D array of metadata for the blocks.
            In the case of REALM this will be [start_idx, end_idx, doc_idx]
        :param allow_overwrite: if False, raise ValueError when an id is already present
        """
        for idx, embed, meta in zip(block_indices, block_embeds, block_metas):
            if not allow_overwrite and idx in self.embed_data:
                raise ValueError("Unexpectedly tried to overwrite block data")

            # embeddings are kept in half precision
            self.embed_data[idx] = np.float16(embed)
            self.meta_data[idx] = meta

    def save_shard(self):
        """Save the block data that was created in this process"""
        # exist_ok makes this safe when the directory is already there
        os.makedirs(self.temp_dir_name, exist_ok=True)

        # save the data for each shard
        shard_path = os.path.join(self.temp_dir_name, '{}.pkl'.format(self.rank))
        with open(shard_path, 'wb') as data_file:
            pickle.dump(self.state(), data_file)

    def merge_shards_and_save(self):
        """Combine all the shards made using self.save_shard()

        Every file in the temporary directory other than this instance's own shard is
        loaded and merged into this instance, the result is pickled to
        ``self.block_data_path`` and the temporary directory is removed.

        :raises AssertionError: if a shard shares block ids with the data already held,
            or if this instance's own shard file is not in the directory
        """
        shard_names = os.listdir(self.temp_dir_name)
        seen_own_shard = False

        for fname in shard_names:
            shard_rank = int(os.path.splitext(fname)[0])
            if shard_rank == self.rank:
                # our own shard's contents are already in memory
                seen_own_shard = True
                continue

            # NOTE: unpickling can execute arbitrary code; shard files must be trusted.
            with open(os.path.join(self.temp_dir_name, fname), 'rb') as f:
                data = pickle.load(f)

            # check for overlap before merging so a failure leaves this instance untouched;
            # raised explicitly so the check is not stripped when running with -O
            overlap = self.embed_data.keys() & data['embed_data'].keys()
            if overlap:
                raise AssertionError(
                    "Shard {} overlaps existing block data in {} ids".format(fname, len(overlap)))

            self.embed_data.update(data['embed_data'])
            self.meta_data.update(data['meta_data'])

        if not seen_own_shard:
            raise AssertionError(
                "Shard for rank {} not found in {}".format(self.rank, self.temp_dir_name))

        # save the consolidated shards and remove temporary directory
        with open(self.block_data_path, 'wb') as final_file:
            pickle.dump(self.state(), final_file)
        shutil.rmtree(self.temp_dir_name, ignore_errors=True)

        print("Finished merging {} shards for a total of {} embeds".format(
            len(shard_names), len(self.embed_data)), flush=True)


class FaissMIPSIndex(object):
    """Wrapper object for a BlockData which does similarity search via FAISS under the hood"""

    def __init__(self, embed_size, block_data=None, use_gpu=False):
        """Build the FAISS index and, if given, load ``block_data`` into it.

        :param embed_size: dimensionality passed to ``faiss.index_factory``
        :param block_data: optional BlockData whose embeddings are added to the index
        :param use_gpu: if True, wrap the flat index in a ``faiss.GpuIndexFlat``
        """
        self.embed_size = embed_size
        self.block_data = block_data
        self.use_gpu = use_gpu
        # GPU only: row position inside the FAISS index -> block id
        self.id_map = dict()

        self.block_mips_index = None
        self._set_block_index()

    def _set_block_index(self):
        """Create a Faiss Flat index with inner product as the metric to search against"""
        try:
            import faiss
        except ImportError as err:
            raise Exception("Error: Please install faiss to use FaissMIPSIndex") from err

        if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
            print("\n> Building index", flush=True)
        self.block_mips_index = faiss.index_factory(self.embed_size, 'Flat', faiss.METRIC_INNER_PRODUCT)

        if self.use_gpu:
            # create resources and config for GpuIndex
            res = faiss.StandardGpuResources()
            config = faiss.GpuIndexFlatConfig()
            config.device = torch.cuda.current_device()
            config.useFloat16 = True

            self.block_mips_index = faiss.GpuIndexFlat(res, self.block_mips_index, config)
            if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
                print(">> Initialized index on GPU {}".format(self.block_mips_index.getDevice()), flush=True)
        else:
            # CPU index supports IDs so wrap with IDMap
            self.block_mips_index = faiss.IndexIDMap(self.block_mips_index)
            if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
                print(">> Initialized index on CPU", flush=True)

        # if we were constructed with a BlockData, then automatically load it when the FAISS structure is built
        if self.block_data is not None:
            self.add_block_embed_data(self.block_data)

    def reset_index(self):
        """Delete existing index and create anew"""
        # release our reference to the old index before the new one is built
        self.block_mips_index = None
        # row positions of the old index do not apply to the new one
        self.id_map = dict()

        # reset the block data so that _set_block_index will reload it as well
        if self.block_data is not None:
            block_data_path = self.block_data.block_data_path
            # carry the rank over so the reloaded BlockData keeps it
            rank = getattr(self.block_data, 'rank', None)
            self.block_data = None
            self.block_data = BlockData(block_data_path, rank=rank)

        self._set_block_index()

    def add_block_embed_data(self, all_block_data):
        """Add the embedding of each block to the underlying FAISS index

        The embeddings are removed from ``all_block_data`` (via its ``clear()``) once
        they have been copied out. Does nothing when there are no embeddings.

        :param all_block_data: object with an ``embed_data`` dict and a ``clear()`` method
        """
        # this assumes the embed_data is a dict : {int: np.array<float>}
        embed_data = all_block_data.embed_data
        if not embed_data:
            # zip(*...) below cannot be unpacked for an empty dict
            return
        block_indices, block_embeds = zip(*embed_data.items())

        # the embeddings have to be entered in as float32 even though the math internally is done with float16.
        block_embeds_arr = np.asarray(block_embeds, dtype=np.float32)

        # faiss GpuIndex doesn't work with IDMap wrapper so store ids to map back with
        if self.use_gpu:
            # rows are appended after whatever the index already holds, so number them from ntotal
            first_row = self.block_mips_index.ntotal
            for offset, idx in enumerate(block_indices):
                self.id_map[first_row + offset] = idx

        # we no longer need the embedding data since it's in the index now
        all_block_data.clear()

        if self.use_gpu:
            self.block_mips_index.add(block_embeds_arr)
        else:
            # FAISS ids are 64-bit integers
            block_indices_arr = np.asarray(block_indices, dtype=np.int64)
            self.block_mips_index.add_with_ids(block_embeds_arr, block_indices_arr)

        if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
            print(">>> Finished adding block data to index", flush=True)

    def _positions_to_block_ids(self, positions):
        """Translate an integer array of index row positions into block ids using ``self.id_map``.

        Negative positions (FAISS uses -1 for "no result") are passed through as -1.
        """
        block_ids = np.full(positions.shape, -1, dtype=np.int64)
        for where in np.ndindex(*positions.shape):
            row = int(positions[where])
            if row >= 0:
                block_ids[where] = self.id_map[row]
        return block_ids

    def search_mips_index(self, query_embeds, top_k, reconstruct=True):
        """Get the top-k blocks by the index distance metric.

        :param query_embeds: tensor of query embeddings; converted to a float32 array before searching
        :param top_k: number of results requested per query
        :param reconstruct: if True: return the result of the index's ``search_and_reconstruct``
                            (the docstring this replaces described it as a
                            [num_queries x k x embed_dim] array of blocks)
                            if False: return [num_queries x k] array of distances, and another for indices
        """
        query_embeds = np.float32(detach(query_embeds))

        if reconstruct:
            # get the vectors themselves
            top_k_block_embeds = self.block_mips_index.search_and_reconstruct(query_embeds, top_k)
            return top_k_block_embeds

        # get distances and indices of closest vectors
        distances, block_indices = self.block_mips_index.search(query_embeds, top_k)
        if self.use_gpu:
            # the GPU index returns row positions, so map them back to block ids
            block_indices = self._positions_to_block_ids(block_indices)
        return distances, block_indices