# realm_index.py
# Block-embedding storage and FAISS inner-product search used by REALM.
import itertools
import os
import pickle
import shutil

import faiss
import numpy as np
import torch

# Megatron-internal imports
from megatron import get_args
# Helper for turning torch tensors into NumPy arrays before they are handed to FAISS.


def detach(tensor):
    """Return the values of ``tensor`` as a NumPy array.

    The tensor is first detached from the autograd graph and moved to the CPU,
    which is what ``.numpy()`` requires.
    """
    host_tensor = tensor.detach().cpu()
    return host_tensor.numpy()


class BlockData(object):
    """Serializable data structure for holding data for blocks -- embeddings and necessary metadata for REALM

    ``embed_data`` maps a block id to its embedding (stored as float16) and
    ``meta_data`` maps the same id to that block's metadata. Each process can
    write its own shard with :meth:`save_shard`; one process then combines the
    shards into a single pickle with :meth:`merge_shards_and_save`.
    """

    def __init__(self, block_data_path=None, load_from_path=True, rank=None):
        """
        :param block_data_path: path of the pickle file holding the consolidated block data.
            When None, both the path and the rank are read from the Megatron args
            (``get_args()``), replacing any ``rank`` passed in.
        :param load_from_path: if True, populate the instance from ``block_data_path`` right away
        :param rank: identifier used to name this process's shard file
        """
        self.embed_data = dict()
        self.meta_data = dict()
        if block_data_path is None:
            args = get_args()
            block_data_path = args.block_data_path
            rank = args.rank
        self.block_data_path = block_data_path
        self.rank = rank

        # shards are written next to the final file, in '<path without extension>_tmp'
        block_data_name = os.path.splitext(self.block_data_path)[0]
        self.temp_dir_name = block_data_name + '_tmp'

        if load_from_path:
            self.load_from_file()

    def state(self):
        """Return the dict that gets pickled: the embedding and metadata mappings."""
        return {
            'embed_data': self.embed_data,
            'meta_data': self.meta_data,
        }

    def clear(self):
        """Clear the embedding data structures to save memory.
        The metadata ends up getting used, and is also much smaller in dimensionality
        so it isn't really worth clearing.
        """
        self.embed_data = dict()

    def load_from_file(self):
        """Populate members from instance saved to file"""
        print("\n> Unpickling BlockData", flush=True)
        # NOTE(review): pickle executes arbitrary code while loading; block_data_path
        # must only ever point at files produced by this class.
        with open(self.block_data_path, 'rb') as data_file:
            state_dict = pickle.load(data_file)
        print(">> Finished unpickling BlockData\n", flush=True)

        self.embed_data = state_dict['embed_data']
        self.meta_data = state_dict['meta_data']

    def add_block_data(self, block_indices, block_embeds, block_metas, allow_overwrite=False):
        """Add data for set of blocks
        :param block_indices: 1D array of unique int ids for the blocks
        :param block_embeds: 2D array of embeddings of the blocks
        :param block_metas: 2D array of metadata for the blocks.
            In the case of REALM this will be [start_idx, end_idx, doc_idx]
        :param allow_overwrite: if False, adding an id that is already present raises
        :raises ValueError: when an id is already stored and ``allow_overwrite`` is False
        """
        for idx, embed, meta in zip(block_indices, block_embeds, block_metas):
            if not allow_overwrite and idx in self.embed_data:
                raise ValueError("Unexpectedly tried to overwrite block data")

            # embeddings are kept in half precision to reduce memory and file size
            self.embed_data[idx] = np.float16(embed)
            self.meta_data[idx] = meta

    def save_shard(self):
        """Save the block data that was created in this process"""
        os.makedirs(self.temp_dir_name, exist_ok=True)

        # one file per rank: '<temp_dir>/<rank>.pkl'
        with open('{}/{}.pkl'.format(self.temp_dir_name, self.rank), 'wb') as data_file:
            pickle.dump(self.state(), data_file)

    def merge_shards_and_save(self):
        """Combine all the shards made using self.save_shard()

        The shard whose file name matches ``self.rank`` is skipped because its
        contents are expected to already be held by this instance. The merged
        state is pickled to ``self.block_data_path`` and the temporary shard
        directory is removed.

        :raises AssertionError: if two shards share a block id (checked before
            anything is merged from the offending shard), or if this rank's own
            shard file is not present in the temporary directory
        """
        shard_names = os.listdir(self.temp_dir_name)
        seen_own_shard = False

        for fname in shard_names:
            shard_rank = int(os.path.splitext(fname)[0])
            if shard_rank == self.rank:
                seen_own_shard = True
                continue

            # NOTE(review): same pickle caveat as load_from_file -- only trusted shard files.
            with open('{}/{}'.format(self.temp_dir_name, fname), 'rb') as f:
                data = pickle.load(f)

            # check for overlap *before* updating so a bad shard cannot clobber existing entries
            overlap = self.embed_data.keys() & data['embed_data'].keys()
            assert not overlap, \
                "shard {} repeats {} block ids that were already merged".format(fname, len(overlap))

            self.embed_data.update(data['embed_data'])
            self.meta_data.update(data['meta_data'])

        assert seen_own_shard, \
            "no shard for rank {} found in {}".format(self.rank, self.temp_dir_name)

        # save the consolidated shards and remove temporary directory
        with open(self.block_data_path, 'wb') as final_file:
            pickle.dump(self.state(), final_file)
        shutil.rmtree(self.temp_dir_name, ignore_errors=True)

        print("Finished merging {} shards for a total of {} embeds".format(
            len(shard_names), len(self.embed_data)), flush=True)


class FaissMIPSIndex(object):
    """Wrapper object for a BlockData which provides similarity search via FAISS under the hood

    On CPU the flat inner-product index is wrapped in an ``IndexIDMap`` so that
    searches return block ids directly. On GPU no id wrapper is used; vectors
    are identified by insertion position and ``id_map`` translates positions
    back to block ids.
    """

    def __init__(self, embed_size, block_data=None, use_gpu=False):
        """
        :param embed_size: dimensionality of the embeddings stored in the index
        :param block_data: optional BlockData whose embeddings are added as soon as the index is built
        :param use_gpu: build the index on the current CUDA device instead of on CPU
        """
        self.embed_size = embed_size
        self.block_data = block_data
        self.use_gpu = use_gpu
        self.id_map = dict()

        self.block_mips_index = None
        self._set_block_index()

    def _set_block_index(self):
        """Create a Faiss Flat index with inner product as the metric to search against"""
        print("\n> Building index", flush=True)
        self.block_mips_index = faiss.index_factory(self.embed_size, 'Flat', faiss.METRIC_INNER_PRODUCT)

        # a brand-new index holds no vectors, so positions recorded for a previous index are stale
        self.id_map = dict()

        if self.use_gpu:
            # create resources and config for GpuIndex
            res = faiss.StandardGpuResources()
            config = faiss.GpuIndexFlatConfig()
            config.device = torch.cuda.current_device()
            config.useFloat16 = True

            self.block_mips_index = faiss.GpuIndexFlat(res, self.block_mips_index, config)
            print(">> Initialized index on GPU {}".format(self.block_mips_index.getDevice()), flush=True)
        else:
            # CPU index supports IDs so wrap with IDMap
            self.block_mips_index = faiss.IndexIDMap(self.block_mips_index)
            print(">> Initialized index on CPU", flush=True)

        # if we were constructed with a BlockData, then automatically load it when the FAISS structure is built
        if self.block_data is not None:
            self.add_block_embed_data(self.block_data)

    def reset_index(self):
        """Delete existing index and create anew"""
        # drop the old index first so its memory can be released before rebuilding
        self.block_mips_index = None

        # reset the block data so that _set_block_index will reload it as well
        if self.block_data is not None:
            block_data_path = self.block_data.block_data_path
            rank = self.block_data.rank
            self.block_data = None
            # load_from_file is an instance method, so a fresh BlockData has to be
            # constructed from the path; passing the path explicitly also avoids get_args()
            self.block_data = BlockData(block_data_path, load_from_path=True, rank=rank)

        self._set_block_index()

    def add_block_embed_data(self, all_block_data):
        """Add the embedding of each block to the underlying FAISS index

        :param all_block_data: object exposing ``embed_data`` ({int: np.array<float>}) and
            ``clear()``, e.g. a BlockData. Its embeddings are cleared once copied out.
            Nothing happens when ``embed_data`` is empty.
        """
        if not all_block_data.embed_data:
            return

        # this assumes the embed_data is a dict : {int: np.array<float>}
        block_indices, block_embeds = zip(*all_block_data.embed_data.items())

        # the embeddings have to be entered in as float32 even though the math internally is done with float16.
        block_embeds_arr = np.float32(np.array(block_embeds))
        # FAISS ids are 64-bit; be explicit rather than relying on the platform default int
        block_indices_arr = np.array(block_indices, dtype=np.int64)

        # we no longer need the embedding data since it's in the arrays above
        all_block_data.clear()

        if self.use_gpu:
            self.block_mips_index.add(block_embeds_arr)
            # faiss GpuIndex doesn't work with IDMap wrapper so store ids to map back with.
            # Positions continue from what was added before, so repeated calls don't collide.
            first_position = len(self.id_map)
            for position, idx in enumerate(block_indices, start=first_position):
                self.id_map[position] = idx
        else:
            self.block_mips_index.add_with_ids(block_embeds_arr, block_indices_arr)

        print(">>> Finished adding block data to index", flush=True)

    def _positions_to_block_ids(self, positions):
        """Translate insertion positions returned by the GPU index into block ids.

        :param positions: integer array of positions; negative entries (FAISS uses -1
            when fewer than k neighbours exist) are passed through as -1
        :return: int64 array of the same shape holding block ids
        :raises KeyError: if a non-negative position is not in ``id_map``
        """
        positions = np.asarray(positions)
        id_map = self.id_map
        block_ids = [-1 if pos < 0 else id_map[pos] for pos in positions.ravel().tolist()]
        return np.array(block_ids, dtype=np.int64).reshape(positions.shape)

    def search_mips_index(self, query_embeds, top_k, reconstruct=True):
        """Get the top-k blocks by the index distance metric.

        :param query_embeds: torch tensor of query embeddings (converted to a float32 array)
        :param top_k: number of neighbours to retrieve per query
        :param reconstruct: if True: return the result of the index's ``search_and_reconstruct``
                                unchanged (in FAISS's Python API: distances, ids and the
                                [num_queries x k x embed_dim] reconstructed vectors; ids are
                                not remapped in GPU mode -- NOTE(review): confirm callers)
                            if False: return [num_queries x k] array of distances, and another for indices
        """
        query_embeds = np.float32(detach(query_embeds))
        with torch.no_grad():
            if reconstruct:
                # get the vectors themselves
                return self.block_mips_index.search_and_reconstruct(query_embeds, top_k)

            # get distances and indices of closest vectors
            distances, block_indices = self.block_mips_index.search(query_embeds, top_k)
            if self.use_gpu:
                block_indices = self._positions_to_block_ids(block_indices)
            return distances, block_indices