realm_index.py 8.36 KB
Newer Older
Neel Kant's avatar
Neel Kant committed
1
2
3
4
5
6
7
8
import itertools
import os
import pickle
import shutil

import numpy as np
import torch

Neel Kant's avatar
Neel Kant committed
9
from megatron import get_args
10
from megatron import mpu
Neel Kant's avatar
Neel Kant committed
11
12
13
14
15
16


def detach(tensor):
    """Return the values of a torch tensor as a NumPy array.

    The tensor is first detached from the autograd graph and moved to the
    CPU, then converted with ``Tensor.numpy()``.
    """
    host_tensor = tensor.detach().cpu()
    return host_tensor.numpy()


Mostofa Patwary's avatar
Mostofa Patwary committed
17
class OpenRetreivalDataStore(object):
    """
    Serializable data structure for holding data for blocks --
    a mapping from block id to block embedding, used by the Retriever.

    The store can be pickled to ``embedding_path``. When several processes
    each build part of the data, every process writes its own shard with
    ``save_shard`` and one process combines them with
    ``merge_shards_and_save``.
    """
    def __init__(self, embedding_path=None, load_from_path=True, rank=None):
        """
        :param embedding_path: path of the pickle file holding the embeddings.
            If None, both the path and the rank are taken from ``get_args()``
            (any ``rank`` passed in is then ignored).
        :param load_from_path: if True, immediately load the embeddings from
            ``embedding_path``.
        :param rank: identifier used to name the shard file of this process.
        """
        self.embed_data = dict()
        if embedding_path is None:
            args = get_args()
            embedding_path = args.embedding_path
            rank = args.rank
        self.embedding_path = embedding_path
        self.rank = rank

        if load_from_path:
            self.load_from_file()

        # shards are written next to the final file, in '<path sans ext>_tmp'
        block_data_name = os.path.splitext(self.embedding_path)[0]
        self.temp_dir_name = block_data_name + '_tmp'

    def state(self):
        """Return the dict that gets pickled by the save methods."""
        return {
            'embed_data': self.embed_data,
        }

    def clear(self):
        """
        Clear the embedding data structure to save memory.
        """
        self.embed_data = dict()

    def load_from_file(self):
        """Populate members from instance saved to file"""
        # only announce progress once: either outside of a distributed run
        # or on data parallel rank 0
        should_log = mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0

        if should_log:
            print("\n> Unpickling BlockData", flush=True)
        # NOTE: pickle.load can execute arbitrary code; only point
        # embedding_path at files that come from a trusted source.
        with open(self.embedding_path, 'rb') as reader:
            state_dict = pickle.load(reader)
        if should_log:
            print(">> Finished unpickling BlockData\n", flush=True)

        self.embed_data = state_dict['embed_data']

    def add_block_data(self, row_id, block_embeds, allow_overwrite=False):
        """
        Add data for set of blocks
        :param row_id: 1D array of unique int ids for the blocks
        :param block_embeds: 2D array of embeddings of the blocks, one row
            per entry of row_id. Each row is stored as float16.
        :param allow_overwrite: if False (the default), adding an id that is
            already present raises ValueError.
        """
        for idx, embed in zip(row_id, block_embeds):
            if not allow_overwrite and idx in self.embed_data:
                raise ValueError("Unexpectedly tried to overwrite block data")

            self.embed_data[idx] = np.float16(embed)

    def save_shard(self):
        """
        Save the block data that was created in this process to
        '<temp_dir_name>/<rank>.pkl'.
        """
        # exist_ok makes this safe when several processes race to create it
        os.makedirs(self.temp_dir_name, exist_ok=True)

        # save the data for each shard
        shard_path = os.path.join(self.temp_dir_name,
                                  '{}.pkl'.format(self.rank))
        with open(shard_path, 'wb') as writer:
            pickle.dump(self.state(), writer)

    def merge_shards_and_save(self):
        """
        Combine all the shards made using save_shard.

        The data of every other rank is merged into this instance (whose own
        data is expected to already be in memory), the result is pickled to
        ``embedding_path`` and the temporary shard directory is removed.
        """
        shard_names = os.listdir(self.temp_dir_name)
        seen_own_shard = False

        for fname in shard_names:
            # shard files are named '<rank>.pkl'
            shard_rank = int(os.path.splitext(fname)[0])
            if shard_rank == self.rank:
                # our own data is already in self.embed_data
                seen_own_shard = True
                continue

            with open(os.path.join(self.temp_dir_name, fname), 'rb') as f:
                data = pickle.load(f)
            old_size = len(self.embed_data)
            shard_size = len(data['embed_data'])

            # add the shard's data and check to make sure there
            # is no overlap
            self.embed_data.update(data['embed_data'])
            assert len(self.embed_data) == old_size + shard_size, \
                "shard {} overlaps with already merged block ids".format(fname)

        assert seen_own_shard, \
            "no shard found for rank {}".format(self.rank)

        # save the consolidated shards and remove temporary directory
        with open(self.embedding_path, 'wb') as final_file:
            pickle.dump(self.state(), final_file)
        shutil.rmtree(self.temp_dir_name, ignore_errors=True)

        print("Finished merging {} shards for a total of {} embeds".format(
            len(shard_names), len(self.embed_data)), flush=True)


class FaissMIPSIndex(object):
    """
    Wrapper around an OpenRetreivalDataStore that provides maximum inner
    product similarity search over its embeddings, using FAISS under the hood.
    """
    def __init__(self, embed_size, block_data=None, use_gpu=False):
        """
        :param embed_size: dimensionality of the embeddings to index
        :param block_data: optional data store (an object with ``embed_data``
            and ``clear()``) whose embeddings are added to the index as soon
            as it is built
        :param use_gpu: if True, build the index on the current CUDA device
        """
        self.embed_size = embed_size
        self.block_data = block_data
        self.use_gpu = use_gpu
        # GPU only: position of a vector inside the index -> block id
        self.id_map = dict()

        self.block_mips_index = None
        self._set_block_index()

    def _set_block_index(self):
        """Create a Faiss Flat index with inner product as the metric to search against"""
        try:
            import faiss
        except ImportError as err:
            raise Exception("Error: Please install faiss to use FaissMIPSIndex") from err

        # only announce progress once: either outside of a distributed run
        # or on data parallel rank 0
        should_log = mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0

        if should_log:
            print("\n> Building index", flush=True)

        self.block_mips_index = faiss.index_factory(self.embed_size, 'Flat', faiss.METRIC_INNER_PRODUCT)
        # a fresh index is empty, so positions recorded for a previous index
        # are meaningless
        self.id_map = dict()

        if self.use_gpu:
            # create resources and config for GpuIndex
            res = faiss.StandardGpuResources()
            config = faiss.GpuIndexFlatConfig()
            config.device = torch.cuda.current_device()
            config.useFloat16 = True

            self.block_mips_index = faiss.GpuIndexFlat(res, self.block_mips_index, config)
            if should_log:
                print(">> Initialized index on GPU {}".format(self.block_mips_index.getDevice()), flush=True)
        else:
            # CPU index supports IDs so wrap with IDMap
            self.block_mips_index = faiss.IndexIDMap(self.block_mips_index)
            if should_log:
                print(">> Initialized index on CPU", flush=True)

        # if we were constructed with a data store, then automatically load it when the FAISS structure is built
        if self.block_data is not None:
            self.add_block_embed_data(self.block_data)

    def reset_index(self):
        """Delete existing index and create anew"""
        del self.block_mips_index

        # reset the block data so that _set_block_index will reload it as well
        # (adding the data to the index clears it, so it is read from file again)
        if self.block_data is not None:
            embedding_path = self.block_data.embedding_path
            del self.block_data
            self.block_data = OpenRetreivalDataStore(embedding_path)

        self._set_block_index()

    def add_block_embed_data(self, all_block_data):
        """Add the embedding of each block to the underlying FAISS index

        :param all_block_data: data store whose ``embed_data`` is a dict
            {int block id: embedding array}. Its ``clear()`` method is called
            once the embeddings have been copied, so the store is empty
            afterwards.
        """
        # this assumes the embed_data is a dict : {int: np.array<float>}
        if not all_block_data.embed_data:
            # zip(*...) below cannot be unpacked for an empty dict, and there
            # is nothing to add anyway
            return

        block_indices, block_embeds = zip(*all_block_data.embed_data.items())

        # the embeddings have to be entered in as float32 even though the math internally is done with float16.
        block_embeds_arr = np.float32(np.array(block_embeds))
        # FAISS takes ids as 64 bit integers
        block_indices_arr = np.asarray(block_indices, dtype=np.int64)

        # faiss GpuIndex doesn't work with IDMap wrapper so store ids to map back with
        if self.use_gpu:
            # vectors are appended after the ones already in the index, so
            # positions continue from the current size of the index
            first_position = self.block_mips_index.ntotal
            self.id_map.update(enumerate(block_indices, start=first_position))

        # we no longer need the embedding data since it's in the index now
        all_block_data.clear()

        if self.use_gpu:
            self.block_mips_index.add(block_embeds_arr)
        else:
            self.block_mips_index.add_with_ids(block_embeds_arr, block_indices_arr)

        if mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0:
            print(">>> Finished adding block data to index", flush=True)

    def _remap_gpu_indices(self, block_indices):
        """Translate positions inside the GPU index back to block ids.

        :param block_indices: integer array of positions as returned by the
            index search
        :return: int64 array of the same shape holding the block ids;
            positions that are not in ``id_map`` (such as a -1 placeholder)
            are returned as -1
        """
        fresh_indices = np.full(block_indices.shape, -1, dtype=np.int64)
        for position in np.ndindex(block_indices.shape):
            fresh_indices[position] = self.id_map.get(int(block_indices[position]), -1)
        return fresh_indices

    def search_mips_index(self, query_embeds, top_k, reconstruct=True):
        """Get the top-k blocks by the index distance metric.

        :param query_embeds: torch tensor of query embeddings, one row per query
        :param top_k: number of blocks to retrieve for each query
        :param reconstruct: if True: return a [num_queries x k x embed_dim] array of blocks
                            if False: return [num_queries x k] array of distances, and another for indices
        """
        query_embeds = np.float32(detach(query_embeds))

        if reconstruct:
            # get the vectors themselves
            return self.block_mips_index.search_and_reconstruct(query_embeds, top_k)

        # get distances and indices of closest vectors
        distances, block_indices = self.block_mips_index.search(query_embeds, top_k)
        if self.use_gpu:
            # the GPU index returns positions, not block ids
            block_indices = self._remap_gpu_indices(block_indices)
        return distances, block_indices