realm_index.py 8.75 KB
Newer Older
Neel Kant's avatar
Neel Kant committed
1
2
3
4
5
6
7
8
import itertools
import os
import pickle
import shutil

import numpy as np
import torch

Neel Kant's avatar
Neel Kant committed
9
from megatron import get_args
10
from megatron import mpu
Neel Kant's avatar
Neel Kant committed
11
12
13
14
15
16


def detach(tensor):
    """Return the values of a torch tensor as a NumPy array.

    The tensor is detached from the autograd graph and moved to the CPU
    (if it is not already there) before being converted.
    """
    host_tensor = tensor.detach().cpu()
    return host_tensor.numpy()


Mostofa Patwary's avatar
Mostofa Patwary committed
17
18
19
20
class OpenRetreivalDataStore(object):
    """Serializable data structure for holding data for blocks -- embeddings
    and necessary metadata for Retriever.

    Embeddings live in ``self.embed_data``, a dict keyed by block id. Each
    process can write its own portion with ``save_shard``; one process can
    then combine every portion with ``merge_shards_and_save``.
    """

    def __init__(self, embedding_path=None, load_from_path=True, rank=None):
        """
        :param embedding_path: pickle file that holds (or will hold) the
            embeddings. When None, both the path and the rank are read from
            the global args returned by ``get_args()``.
        :param load_from_path: if True, load ``embedding_path`` right away.
        :param rank: identifier used to name this process's shard file.
        """
        self.embed_data = dict()
        if embedding_path is None:
            args = get_args()
            embedding_path = args.embedding_path
            rank = args.rank
        self.embedding_path = embedding_path
        self.rank = rank

        if load_from_path:
            self.load_from_file()

        # shards are written to a sibling directory named after the
        # embedding file (extension stripped, '_tmp' appended)
        block_data_name = os.path.splitext(self.embedding_path)[0]
        self.temp_dir_name = block_data_name + '_tmp'

    def state(self):
        """Return the dict that gets pickled by the save methods."""
        return {
            'embed_data': self.embed_data,
        }

    def clear(self):
        """Drop the in-memory embeddings to save memory."""
        self.embed_data = dict()

    def load_from_file(self):
        """Populate members from instance saved to file"""
        should_log = mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0

        if should_log:
            print("\n> Unpickling BlockData", flush=True)
        # NOTE: unpickling can execute arbitrary code, so embedding_path must
        # only ever point at a trusted file.
        # The context manager makes sure the file handle is closed again.
        with open(self.embedding_path, 'rb') as reader:
            state_dict = pickle.load(reader)
        if should_log:
            print(">> Finished unpickling BlockData\n", flush=True)

        self.embed_data = state_dict['embed_data']

    def add_block_data(self, row_id, block_embeds, allow_overwrite=False):
        """Add data for set of blocks

        :param row_id: 1D array of unique int ids for the blocks
        :param block_embeds: 2D array of embeddings of the blocks
        :param allow_overwrite: if False, adding an id that is already stored
            raises ValueError
        """
        for idx, embed in zip(row_id, block_embeds):
            if not allow_overwrite and idx in self.embed_data:
                raise ValueError("Unexpectedly tried to overwrite block data")

            # embeddings are stored in half precision
            self.embed_data[idx] = np.float16(embed)

    def save_shard(self):
        """Save the block data that was created in this process"""
        # exist_ok already covers the case where another process created the
        # directory first, so no separate isdir() check is needed
        os.makedirs(self.temp_dir_name, exist_ok=True)

        # save the data for each shard
        shard_path = os.path.join(self.temp_dir_name,
                                  '{}.pkl'.format(self.rank))
        with open(shard_path, 'wb') as writer:
            pickle.dump(self.state(), writer)

    def merge_shards_and_save(self):
        """Combine all the shards made using self.save_shard()

        Every file in the temporary directory is expected to be named
        ``<int rank>.<ext>``. The shard of this process is skipped because its
        data is already held in memory. The merged result is pickled to
        ``self.embedding_path`` and the temporary directory is removed.
        """
        # list the directory once so the loop and the final report agree
        shard_names = os.listdir(self.temp_dir_name)
        seen_own_shard = False

        for fname in shard_names:
            shard_rank = int(os.path.splitext(fname)[0])
            if shard_rank == self.rank:
                seen_own_shard = True
                continue

            with open(os.path.join(self.temp_dir_name, fname), 'rb') as f:
                data = pickle.load(f)

            old_size = len(self.embed_data)
            shard_size = len(data['embed_data'])

            # add the shard's data and check to make sure there is no overlap
            self.embed_data.update(data['embed_data'])
            assert len(self.embed_data) == old_size + shard_size, \
                "shard {} overlaps with already merged block ids".format(fname)

        assert seen_own_shard, \
            "shard of rank {} not found in {}".format(self.rank,
                                                      self.temp_dir_name)

        # save the consolidated shards and remove temporary directory
        with open(self.embedding_path, 'wb') as final_file:
            pickle.dump(self.state(), final_file)
        shutil.rmtree(self.temp_dir_name, ignore_errors=True)

        print("Finished merging {} shards for a total of {} embeds".format(
            len(shard_names), len(self.embed_data)), flush=True)


class FaissMIPSIndex(object):
    """Wrapper object for a block data store which performs similarity search
    via FAISS under the hood"""

    def __init__(self, embed_size, block_data=None, use_gpu=False):
        """
        :param embed_size: dimensionality of the embeddings to index
        :param block_data: optional data store exposing ``embed_data``
            (dict of id -> embedding); when given, it is loaded into the
            index as soon as the index is built
        :param use_gpu: build the index on the current CUDA device
        """
        self.embed_size = embed_size
        self.block_data = block_data
        self.use_gpu = use_gpu
        # GPU only: maps a position inside the FAISS index to the block id
        self.id_map = dict()

        self.block_mips_index = None
        self._set_block_index()

    def _set_block_index(self):
        """Create a Faiss Flat index with inner product as the metric to search against"""
        try:
            import faiss
        except ImportError as err:
            raise Exception("Error: Please install faiss to use FaissMIPSIndex") from err

        should_log = mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0

        if should_log:
            print("\n> Building index", flush=True)
        self.block_mips_index = faiss.index_factory(self.embed_size, 'Flat', faiss.METRIC_INNER_PRODUCT)

        if self.use_gpu:
            # create resources and config for GpuIndex
            res = faiss.StandardGpuResources()
            config = faiss.GpuIndexFlatConfig()
            config.device = torch.cuda.current_device()
            config.useFloat16 = True

            self.block_mips_index = faiss.GpuIndexFlat(res, self.block_mips_index, config)
            if should_log:
                print(">> Initialized index on GPU {}".format(self.block_mips_index.getDevice()), flush=True)
        else:
            # CPU index supports IDs so wrap with IDMap
            self.block_mips_index = faiss.IndexIDMap(self.block_mips_index)
            if should_log:
                print(">> Initialized index on CPU", flush=True)

        # if we were constructed with a block data store, then automatically
        # load it when the FAISS structure is built
        if self.block_data is not None:
            self.add_block_embed_data(self.block_data)

    def reset_index(self):
        """Delete existing index and create anew"""
        del self.block_mips_index
        # positions recorded for the old index are meaningless for the new one
        self.id_map = dict()

        # reset the block data so that _set_block_index will reload it as well
        if self.block_data is not None:
            embedding_path = self.block_data.embedding_path
            rank = self.block_data.rank
            del self.block_data
            # re-read the embeddings from disk (the in-memory copy was
            # cleared when it was first added to the index)
            self.block_data = OpenRetreivalDataStore(embedding_path, rank=rank)

        self._set_block_index()

    def add_block_embed_data(self, all_block_data):
        """Add the embedding of each block to the underlying FAISS index

        :param all_block_data: data store whose ``embed_data`` is a dict
            {int: np.array<float>}. Its embeddings are cleared once they have
            been copied into the index.
        """
        should_log = mpu.is_unitialized() or mpu.get_data_parallel_rank() == 0

        # zip(*...) below cannot be unpacked for an empty dict
        if not all_block_data.embed_data:
            if should_log:
                print(">>> No block data to add to index", flush=True)
            return

        # this assumes the embed_data is a dict : {int: np.array<float>}
        block_indices, block_embeds = zip(*all_block_data.embed_data.items())

        # the embeddings have to be entered in as float32 even though the math internally is done with float16.
        block_embeds_arr = np.float32(np.array(block_embeds))
        # FAISS ids are 64-bit integers
        block_indices_arr = np.asarray(block_indices, dtype=np.int64)

        # faiss GpuIndex doesn't work with IDMap wrapper so store ids to map back with
        if self.use_gpu:
            # new vectors are appended after the ones already in the index,
            # so their positions start at the current size
            first_position = self.block_mips_index.ntotal
            self.id_map.update(enumerate(block_indices, start=first_position))

        # we no longer need the embedding data since it's in the index now
        all_block_data.clear()

        if self.use_gpu:
            self.block_mips_index.add(block_embeds_arr)
        else:
            self.block_mips_index.add_with_ids(block_embeds_arr, block_indices_arr)

        if should_log:
            print(">>> Finished adding block data to index", flush=True)

    def search_mips_index(self, query_embeds, top_k, reconstruct=True):
        """Get the top-k blocks by the index distance metric.

        :param query_embeds: torch tensor of query embeddings
        :param top_k: number of neighbours to retrieve per query
        :param reconstruct: if True: return the result of FAISS
                                ``search_and_reconstruct`` unchanged, which includes
                                the [num_queries x k x embed_dim] block embeddings
                                (NOTE(review): presumably a (distances, indices,
                                embeddings) triple -- confirm against the FAISS version in use)
                            if False: return [num_queries x k] array of distances, and another for indices
        """
        query_embeds = np.float32(detach(query_embeds))

        if reconstruct:
            # get the vectors themselves
            return self.block_mips_index.search_and_reconstruct(query_embeds, top_k)

        # get distances and indices of closest vectors
        distances, block_indices = self.block_mips_index.search(query_embeds, top_k)
        if self.use_gpu:
            # translate positions inside the GPU index back to block ids;
            # FAISS reports -1 when fewer than top_k results exist, and that
            # marker is passed through unchanged
            fresh_indices = np.full(block_indices.shape, -1, dtype=np.int64)
            for position in np.ndindex(*block_indices.shape):
                index_row = int(block_indices[position])
                if index_row >= 0:
                    fresh_indices[position] = self.id_map[index_row]
            block_indices = fresh_indices
        return distances, block_indices