Commit 395d2ce6 authored by huchen

init the faiss for rocm

parent 5ded39f5
#! /usr/bin/env python2
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import print_function
import numpy as np
import time
import os
import sys
import faiss
import re
from multiprocessing.dummy import Pool as ThreadPool
from datasets import ivecs_read
####################################################################
# Parse command line
####################################################################
def usage():
print("""
Usage: bench_gpu_1bn.py dataset indextype [options]
dataset: set of vectors to operate on.
Supported: SIFT1M, SIFT2M, ..., SIFT1000M or Deep1B
indextype: any index type supported by index_factory that runs on GPU.
General options
-ngpu ngpu nb of GPUs to use (default = all)
-tempmem N use N bytes of temporary GPU memory
-nocache do not read or write intermediate files
-float16 use 16-bit floats on the GPU side
Add options
-abs N split adds in blocks of no more than N vectors
-max_add N copy sharded dataset to CPU each max_add additions
(to avoid memory overflows with geometric reallocations)
-altadd Alternative add function, where the index is not stored
on GPU during add. Slightly faster for big datasets on
slow GPUs
Search options
-R R: nb of replicas of the same dataset (the dataset
will be copied across ngpu/R, default R=1)
-noptables do not use precomputed tables in IVFPQ.
-qbs N split queries in blocks of no more than N vectors
-nnn N search N neighbors for each query
-nprobe 4,16,64 try this number of probes
-knngraph instead of the standard setup for the dataset,
compute a k-nn graph with nnn neighbors per element
-oI xx%d.npy output the search result indices to this numpy file,
%d will be replaced with the nprobe
-oD xx%d.npy output the search result distances to this file
""", file=sys.stderr)
sys.exit(1)
# default values
dbname = None
index_key = None
ngpu = faiss.get_num_gpus()
replicas = 1 # nb of replicas of sharded dataset
add_batch_size = 32768
query_batch_size = 16384
nprobes = [1 << l for l in range(9)]
knngraph = False
use_precomputed_tables = True
tempmem = -1 # if -1, use system default
max_add = -1
use_float16 = False
use_cache = True
nnn = 10
altadd = False
I_fname = None
D_fname = None
args = sys.argv[1:]
while args:
a = args.pop(0)
if a == '-h': usage()
elif a == '-ngpu': ngpu = int(args.pop(0))
elif a == '-R': replicas = int(args.pop(0))
elif a == '-noptables': use_precomputed_tables = False
elif a == '-abs': add_batch_size = int(args.pop(0))
elif a == '-qbs': query_batch_size = int(args.pop(0))
elif a == '-nnn': nnn = int(args.pop(0))
elif a == '-tempmem': tempmem = int(args.pop(0))
elif a == '-nocache': use_cache = False
elif a == '-knngraph': knngraph = True
elif a == '-altadd': altadd = True
elif a == '-float16': use_float16 = True
elif a == '-nprobe': nprobes = [int(x) for x in args.pop(0).split(',')]
elif a == '-max_add': max_add = int(args.pop(0))
elif not dbname: dbname = a
elif not index_key: index_key = a
else:
print("argument %s unknown" % a, file=sys.stderr)
sys.exit(1)
cacheroot = '/tmp/bench_gpu_1bn'
if not os.path.isdir(cacheroot):
print("%s does not exist, creating it" % cacheroot)
os.mkdir(cacheroot)
#################################################################
# Small Utility Functions
#################################################################
# we mem-map the biggest files to avoid having them in memory all at
# once
def mmap_fvecs(fname):
x = np.memmap(fname, dtype='int32', mode='r')
d = x[0]
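    # each fvecs record is [d (int32) | d * float32]; reinterpret the buffer
    # as float32 and drop the leading size column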
return x.view('float32').reshape(-1, d + 1)[:, 1:]
def mmap_bvecs(fname):
x = np.memmap(fname, dtype='uint8', mode='r')
d = x[:4].view('int32')[0]
return x.reshape(-1, d + 4)[:, 4:]
def rate_limited_imap(f, l):
"""A threaded imap that does not produce elements faster than they
are consumed"""
pool = ThreadPool(1)
res = None
for i in l:
res_next = pool.apply_async(f, (i, ))
if res:
yield res.get()
res = res_next
yield res.get()
class IdentPreproc:
"""a pre-processor is either a faiss.VectorTransform or an IndentPreproc"""
def __init__(self, d):
self.d_in = self.d_out = d
def apply_py(self, x):
return x
def sanitize(x):
""" convert array to a c-contiguous float array """
return np.ascontiguousarray(x.astype('float32'))
def dataset_iterator(x, preproc, bs):
""" iterate over the lines of x in blocks of size bs"""
nb = x.shape[0]
block_ranges = [(i0, min(nb, i0 + bs))
for i0 in range(0, nb, bs)]
def prepare_block(i01):
i0, i1 = i01
xb = sanitize(x[i0:i1])
return i0, preproc.apply_py(xb)
return rate_limited_imap(prepare_block, block_ranges)
def eval_intersection_measure(gt_I, I):
""" measure intersection measure (used for knngraph)"""
inter = 0
rank = I.shape[1]
assert gt_I.shape[1] >= rank
for q in range(nq_gt):
inter += faiss.ranklist_intersection_size(
rank, faiss.swig_ptr(gt_I[q, :]),
rank, faiss.swig_ptr(I[q, :].astype('int64')))
return inter / float(rank * nq_gt)
#################################################################
# Prepare dataset
#################################################################
print("Preparing dataset", dbname)
if dbname.startswith('SIFT'):
# SIFT1M to SIFT1000M
dbsize = int(dbname[4:-1])
xb = mmap_bvecs('bigann/bigann_base.bvecs')
xq = mmap_bvecs('bigann/bigann_query.bvecs')
xt = mmap_bvecs('bigann/bigann_learn.bvecs')
# trim xb to correct size
xb = xb[:dbsize * 1000 * 1000]
gt_I = ivecs_read('bigann/gnd/idx_%dM.ivecs' % dbsize)
elif dbname == 'Deep1B':
xb = mmap_fvecs('deep1b/base.fvecs')
xq = mmap_fvecs('deep1b/deep1B_queries.fvecs')
xt = mmap_fvecs('deep1b/learn.fvecs')
    # deep1B's train set is outrageously big
xt = xt[:10 * 1000 * 1000]
gt_I = ivecs_read('deep1b/deep1B_groundtruth.ivecs')
else:
print('unknown dataset', dbname, file=sys.stderr)
sys.exit(1)
if knngraph:
# convert to knn-graph dataset
xq = xb
xt = xb
# we compute the ground-truth on this number of queries for validation
nq_gt = 10000
gt_sl = 100
# ground truth will be computed below
gt_I = None
print("sizes: B %s Q %s T %s gt %s" % (
xb.shape, xq.shape, xt.shape,
gt_I.shape if gt_I is not None else None))
#################################################################
# Parse index_key and set cache files
#
# The index_key is a valid factory key that would work, but we
# decompose the training to do it faster
#################################################################
pat = re.compile('(OPQ[0-9]+(_[0-9]+)?,|PCAR[0-9]+,)?' +
'(IVF[0-9]+),' +
'(PQ[0-9]+|Flat)')
matchobject = pat.match(index_key)
assert matchobject, 'could not parse ' + index_key
mog = matchobject.groups()
preproc_str = mog[0]
ivf_str = mog[2]
pqflat_str = mog[3]
ncent = int(ivf_str[3:])
prefix = ''
if knngraph:
gt_cachefile = '%s/BK_gt_%s.npy' % (cacheroot, dbname)
prefix = 'BK_'
# files must be kept distinct because the training set is not the
# same for the knngraph
if preproc_str:
preproc_cachefile = '%s/%spreproc_%s_%s.vectrans' % (
cacheroot, prefix, dbname, preproc_str[:-1])
else:
preproc_cachefile = None
preproc_str = ''
cent_cachefile = '%s/%scent_%s_%s%s.npy' % (
cacheroot, prefix, dbname, preproc_str, ivf_str)
index_cachefile = '%s/%s%s_%s%s,%s.index' % (
cacheroot, prefix, dbname, preproc_str, ivf_str, pqflat_str)
if not use_cache:
preproc_cachefile = None
cent_cachefile = None
index_cachefile = None
print("cachefiles:")
print(preproc_cachefile)
print(cent_cachefile)
print(index_cachefile)
#################################################################
# Wake up GPUs
#################################################################
print("preparing resources for %d GPUs" % ngpu)
gpu_resources = []
for i in range(ngpu):
res = faiss.StandardGpuResources()
if tempmem >= 0:
res.setTempMemory(tempmem)
gpu_resources.append(res)
def make_vres_vdev(i0=0, i1=-1):
" return vectors of device ids and resources useful for gpu_multiple"
vres = faiss.GpuResourcesVector()
vdev = faiss.IntVector()
if i1 == -1:
i1 = ngpu
for i in range(i0, i1):
vdev.push_back(i)
vres.push_back(gpu_resources[i])
return vres, vdev
#################################################################
# Prepare ground truth (for the knngraph)
#################################################################
def compute_GT():
print("compute GT")
t0 = time.time()
gt_I = np.zeros((nq_gt, gt_sl), dtype='int64')
gt_D = np.zeros((nq_gt, gt_sl), dtype='float32')
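    # heap array that merges the per-block search results, keeping the
    # gt_sl nearest neighbors per query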
heaps = faiss.float_maxheap_array_t()
heaps.k = gt_sl
heaps.nh = nq_gt
heaps.val = faiss.swig_ptr(gt_D)
heaps.ids = faiss.swig_ptr(gt_I)
heaps.heapify()
bs = 10 ** 5
n, d = xb.shape
xqs = sanitize(xq[:nq_gt])
db_gt = faiss.IndexFlatL2(d)
vres, vdev = make_vres_vdev()
db_gt_gpu = faiss.index_cpu_to_gpu_multiple(
vres, vdev, db_gt)
# compute ground-truth by blocks of bs, and add to heaps
for i0, xsl in dataset_iterator(xb, IdentPreproc(d), bs):
db_gt_gpu.add(xsl)
D, I = db_gt_gpu.search(xqs, gt_sl)
I += i0
heaps.addn_with_ids(
gt_sl, faiss.swig_ptr(D), faiss.swig_ptr(I), gt_sl)
db_gt_gpu.reset()
print("\r %d/%d, %.3f s" % (i0, n, time.time() - t0), end=' ')
print()
heaps.reorder()
print("GT time: %.3f s" % (time.time() - t0))
return gt_I
if knngraph:
if gt_cachefile and os.path.exists(gt_cachefile):
print("load GT", gt_cachefile)
gt_I = np.load(gt_cachefile)
else:
gt_I = compute_GT()
if gt_cachefile:
print("store GT", gt_cachefile)
np.save(gt_cachefile, gt_I)
#################################################################
# Prepare the vector transformation object (pure CPU)
#################################################################
def train_preprocessor():
print("train preproc", preproc_str)
d = xt.shape[1]
t0 = time.time()
if preproc_str.startswith('OPQ'):
fi = preproc_str[3:-1].split('_')
m = int(fi[0])
dout = int(fi[1]) if len(fi) == 2 else d
preproc = faiss.OPQMatrix(d, m, dout)
elif preproc_str.startswith('PCAR'):
dout = int(preproc_str[4:-1])
preproc = faiss.PCAMatrix(d, dout, 0, True)
else:
assert False
preproc.train(sanitize(xt[:1000000]))
print("preproc train done in %.3f s" % (time.time() - t0))
return preproc
def get_preprocessor():
if preproc_str:
if not preproc_cachefile or not os.path.exists(preproc_cachefile):
preproc = train_preprocessor()
if preproc_cachefile:
print("store", preproc_cachefile)
faiss.write_VectorTransform(preproc, preproc_cachefile)
else:
print("load", preproc_cachefile)
preproc = faiss.read_VectorTransform(preproc_cachefile)
else:
d = xb.shape[1]
preproc = IdentPreproc(d)
return preproc
#################################################################
# Prepare the coarse quantizer
#################################################################
def train_coarse_quantizer(x, k, preproc):
d = preproc.d_out
clus = faiss.Clustering(d, k)
clus.verbose = True
# clus.niter = 2
clus.max_points_per_centroid = 10000000
print("apply preproc on shape", x.shape, 'k=', k)
t0 = time.time()
x = preproc.apply_py(sanitize(x))
print(" preproc %.3f s output shape %s" % (
time.time() - t0, x.shape))
vres, vdev = make_vres_vdev()
index = faiss.index_cpu_to_gpu_multiple(
vres, vdev, faiss.IndexFlatL2(d))
clus.train(x, index)
centroids = faiss.vector_float_to_array(clus.centroids)
return centroids.reshape(k, d)
def prepare_coarse_quantizer(preproc):
if cent_cachefile and os.path.exists(cent_cachefile):
print("load centroids", cent_cachefile)
centroids = np.load(cent_cachefile)
else:
nt = max(1000000, 256 * ncent)
print("train coarse quantizer...")
t0 = time.time()
centroids = train_coarse_quantizer(xt[:nt], ncent, preproc)
print("Coarse train time: %.3f s" % (time.time() - t0))
if cent_cachefile:
print("store centroids", cent_cachefile)
np.save(cent_cachefile, centroids)
coarse_quantizer = faiss.IndexFlatL2(preproc.d_out)
coarse_quantizer.add(centroids)
return coarse_quantizer
#################################################################
# Make index and add elements to it
#################################################################
def prepare_trained_index(preproc):
coarse_quantizer = prepare_coarse_quantizer(preproc)
d = preproc.d_out
if pqflat_str == 'Flat':
print("making an IVFFlat index")
idx_model = faiss.IndexIVFFlat(coarse_quantizer, d, ncent,
faiss.METRIC_L2)
else:
m = int(pqflat_str[2:])
assert m < 56 or use_float16, "PQ%d will work only with -float16" % m
print("making an IVFPQ index, m = ", m)
idx_model = faiss.IndexIVFPQ(coarse_quantizer, d, ncent, m, 8)
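    # hand ownership of the coarse quantizer over to the index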
coarse_quantizer.this.disown()
idx_model.own_fields = True
# finish training on CPU
t0 = time.time()
print("Training vector codes")
x = preproc.apply_py(sanitize(xt[:1000000]))
idx_model.train(x)
print(" done %.3f s" % (time.time() - t0))
return idx_model
def compute_populated_index(preproc):
"""Add elements to a sharded index. Return the index and if available
a sharded gpu_index that contains the same data. """
indexall = prepare_trained_index(preproc)
co = faiss.GpuMultipleClonerOptions()
co.useFloat16 = use_float16
co.useFloat16CoarseQuantizer = False
co.usePrecomputed = use_precomputed_tables
co.indicesOptions = faiss.INDICES_CPU
co.verbose = True
co.reserveVecs = max_add if max_add > 0 else xb.shape[0]
co.shard = True
assert co.shard_type in (0, 1, 2)
vres, vdev = make_vres_vdev()
gpu_index = faiss.index_cpu_to_gpu_multiple(
vres, vdev, indexall, co)
print("add...")
t0 = time.time()
nb = xb.shape[0]
for i0, xs in dataset_iterator(xb, preproc, add_batch_size):
i1 = i0 + xs.shape[0]
gpu_index.add_with_ids(xs, np.arange(i0, i1))
if max_add > 0 and gpu_index.ntotal > max_add:
print("Flush indexes to CPU")
for i in range(ngpu):
index_src_gpu = faiss.downcast_index(gpu_index.at(i))
index_src = faiss.index_gpu_to_cpu(index_src_gpu)
print(" index %d size %d" % (i, index_src.ntotal))
index_src.copy_subset_to(indexall, 0, 0, nb)
index_src_gpu.reset()
index_src_gpu.reserveMemory(max_add)
gpu_index.sync_with_shard_indexes()
print('\r%d/%d (%.3f s) ' % (
i0, nb, time.time() - t0), end=' ')
sys.stdout.flush()
print("Add time: %.3f s" % (time.time() - t0))
print("Aggregate indexes to CPU")
t0 = time.time()
if hasattr(gpu_index, 'at'):
# it is a sharded index
for i in range(ngpu):
index_src = faiss.index_gpu_to_cpu(gpu_index.at(i))
print(" index %d size %d" % (i, index_src.ntotal))
index_src.copy_subset_to(indexall, 0, 0, nb)
else:
# simple index
index_src = faiss.index_gpu_to_cpu(gpu_index)
index_src.copy_subset_to(indexall, 0, 0, nb)
print(" done in %.3f s" % (time.time() - t0))
if max_add > 0:
# it does not contain all the vectors
gpu_index = None
return gpu_index, indexall
def compute_populated_index_2(preproc):
indexall = prepare_trained_index(preproc)
# set up a 3-stage pipeline that does:
# - stage 1: load + preproc
# - stage 2: assign on GPU
# - stage 3: add to index
stage1 = dataset_iterator(xb, preproc, add_batch_size)
vres, vdev = make_vres_vdev()
coarse_quantizer_gpu = faiss.index_cpu_to_gpu_multiple(
vres, vdev, indexall.quantizer)
def quantize(args):
(i0, xs) = args
_, assign = coarse_quantizer_gpu.search(xs, 1)
return i0, xs, assign.ravel()
stage2 = rate_limited_imap(quantize, stage1)
print("add...")
t0 = time.time()
nb = xb.shape[0]
for i0, xs, assign in stage2:
i1 = i0 + xs.shape[0]
if indexall.__class__ == faiss.IndexIVFPQ:
indexall.add_core_o(i1 - i0, faiss.swig_ptr(xs),
None, None, faiss.swig_ptr(assign))
elif indexall.__class__ == faiss.IndexIVFFlat:
indexall.add_core(i1 - i0, faiss.swig_ptr(xs), None,
faiss.swig_ptr(assign))
else:
assert False
print('\r%d/%d (%.3f s) ' % (
i0, nb, time.time() - t0), end=' ')
sys.stdout.flush()
print("Add time: %.3f s" % (time.time() - t0))
return None, indexall
def get_populated_index(preproc):
if not index_cachefile or not os.path.exists(index_cachefile):
if not altadd:
gpu_index, indexall = compute_populated_index(preproc)
else:
gpu_index, indexall = compute_populated_index_2(preproc)
if index_cachefile:
print("store", index_cachefile)
faiss.write_index(indexall, index_cachefile)
else:
print("load", index_cachefile)
indexall = faiss.read_index(index_cachefile)
gpu_index = None
co = faiss.GpuMultipleClonerOptions()
co.useFloat16 = use_float16
co.useFloat16CoarseQuantizer = False
co.usePrecomputed = use_precomputed_tables
co.indicesOptions = 0
co.verbose = True
co.shard = True # the replicas will be made "manually"
t0 = time.time()
print("CPU index contains %d vectors, move to GPU" % indexall.ntotal)
if replicas == 1:
if not gpu_index:
print("copying loaded index to GPUs")
vres, vdev = make_vres_vdev()
index = faiss.index_cpu_to_gpu_multiple(
vres, vdev, indexall, co)
else:
index = gpu_index
else:
del gpu_index # We override the GPU index
print("Copy CPU index to %d sharded GPU indexes" % replicas)
index = faiss.IndexReplicas()
for i in range(replicas):
            gpu0 = ngpu * i // replicas
            gpu1 = ngpu * (i + 1) // replicas
vres, vdev = make_vres_vdev(gpu0, gpu1)
print(" dispatch to GPUs %d:%d" % (gpu0, gpu1))
index1 = faiss.index_cpu_to_gpu_multiple(
vres, vdev, indexall, co)
index1.this.disown()
index.addIndex(index1)
index.own_fields = True
del indexall
print("move to GPU done in %.3f s" % (time.time() - t0))
return index
#################################################################
# Perform search
#################################################################
def eval_dataset(index, preproc):
ps = faiss.GpuParameterSpace()
ps.initialize(index)
nq_gt = gt_I.shape[0]
print("search...")
sl = query_batch_size
nq = xq.shape[0]
for nprobe in nprobes:
ps.set_index_parameter(index, 'nprobe', nprobe)
t0 = time.time()
if sl == 0:
D, I = index.search(preproc.apply_py(sanitize(xq)), nnn)
else:
I = np.empty((nq, nnn), dtype='int32')
D = np.empty((nq, nnn), dtype='float32')
inter_res = ''
for i0, xs in dataset_iterator(xq, preproc, sl):
print('\r%d/%d (%.3f s%s) ' % (
i0, nq, time.time() - t0, inter_res), end=' ')
sys.stdout.flush()
i1 = i0 + xs.shape[0]
Di, Ii = index.search(xs, nnn)
I[i0:i1] = Ii
D[i0:i1] = Di
if knngraph and not inter_res and i1 >= nq_gt:
ires = eval_intersection_measure(
gt_I[:, :nnn], I[:nq_gt])
inter_res = ', %.4f' % ires
t1 = time.time()
if knngraph:
ires = eval_intersection_measure(gt_I[:, :nnn], I[:nq_gt])
print(" probe=%-3d: %.3f s rank-%d intersection results: %.4f" % (
nprobe, t1 - t0, nnn, ires))
else:
print(" probe=%-3d: %.3f s" % (nprobe, t1 - t0), end=' ')
gtc = gt_I[:, :1]
nq = xq.shape[0]
for rank in 1, 10, 100:
if rank > nnn: continue
nok = (I[:, :rank] == gtc).sum()
print("1-R@%d: %.4f" % (rank, nok / float(nq)), end=' ')
print()
        if I_fname:
            I_fname_i = I_fname % nprobe
            print("storing", I_fname_i)
            np.save(I_fname_i, I)
        if D_fname:
            D_fname_i = D_fname % nprobe
            print("storing", D_fname_i)
            np.save(D_fname_i, D)
#################################################################
# Driver
#################################################################
preproc = get_preprocessor()
index = get_populated_index(preproc)
eval_dataset(index, preproc)
# make sure index is deleted before the resources
del index
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import time
import numpy as np
import pdb
import faiss
from datasets import load_sift1M, evaluate
print("load data")
xb, xq, xt, gt = load_sift1M()
nq, d = xq.shape
# we need only a StandardGpuResources per GPU
res = faiss.StandardGpuResources()
#################################################################
# Exact search experiment
#################################################################
print("============ Exact search")
flat_config = faiss.GpuIndexFlatConfig()
flat_config.device = 0
index = faiss.GpuIndexFlatL2(res, d, flat_config)
print("add vectors to index")
index.add(xb)
print("warmup")
index.search(xq, 123)
print("benchmark")
for lk in range(11):
k = 1 << lk
t, r = evaluate(index, xq, gt, k)
# the recall should be 1 at all times
print("k=%d %.3f ms, R@1 %.4f" % (k, t, r[1]))
#################################################################
# Approximate search experiment
#################################################################
print("============ Approximate search")
index = faiss.index_factory(d, "IVF4096,PQ64")
# faster, uses more memory
# index = faiss.index_factory(d, "IVF16384,Flat")
co = faiss.GpuClonerOptions()
# here we are using a 64-byte PQ, so we must set the lookup tables to
# 16 bit float (this is due to the limited temporary memory).
co.useFloat16 = True
index = faiss.index_cpu_to_gpu(res, 0, index, co)
print("train")
index.train(xt)
print("add vectors to index")
index.add(xb)
print("warmup")
index.search(xq, 123)
print("benchmark")
for lnprobe in range(10):
nprobe = 1 << lnprobe
index.setNumProbes(nprobe)
t, r = evaluate(index, xq, gt, 100)
print("nprobe=%4d %.3f ms recalls= %.4f %.4f %.4f" % (nprobe, t, r[1], r[10], r[100]))
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <omp.h>
#include <cstdio>
#include <vector>
#include <faiss/impl/FaissAssert.h>
#include <faiss/utils/hamming.h>
#include <faiss/utils/random.h>
#include <faiss/utils/utils.h>
using namespace faiss;
template <class T>
void hamming_cpt_test(
int code_size,
uint8_t* data1,
uint8_t* data2,
int n,
int* rst) {
T computer(data1, code_size);
for (int i = 0; i < n; i++) {
rst[i] = computer.hamming(data2);
data2 += code_size;
}
}
int main() {
size_t n = 4 * 1000 * 1000;
std::vector<size_t> code_size = {128, 256, 512, 1000};
std::vector<uint8_t> x(n * code_size.back());
    byte_rand(x.data(), x.size(), 12345);
int nrun = 100;
for (size_t cs : code_size) {
printf("benchmark with code_size=%zd n=%zd nrun=%d\n", cs, n, nrun);
double tot_t1 = 0, tot_t2 = 0, tot_t3 = 0;
#pragma omp parallel reduction(+ : tot_t1, tot_t2, tot_t3)
{
std::vector<int> rst_m4(n);
std::vector<int> rst_m8(n);
std::vector<int> rst_default(n);
#pragma omp for
for (int run = 0; run < nrun; run++) {
double t0, t1, t2, t3;
t0 = getmillisecs();
// new implem from Zilliz
hamming_cpt_test<HammingComputerDefault>(
cs, x.data(), x.data(), n, rst_default.data());
t1 = getmillisecs();
// M8
hamming_cpt_test<HammingComputerM8>(
cs, x.data(), x.data(), n, rst_m8.data());
t2 = getmillisecs();
// M4
hamming_cpt_test<HammingComputerM4>(
cs, x.data(), x.data(), n, rst_m4.data());
t3 = getmillisecs();
tot_t1 += t1 - t0;
tot_t2 += t2 - t1;
tot_t3 += t3 - t2;
}
for (int i = 0; i < n; i++) {
FAISS_THROW_IF_NOT_FMT(
(rst_m4[i] == rst_m8[i] && rst_m4[i] == rst_default[i]),
"wrong result i=%d, m4 %d m8 %d default %d",
i,
rst_m4[i],
rst_m8[i],
rst_default[i]);
}
}
printf("Hamming_Dft implem: %.3f ms\n", tot_t1 / nrun);
printf("Hamming_M8 implem: %.3f ms\n", tot_t2 / nrun);
printf("Hamming_M4 implem: %.3f ms\n", tot_t3 / nrun);
}
return 0;
}
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <omp.h>
#include <cstdio>
#include <faiss/impl/FaissAssert.h>
#include <faiss/utils/Heap.h>
#include <faiss/utils/random.h>
#include <faiss/utils/utils.h>
using namespace faiss;
void addn_default(
size_t n,
size_t k,
const float* x,
int64_t* heap_ids,
float* heap_val) {
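    // fill the heap with the first k elements, then for every later element
    // that beats the current minimum, pop the root and push the new value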
for (size_t i = 0; i < k; i++) {
minheap_push(i + 1, heap_val, heap_ids, x[i], i);
}
for (size_t i = k; i < n; i++) {
if (x[i] > heap_val[0]) {
minheap_pop(k, heap_val, heap_ids);
minheap_push(k, heap_val, heap_ids, x[i], i);
}
}
minheap_reorder(k, heap_val, heap_ids);
}
void addn_replace(
size_t n,
size_t k,
const float* x,
int64_t* heap_ids,
float* heap_val) {
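    // like addn_default, but replace_top fuses the pop + push into a single
    // sift-down from the root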
for (size_t i = 0; i < k; i++) {
minheap_push(i + 1, heap_val, heap_ids, x[i], i);
}
for (size_t i = k; i < n; i++) {
if (x[i] > heap_val[0]) {
minheap_replace_top(k, heap_val, heap_ids, x[i], i);
}
}
minheap_reorder(k, heap_val, heap_ids);
}
void addn_func(
size_t n,
size_t k,
const float* x,
int64_t* heap_ids,
float* heap_val) {
minheap_heapify(k, heap_val, heap_ids);
minheap_addn(k, heap_val, heap_ids, x, nullptr, n);
minheap_reorder(k, heap_val, heap_ids);
}
int main() {
size_t n = 10 * 1000 * 1000;
std::vector<size_t> ks({20, 50, 100, 200, 500, 1000, 2000, 5000});
std::vector<float> x(n);
float_randn(x.data(), n, 12345);
int nrun = 100;
for (size_t k : ks) {
printf("benchmark with k=%zd n=%zd nrun=%d\n", k, n, nrun);
FAISS_THROW_IF_NOT(k < n);
double tot_t1 = 0, tot_t2 = 0, tot_t3 = 0;
#pragma omp parallel reduction(+ : tot_t1, tot_t2, tot_t3)
{
std::vector<float> heap_dis(k);
std::vector<float> heap_dis_2(k);
std::vector<float> heap_dis_3(k);
std::vector<int64_t> heap_ids(k);
std::vector<int64_t> heap_ids_2(k);
std::vector<int64_t> heap_ids_3(k);
#pragma omp for
for (int run = 0; run < nrun; run++) {
double t0, t1, t2, t3;
t0 = getmillisecs();
// default implem
addn_default(n, k, x.data(), heap_ids.data(), heap_dis.data());
t1 = getmillisecs();
// new implem from Zilliz
addn_replace(
n, k, x.data(), heap_ids_2.data(), heap_dis_2.data());
t2 = getmillisecs();
// with addn
addn_func(n, k, x.data(), heap_ids_3.data(), heap_dis_3.data());
t3 = getmillisecs();
tot_t1 += t1 - t0;
tot_t2 += t2 - t1;
tot_t3 += t3 - t2;
}
for (size_t i = 0; i < k; i++) {
FAISS_THROW_IF_NOT_FMT(
heap_ids[i] == heap_ids_2[i],
"i=%ld (%ld, %g) != (%ld, %g)",
i,
size_t(heap_ids[i]),
heap_dis[i],
size_t(heap_ids_2[i]),
heap_dis_2[i]);
FAISS_THROW_IF_NOT(heap_dis[i] == heap_dis_2[i]);
}
for (size_t i = 0; i < k; i++) {
FAISS_THROW_IF_NOT(heap_ids[i] == heap_ids_3[i]);
FAISS_THROW_IF_NOT(heap_dis[i] == heap_dis_3[i]);
}
}
printf("default implem: %.3f ms\n", tot_t1 / nrun);
printf("replace implem: %.3f ms\n", tot_t2 / nrun);
printf("addn implem: %.3f ms\n", tot_t3 / nrun);
}
return 0;
}
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import time
import sys
import numpy as np
import faiss
try:
from faiss.contrib.datasets_fb import DatasetSIFT1M
except ImportError:
from faiss.contrib.datasets import DatasetSIFT1M
# from datasets import load_sift1M
k = int(sys.argv[1])
todo = sys.argv[2:]
print("load data")
# xb, xq, xt, gt = load_sift1M()
ds = DatasetSIFT1M()
xq = ds.get_queries()
xb = ds.get_database()
gt = ds.get_groundtruth()
xt = ds.get_train()
nq, d = xq.shape
if todo == []:
todo = 'hnsw hnsw_sq ivf ivf_hnsw_quantizer kmeans kmeans_hnsw nsg'.split()
def evaluate(index):
# for timing with a single core
# faiss.omp_set_num_threads(1)
t0 = time.time()
D, I = index.search(xq, k)
t1 = time.time()
missing_rate = (I == -1).sum() / float(k * nq)
recall_at_1 = (I == gt[:, :1]).sum() / float(nq)
print("\t %7.3f ms per query, R@1 %.4f, missing rate %.4f" % (
(t1 - t0) * 1000.0 / nq, recall_at_1, missing_rate))
if 'hnsw' in todo:
print("Testing HNSW Flat")
index = faiss.IndexHNSWFlat(d, 32)
# training is not needed
# this is the default, higher is more accurate and slower to
# construct
index.hnsw.efConstruction = 40
print("add")
# to see progress
index.verbose = True
index.add(xb)
print("search")
for efSearch in 16, 32, 64, 128, 256:
for bounded_queue in [True, False]:
print("efSearch", efSearch, "bounded queue", bounded_queue, end=' ')
index.hnsw.search_bounded_queue = bounded_queue
index.hnsw.efSearch = efSearch
evaluate(index)
if 'hnsw_sq' in todo:
print("Testing HNSW with a scalar quantizer")
# also set M so that the vectors and links both use 128 bytes per
# entry (total 256 bytes)
index = faiss.IndexHNSWSQ(d, faiss.ScalarQuantizer.QT_8bit, 16)
print("training")
# training for the scalar quantizer
index.train(xt)
# this is the default, higher is more accurate and slower to
# construct
index.hnsw.efConstruction = 40
print("add")
# to see progress
index.verbose = True
index.add(xb)
print("search")
for efSearch in 16, 32, 64, 128, 256:
print("efSearch", efSearch, end=' ')
index.hnsw.efSearch = efSearch
evaluate(index)
if 'ivf' in todo:
print("Testing IVF Flat (baseline)")
quantizer = faiss.IndexFlatL2(d)
index = faiss.IndexIVFFlat(quantizer, d, 16384)
index.cp.min_points_per_centroid = 5 # quiet warning
# to see progress
index.verbose = True
print("training")
index.train(xt)
print("add")
index.add(xb)
print("search")
for nprobe in 1, 4, 16, 64, 256:
print("nprobe", nprobe, end=' ')
index.nprobe = nprobe
evaluate(index)
if 'ivf_hnsw_quantizer' in todo:
print("Testing IVF Flat with HNSW quantizer")
quantizer = faiss.IndexHNSWFlat(d, 32)
index = faiss.IndexIVFFlat(quantizer, d, 16384)
index.cp.min_points_per_centroid = 5 # quiet warning
index.quantizer_trains_alone = 2
# to see progress
index.verbose = True
print("training")
index.train(xt)
print("add")
index.add(xb)
print("search")
quantizer.hnsw.efSearch = 64
for nprobe in 1, 4, 16, 64, 256:
print("nprobe", nprobe, end=' ')
index.nprobe = nprobe
evaluate(index)
# Bonus: 2 kmeans tests
if 'kmeans' in todo:
print("Performing kmeans on sift1M database vectors (baseline)")
clus = faiss.Clustering(d, 16384)
clus.verbose = True
clus.niter = 10
index = faiss.IndexFlatL2(d)
clus.train(xb, index)
if 'kmeans_hnsw' in todo:
print("Performing kmeans on sift1M using HNSW assignment")
clus = faiss.Clustering(d, 16384)
clus.verbose = True
clus.niter = 10
index = faiss.IndexHNSWFlat(d, 32)
# increase the default efSearch, otherwise the number of empty
# clusters is too high.
index.hnsw.efSearch = 128
clus.train(xb, index)
if 'nsg' in todo:
print("Testing NSG Flat")
index = faiss.IndexNSGFlat(d, 32)
index.build_type = 1
# training is not needed
# this is the default, higher is more accurate and slower to
# construct
print("add")
# to see progress
index.verbose = True
index.add(xb)
print("search")
for search_L in -1, 16, 32, 64, 128, 256:
print("search_L", search_L, end=' ')
index.nsg.search_L = search_L
evaluate(index)
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import time
import os
import numpy as np
import faiss
from faiss.contrib.datasets import SyntheticDataset
os.system("grep -m1 'model name' < /proc/cpuinfo")
def format_tab(x):
return "\n".join("\t".join("%g" % xi for xi in row) for row in x)
faiss.cvar.distance_compute_min_k_reservoir = 5
# for have_threads in True, False:
for have_threads in False, :
if have_threads:
# good config for Intel(R) Xeon(R) CPU E5-2698 v4 @ 2.20GHz
nthread = 32
else:
nthread = 1
faiss.omp_set_num_threads(nthread)
print("************ nthread=", nthread)
for nq in 100, 10000:
print("*********** nq=", nq)
if nq == 100:
nrun = 500
unit = "ms"
else:
nrun = 20
unit = "s"
restab = []
for d in 16, 32, 64, 128:
print("========== d=", d)
nb = 10000
# d = 32
ds = SyntheticDataset(d, 0, nb, nq)
print(ds)
index = faiss.IndexFlatL2(d)
index.add(ds.get_database())
nrun = 10
restab1 = []
restab.append(restab1)
for k in 1, 10, 100:
times = []
for run in range(nrun):
t0 = time.time()
index.search(ds.get_queries(), k)
t1 = time.time()
                    if run >= nrun // 5:  # the first nrun // 5 runs are warmup
times.append((t1 - t0))
times = np.array(times)
if unit == "ms":
times *= 1000
print("search k=%3d t=%.3f ms (± %.4f)" % (
k, np.mean(times), np.std(times)))
else:
print("search k=%3d t=%.3f s (± %.4f)" % (
k, np.mean(times), np.std(times)))
restab1.append(np.mean(times))
print("restab=\n", format_tab(restab))
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import print_function
import faiss
from datasets import load_sift1M, evaluate
xb, xq, xt, gt = load_sift1M()
nq, d = xq.shape
k = 32
for nbits in 4, 6, 8, 10, 12:
index = faiss.IndexPQ(d, 8, nbits)
index.train(xt)
index.add(xb)
t, r = evaluate(index, xq, gt, k)
print("\t %7.3f ms per query, R@1 %.4f" % (t, r[1]))
del index
#! /usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""small test script to benchmark the SIMD implementation of the
distance computations for the additional metrics. Call e.g. with L1 to
get L1 distance computations.
"""
import faiss
import sys
import time
d = 64
nq = 4096
nb = 16384
print("sample")
xq = faiss.randn((nq, d), 123)
xb = faiss.randn((nb, d), 123)
mt_name = "L2" if len(sys.argv) < 2 else sys.argv[1]
mt = getattr(faiss, "METRIC_" + mt_name)
print("distances")
t0 = time.time()
dis = faiss.pairwise_distances(xq, xb, mt)
t1 = time.time()
print("nq=%d nb=%d d=%d %s: %.3f s" % (nq, nb, d, mt_name, t1 - t0))
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import time
import faiss
import numpy as np
def do_partition(n, qin, maxval=65536, seed=123, id_type='int64'):
print(
f"n={n} qin={qin} maxval={maxval} id_type={id_type} ",
end="\t", flush=True
)
# print("seed=", seed)
rs = np.random.RandomState(seed)
vals = rs.randint(maxval, size=n).astype('uint16')
ids = (rs.permutation(n) + 12345).astype(id_type)
sp = faiss.swig_ptr
tab_a = faiss.AlignedTableUint16()
faiss.copy_array_to_AlignedTable(vals, tab_a)
nrun = 2000
times = []
nerr = 0
stats = faiss.cvar.partition_stats
stats.reset()
for _run in range(nrun):
faiss.copy_array_to_AlignedTable(vals, tab_a)
t0 = time.time()
# print("tab a type", tab_a.get())
if type(qin) == int:
q = qin
faiss.CMax_uint16_partition_fuzzy(
tab_a.get(), sp(ids), n, q, q, None)
else:
q_min, q_max = qin
q = np.array([-1], dtype='uint64')
faiss.CMax_uint16_partition_fuzzy(
tab_a.get(), sp(ids), n,
q_min, q_max, sp(q)
)
q = q[0]
if not (q_min <= q <= q_max):
nerr += 1
t1 = time.time()
times.append(t1 - t0)
times = np.array(times[100:]) * 1000000
print(
f"times {times.mean():.3f} µs (± {times.std():.4f} µs) nerr={nerr} "
f"bissect {stats.bissect_cycles / 1e6:.3f} Mcy "
f"compress {stats.compress_cycles / 1e6:.3f} Mcy"
)
do_partition(200, (100, 100))
do_partition(200, (100, 150))
do_partition(2000, (1000, 1000))
do_partition(2000, (1000, 1500))
do_partition(20000, (10000, 10000))
do_partition(20000, (10000, 15000))
do_partition(200, (100, 100), id_type='int32')
do_partition(200, (100, 150), id_type='int32')
do_partition(2000, (1000, 1000), id_type='int32')
do_partition(2000, (1000, 1500), id_type='int32')
do_partition(20000, (10000, 10000), id_type='int32')
do_partition(20000, (10000, 15000), id_type='int32')
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import sys
import time
import numpy as np
import re
import faiss
from multiprocessing.dummy import Pool as ThreadPool
from datasets import ivecs_read
# we mem-map the biggest files to avoid having them in memory all at
# once
def mmap_fvecs(fname):
x = np.memmap(fname, dtype='int32', mode='r')
d = x[0]
return x.view('float32').reshape(-1, d + 1)[:, 1:]
def mmap_bvecs(fname):
x = np.memmap(fname, dtype='uint8', mode='r')
d = x[:4].view('int32')[0]
return x.reshape(-1, d + 4)[:, 4:]
#################################################################
# Bookkeeping
#################################################################
dbname = sys.argv[1]
index_key = sys.argv[2]
parametersets = sys.argv[3:]
tmpdir = '/tmp/bench_polysemous'
if not os.path.isdir(tmpdir):
print("%s does not exist, creating it" % tmpdir)
os.mkdir(tmpdir)
#################################################################
# Prepare dataset
#################################################################
print("Preparing dataset", dbname)
if dbname.startswith('SIFT'):
# SIFT1M to SIFT1000M
dbsize = int(dbname[4:-1])
xb = mmap_bvecs('bigann/bigann_base.bvecs')
xq = mmap_bvecs('bigann/bigann_query.bvecs')
xt = mmap_bvecs('bigann/bigann_learn.bvecs')
# trim xb to correct size
xb = xb[:dbsize * 1000 * 1000]
gt = ivecs_read('bigann/gnd/idx_%dM.ivecs' % dbsize)
elif dbname == 'Deep1B':
xb = mmap_fvecs('deep1b/base.fvecs')
xq = mmap_fvecs('deep1b/deep1B_queries.fvecs')
xt = mmap_fvecs('deep1b/learn.fvecs')
    # deep1B's train set is outrageously big
xt = xt[:10 * 1000 * 1000]
gt = ivecs_read('deep1b/deep1B_groundtruth.ivecs')
else:
print('unknown dataset', dbname, file=sys.stderr)
sys.exit(1)
print("sizes: B %s Q %s T %s gt %s" % (
xb.shape, xq.shape, xt.shape, gt.shape))
nq, d = xq.shape
nb, d = xb.shape
assert gt.shape[0] == nq
#################################################################
# Training
#################################################################
def choose_train_size(index_key):
# some training vectors for PQ and the PCA
n_train = 256 * 1000
if "IVF" in index_key:
matches = re.findall('IVF([0-9]+)', index_key)
ncentroids = int(matches[0])
n_train = max(n_train, 100 * ncentroids)
elif "IMI" in index_key:
matches = re.findall('IMI2x([0-9]+)', index_key)
nbit = int(matches[0])
n_train = max(n_train, 256 * (1 << nbit))
return n_train
def get_trained_index():
filename = "%s/%s_%s_trained.index" % (
tmpdir, dbname, index_key)
if not os.path.exists(filename):
index = faiss.index_factory(d, index_key)
n_train = choose_train_size(index_key)
xtsub = xt[:n_train]
print("Keeping %d train vectors" % xtsub.shape[0])
# make sure the data is actually in RAM and in float
xtsub = xtsub.astype('float32').copy()
index.verbose = True
t0 = time.time()
index.train(xtsub)
index.verbose = False
print("train done in %.3f s" % (time.time() - t0))
print("storing", filename)
faiss.write_index(index, filename)
else:
print("loading", filename)
index = faiss.read_index(filename)
return index
#################################################################
# Adding vectors to dataset
#################################################################
def rate_limited_imap(f, l):
'a thread pre-processes the next element'
pool = ThreadPool(1)
res = None
for i in l:
res_next = pool.apply_async(f, (i, ))
if res:
yield res.get()
res = res_next
yield res.get()
def matrix_slice_iterator(x, bs):
" iterate over the lines of x in blocks of size bs"
nb = x.shape[0]
block_ranges = [(i0, min(nb, i0 + bs))
for i0 in range(0, nb, bs)]
return rate_limited_imap(
lambda i01: x[i01[0]:i01[1]].astype('float32').copy(),
block_ranges)
def get_populated_index():
filename = "%s/%s_%s_populated.index" % (
tmpdir, dbname, index_key)
if not os.path.exists(filename):
index = get_trained_index()
i0 = 0
t0 = time.time()
for xs in matrix_slice_iterator(xb, 100000):
i1 = i0 + xs.shape[0]
print('\radd %d:%d, %.3f s' % (i0, i1, time.time() - t0), end=' ')
sys.stdout.flush()
index.add(xs)
i0 = i1
print()
print("Add done in %.3f s" % (time.time() - t0))
print("storing", filename)
faiss.write_index(index, filename)
else:
print("loading", filename)
index = faiss.read_index(filename)
return index
#################################################################
# Perform searches
#################################################################
index = get_populated_index()
ps = faiss.ParameterSpace()
ps.initialize(index)
# make sure queries are in RAM
xq = xq.astype('float32').copy()
# a static C++ object that collects statistics about searches
ivfpq_stats = faiss.cvar.indexIVFPQ_stats
ivf_stats = faiss.cvar.indexIVF_stats
if parametersets == ['autotune'] or parametersets == ['autotuneMT']:
if parametersets == ['autotune']:
faiss.omp_set_num_threads(1)
# setup the Criterion object: optimize for 1-R@1
crit = faiss.OneRecallAtRCriterion(nq, 1)
# by default, the criterion will request only 1 NN
crit.nnn = 100
crit.set_groundtruth(None, gt.astype('int64'))
# then we let Faiss find the optimal parameters by itself
print("exploring operating points")
t0 = time.time()
op = ps.explore(index, xq, crit)
print("Done in %.3f s, available OPs:" % (time.time() - t0))
# opv is a C++ vector, so it cannot be accessed like a Python array
opv = op.optimal_pts
print("%-40s 1-R@1 time" % "Parameters")
for i in range(opv.size()):
opt = opv.at(i)
print("%-40s %.4f %7.3f" % (opt.key, opt.perf, opt.t))
else:
# we do queries in a single thread
faiss.omp_set_num_threads(1)
print(' ' * len(parametersets[0]), '\t', 'R@1 R@10 R@100 time %pass')
for param in parametersets:
print(param, '\t', end=' ')
sys.stdout.flush()
ps.set_index_parameters(index, param)
t0 = time.time()
ivfpq_stats.reset()
ivf_stats.reset()
D, I = index.search(xq, 100)
t1 = time.time()
for rank in 1, 10, 100:
n_ok = (I[:, :rank] == gt[:, :1]).sum()
print("%.4f" % (n_ok / float(nq)), end=' ')
print("%8.3f " % ((t1 - t0) * 1000.0 / nq), end=' ')
print("%5.2f" % (ivfpq_stats.n_hamming_pass * 100.0 / ivf_stats.ndis))
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import print_function
import time
import numpy as np
import faiss
from datasets import load_sift1M, evaluate
print("load data")
xb, xq, xt, gt = load_sift1M()
nq, d = xq.shape
# index with 16 subquantizers, 8 bit each
index = faiss.IndexPQ(d, 16, 8)
index.do_polysemous_training = True
index.verbose = True
print("train")
index.train(xt)
print("add vectors to index")
index.add(xb)
nt = 1
faiss.omp_set_num_threads(1)
print("PQ baseline", end=' ')
index.search_type = faiss.IndexPQ.ST_PQ
t, r = evaluate(index, xq, gt, 1)
print("\t %7.3f ms per query, R@1 %.4f" % (t, r[1]))
for ht in 64, 62, 58, 54, 50, 46, 42, 38, 34, 30:
print("Polysemous", ht, end=' ')
index.search_type = faiss.IndexPQ.ST_polysemous
index.polysemous_ht = ht
t, r = evaluate(index, xq, gt, 1)
print("\t %7.3f ms per query, R@1 %.4f" % (t, r[1]))
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import time
import os
import numpy as np
import faiss
os.system("grep -m1 'model name' < /proc/cpuinfo")
def format_tab(x):
return "\n".join("\t".join("%g" % xi for xi in row) for row in x)
def run_bench(d, dsub, nbit=8, metric=None):
M = d // dsub
pq = faiss.ProductQuantizer(d, M, nbit)
pq.train(faiss.randn((max(1000, pq.ksub * 50), d), 123))
sp = faiss.swig_ptr
times = []
nrun = 100
print(f"d={d} dsub={dsub} ksub={pq.ksub}", end="\t")
res = []
for nx in 1, 10, 100:
x = faiss.randn((nx, d), 555)
times = []
for run in range(nrun):
t0 = time.time()
new_tab = np.zeros((nx, M, pq.ksub), "float32")
if metric == faiss.METRIC_INNER_PRODUCT:
pq.compute_inner_prod_tables(nx, sp(x), sp(new_tab))
elif metric == faiss.METRIC_L2:
pq.compute_distance_tables(nx, sp(x), sp(new_tab))
else:
assert False
t1 = time.time()
            if run >= nrun // 5:  # the first nrun // 5 runs are warmup
times.append((t1 - t0))
times = np.array(times) * 1000
print(f"nx={nx}: {np.mean(times):.3f} ms (± {np.std(times):.4f})",
end="\t")
res.append(times.mean())
print()
return res
# for have_threads in True, False:
for have_threads in False, True:
if have_threads:
# good config for Intel(R) Xeon(R) CPU E5-2698 v4 @ 2.20GHz
nthread = 32
else:
nthread = 1
faiss.omp_set_num_threads(nthread)
for metric in faiss.METRIC_INNER_PRODUCT, faiss.METRIC_L2:
print("============= nthread=", nthread, "metric=", metric)
allres = []
for dsub in 2, 4, 8:
for nbit in 4, 8:
for M in 8, 20:
res = run_bench(M * dsub, dsub, nbit, metric)
allres.append(res)
allres = np.array(allres)
print("formated result:")
print(format_tab(allres))
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import sys
import faiss
import time
import numpy as np
try:
from faiss.contrib.datasets_fb import \
DatasetSIFT1M, DatasetDeep1B, DatasetBigANN
except ImportError:
from faiss.contrib.datasets import \
DatasetSIFT1M, DatasetDeep1B, DatasetBigANN
def eval_codec(q, xq, xb, gt):
t0 = time.time()
codes = q.compute_codes(xb)
t1 = time.time()
xb_decoded = q.decode(codes)
recons_err = ((xb - xb_decoded) ** 2).sum() / xb.shape[0]
# for compatibility with the codec benchmarks
err_compat = np.linalg.norm(xb - xb_decoded, axis=1).mean()
xq_decoded = q.decode(q.compute_codes(xq))
D, I = faiss.knn(xq_decoded, xb_decoded, 1)
recall = (I[:, 0] == gt[:, 0]).sum() / nq
print(
f"\tencode time: {t1 - t0:.3f} reconstruction error: {recons_err:.3f} "
f"1-recall@1: {recall:.4f} recons_err_compat {err_compat:.3f}")
def eval_quantizer(q, xq, xb, gt, xt, variants=None):
if variants is None:
variants = [(None, None)]
t0 = time.time()
q.train(xt)
t1 = time.time()
train_t = t1 - t0
print(f'\ttraining time: {train_t:.3f} s')
for name, val in variants:
if name is not None:
print(f"{name}={val}")
getattr(q, name) # make sure field exists
setattr(q, name, val)
eval_codec(q, xq, xb, gt)
todo = sys.argv[1:]
if len(todo) > 0 and "deep1M" in todo[0]:
ds = DatasetDeep1B(10**6)
del todo[0]
elif len(todo) > 0 and "bigann1M" in todo[0]:
ds = DatasetBigANN(nb_M=1)
del todo[0]
else:
ds = DatasetSIFT1M()
if len(todo) > 0:
if "x" in todo[0]:
M, nbits = todo[0].split("x")
M = int(M)
nbits = int(nbits)
del todo[0]
maxtrain = max(100 << nbits, 10**5)
print(f"eval on {M}x{nbits} maxtrain={maxtrain}")
xq = ds.get_queries()
xb = ds.get_database()
gt = ds.get_groundtruth()
xt = ds.get_train(maxtrain=maxtrain)
nb, d = xb.shape
nq, d = xq.shape
nt, d = xt.shape
# fastest to slowest
if 'lsq-gpu' in todo:
lsq = faiss.LocalSearchQuantizer(d, M, nbits)
ngpus = faiss.get_num_gpus()
lsq.icm_encoder_factory = faiss.GpuIcmEncoderFactory(ngpus)
lsq.verbose = True
    eval_quantizer(lsq, xq, xb, gt, xt)
if 'pq' in todo:
pq = faiss.ProductQuantizer(d, M, nbits)
print("===== PQ")
eval_quantizer(pq, xq, xb, gt, xt)
if 'opq' in todo:
d2 = ((d + M - 1) // M) * M
print("OPQ d2=", d2)
opq = faiss.OPQMatrix(d, M, d2)
opq.train(xt)
xq2 = opq.apply(xq)
xb2 = opq.apply(xb)
xt2 = opq.apply(xt)
pq = faiss.ProductQuantizer(d2, M, nbits)
print("===== PQ")
eval_quantizer(pq, xq2, xb2, gt, xt2)
if 'rq' in todo:
print("===== RQ")
    rq = faiss.ResidualQuantizer(d, M, nbits)
    rq.max_beam_size  # make sure the field exists
    rq.max_beam_size = 30  # for compatibility with older runs
# rq.train_type = faiss.ResidualQuantizer.Train_default
# rq.verbose = True
variants = [("max_beam_size", i) for i in (1, 2, 4, 8, 16, 32)]
eval_quantizer(rq, xq, xb, gt, xt, variants=variants)
if 'rq_lut' in todo:
print("===== RQ")
    rq = faiss.ResidualQuantizer(d, M, nbits)
    rq.max_beam_size  # make sure the field exists
    rq.max_beam_size = 30  # for compatibility with older runs
    rq.use_beam_LUT  # make sure the field exists
    rq.use_beam_LUT = 1
# rq.train_type = faiss.ResidualQuantizer.Train_default
# rq.verbose = True
variants = [("max_beam_size", i) for i in (1, 2, 4, 8, 16, 32, 64)]
eval_quantizer(rq, xq, xb, gt, xt, variants=variants)
if 'lsq' in todo:
print("===== LSQ")
lsq = faiss.LocalSearchQuantizer(d, M, nbits)
lsq.verbose = True
variants = [("encode_ils_iters", i) for i in (2, 3, 4, 8, 16)]
eval_quantizer(lsq, xq, xb, gt, xt, variants=variants)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import time
import numpy as np
import faiss
from datasets import load_sift1M
print("load data")
xb, xq, xt, gt = load_sift1M()
nq, d = xq.shape
ncent = 256
variants = [(name, getattr(faiss.ScalarQuantizer, name))
for name in dir(faiss.ScalarQuantizer)
if name.startswith('QT_')]
quantizer = faiss.IndexFlatL2(d)
# quantizer.add(np.zeros((1, d), dtype='float32'))
if False:
for name, qtype in [('flat', 0)] + variants:
print("============== test", name)
t0 = time.time()
if name == 'flat':
index = faiss.IndexIVFFlat(quantizer, d, ncent,
faiss.METRIC_L2)
else:
index = faiss.IndexIVFScalarQuantizer(quantizer, d, ncent,
qtype, faiss.METRIC_L2)
index.nprobe = 16
print("[%.3f s] train" % (time.time() - t0))
index.train(xt)
print("[%.3f s] add" % (time.time() - t0))
index.add(xb)
print("[%.3f s] search" % (time.time() - t0))
D, I = index.search(xq, 100)
print("[%.3f s] eval" % (time.time() - t0))
for rank in 1, 10, 100:
n_ok = (I[:, :rank] == gt[:, :1]).sum()
print("%.4f" % (n_ok / float(nq)), end=' ')
print()
if True:
for name, qtype in variants:
print("============== test", name)
for rsname, vals in [('RS_minmax',
[-0.4, -0.2, -0.1, -0.05, 0.0, 0.1, 0.5]),
('RS_meanstd', [0.8, 1.0, 1.5, 2.0, 3.0, 5.0, 10.0]),
('RS_quantiles', [0.02, 0.05, 0.1, 0.15]),
('RS_optim', [0.0])]:
for val in vals:
print("%-15s %5g " % (rsname, val), end=' ')
index = faiss.IndexIVFScalarQuantizer(quantizer, d, ncent,
qtype, faiss.METRIC_L2)
index.nprobe = 16
index.sq.rangestat = getattr(faiss.ScalarQuantizer,
rsname)
                index.sq.rangestat_arg = val
index.train(xt)
index.add(xb)
t0 = time.time()
D, I = index.search(xq, 100)
t1 = time.time()
for rank in 1, 10, 100:
n_ok = (I[:, :rank] == gt[:, :1]).sum()
print("%.4f" % (n_ok / float(nq)), end=' ')
print(" %.3f s" % (t1 - t0))
#! /usr/bin/env python2
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import print_function
import numpy as np
import faiss
import time
swig_ptr = faiss.swig_ptr
if False:
a = np.arange(10, 14).astype('float32')
b = np.arange(20, 24).astype('float32')
    faiss.fvec_inner_product(swig_ptr(a), swig_ptr(b), 4)
1/0
xd = 100
yd = 1000000
np.random.seed(1234)
faiss.omp_set_num_threads(1)
print('xd=%d yd=%d' % (xd, yd))
print('Running inner products test..')
for d in 3, 4, 12, 36, 64:
x = faiss.rand(xd * d).reshape(xd, d)
y = faiss.rand(yd * d).reshape(yd, d)
distances = np.empty((xd, yd), dtype='float32')
t0 = time.time()
for i in range(xd):
faiss.fvec_inner_products_ny(swig_ptr(distances[i]),
swig_ptr(x[i]),
swig_ptr(y),
d, yd)
t1 = time.time()
# sparse verification
ntry = 100
num, denom = 0, 0
for t in range(ntry):
xi = np.random.randint(xd)
yi = np.random.randint(yd)
num += abs(distances[xi, yi] - np.dot(x[xi], y[yi]))
denom += abs(distances[xi, yi])
print('d=%d t=%.3f s diff=%g' % (d, t1 - t0, num / denom))
print('Running L2sqr test..')
for d in 3, 4, 12, 36, 64:
x = faiss.rand(xd * d).reshape(xd, d)
y = faiss.rand(yd * d).reshape(yd, d)
distances = np.empty((xd, yd), dtype='float32')
t0 = time.time()
for i in range(xd):
faiss.fvec_L2sqr_ny(swig_ptr(distances[i]),
swig_ptr(x[i]),
swig_ptr(y),
d, yd)
t1 = time.time()
# sparse verification
ntry = 100
num, denom = 0, 0
for t in range(ntry):
xi = np.random.randint(xd)
yi = np.random.randint(yd)
num += abs(distances[xi, yi] - np.sum((x[xi] - y[yi]) ** 2))
denom += abs(distances[xi, yi])
print('d=%d t=%.3f s diff=%g' % (d, t1 - t0, num / denom))
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import print_function
import sys
import time
import numpy as np
def ivecs_read(fname):
a = np.fromfile(fname, dtype='int32')
d = a[0]
return a.reshape(-1, d + 1)[:, 1:].copy()
def fvecs_read(fname):
return ivecs_read(fname).view('float32')
def load_sift1M():
print("Loading sift1M...", end='', file=sys.stderr)
xt = fvecs_read("sift1M/sift_learn.fvecs")
xb = fvecs_read("sift1M/sift_base.fvecs")
xq = fvecs_read("sift1M/sift_query.fvecs")
gt = ivecs_read("sift1M/sift_groundtruth.ivecs")
print("done", file=sys.stderr)
return xb, xq, xt, gt
def evaluate(index, xq, gt, k):
nq = xq.shape[0]
t0 = time.time()
D, I = index.search(xq, k) # noqa: E741
t1 = time.time()
recalls = {}
i = 1
while i <= k:
recalls[i] = (I[:, :i] == gt[:, :1]).sum() / float(nq)
i *= 10
return (t1 - t0) * 1000.0 / nq, recalls
# Distributed on-disk index for 1T-scale datasets
This is code corresponding to the description in [Indexing 1T vectors](https://github.com/facebookresearch/faiss/wiki/Indexing-1T-vectors).
All the code is in Python 3 (and not compatible with Python 2).
The current code uses the Deep1B dataset for demonstration purposes, but can scale to 1000x larger.
To run it, download the Deep1B dataset as explained [here](../#getting-deep1b), and edit paths to the dataset in the scripts.
The cluster commands are written for the Slurm batch scheduling system.
Hopefully, changing to another type of scheduler should be quite straightforward.
## Distributed k-means
To cluster 500M vectors to 10M centroids, it is useful to have a distributed k-means implementation.
The distribution simply consists of splitting the training vectors across machines (servers) and having them do the assignment.
The master/client then synthesizes the results and updates the centroids.
The distributed k-means implementation here is based on 3 files:
- [`rpc.py`](rpc.py) is a very simple remote procedure call implementation based on sockets and pickle (see the sketch after this list).
It exposes the methods of an object on the server side so that they can be called from the client as if the object were local.
- [`distributed_kmeans.py`](distributed_kmeans.py) contains the k-means implementation.
The main loop of k-means is re-implemented in Python but closely follows the Faiss C++ implementation, and should not be significantly less efficient.
It relies on a `DatasetAssign` object that does the assignment to centroids, which is the bulk of the computation.
The object can be a Faiss CPU index, a GPU index or a set of remote GPU or CPU indexes.
- [`run_on_cluster.bash`](run_on_cluster.bash) contains the shell code to run the distributed k-means on a cluster.
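The sketch below illustrates the socket + pickle RPC idea from `rpc.py`; the names and wire framing here are illustrative assumptions, not the actual protocol. The client pickles a `(method, args)` pair, the server unpickles it, invokes the method on the exposed object and pickles the result back.

```python
import pickle
import socket

def serve(obj, port=12345):
    """Expose obj's methods to clients, one connection at a time (sketch)."""
    srv = socket.socket()
    srv.bind(("", port))
    srv.listen(1)
    while True:
        conn, _ = srv.accept()
        with conn, conn.makefile("rwb") as f:
            try:
                while True:
                    method, args = pickle.load(f)      # receive a call
                    res = getattr(obj, method)(*args)  # run it locally
                    pickle.dump(res, f)                # send the result back
                    f.flush()
            except EOFError:
                pass  # client disconnected, wait for the next one

def call(host, method, *args, port=12345):
    """Client side: one connection per call, for simplicity."""
    with socket.create_connection((host, port)) as s, s.makefile("rwb") as f:
        pickle.dump((method, args), f)
        f.flush()
        return pickle.load(f)
```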
The distributed k-means works with a Python install that contains faiss and scipy (for sparse matrices).
It clusters the training data of Deep1B; this can easily be changed to any file in fvecs, bvecs or npy format that contains the training set.
The training vectors may be too large to fit in RAM, but they are memory-mapped so that should not be a problem.
The file is also assumed to be accessible from all server machines with e.g. a distributed file system.
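To make the division of labor concrete, here is a single-machine sketch of one k-means iteration. In the distributed setting, the assignment step (the expensive part) is what each server runs on its slice of the data, and the centroid update is what the client does after gathering the partial sums. This is an illustration, not the `distributed_kmeans.py` API:

```python
import numpy as np
import faiss

def kmeans_iteration(x, centroids):
    """One synchronous k-means step; x and centroids are float32 arrays."""
    k, d = centroids.shape
    # assignment step: each server runs this on its own slice of x
    index = faiss.IndexFlatL2(d)
    index.add(centroids)
    _, assign = index.search(x, 1)
    assign = assign.ravel()
    # update step: the client sums the per-centroid statistics over all
    # servers and recomputes the centroids
    sums = np.zeros((k, d), dtype='float64')
    np.add.at(sums, assign, x)
    counts = np.bincount(assign, minlength=k)
    nz = counts > 0
    new_centroids = centroids.copy()
    new_centroids[nz] = (sums[nz] / counts[nz, None]).astype('float32')
    return new_centroids
```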
### Local tests
Edit `distributed_kmeans.py` to point `testdata` to your local copy of the dataset.
Then, 4 levels of sanity check can be run:
```bash
# reference Faiss C++ run
python distributed_kmeans.py --test 0
# using the Python implementation
python distributed_kmeans.py --test 1
# use the dispatch object (on local datasets)
python distributed_kmeans.py --test 2
# same, with GPUs
python distributed_kmeans.py --test 3
```
The output should look like [this gist](https://gist.github.com/mdouze/ffa01fe666a9325761266fe55ead72ad).
### Distributed sanity check
To run the distributed k-means, `distributed_kmeans.py` has to be run both on the server side (`--server` option) and the client side (`--client` option).
Edit the top of `run_on_cluster.bash` to set the path of the data to cluster.
Sanity checks can be run with
```bash
# non distributed baseline
bash run_on_cluster.bash test_kmeans_0
# using all the machine's GPUs
bash run_on_cluster.bash test_kmeans_1
# distributed run, with one local server per GPU
bash run_on_cluster.bash test_kmeans_2
```
The test `test_kmeans_2` simulates a distributed run on a single machine by starting one server process per GPU and connecting to the servers via the rpc protocol.
The output should look like [this gist](https://gist.github.com/mdouze/5b2dc69b74579ecff04e1686a277d32e).
### Distributed run
The way the script can be distributed depends on the cluster's scheduling system.
Here we use Slurm, but it should be relatively easy to adapt to any scheduler that can allocate a set of machines and start the same executable on all of them.
The command
```
bash run_on_cluster.bash slurm_distributed_kmeans
```
asks SLURM for 5 machines with 4 GPUs each with the `srun` command.
All 5 machines run the script with the `slurm_within_kmeans_server` option.
They determine the number of servers and their own server id via the `SLURM_NPROCS` and `SLURM_PROCID` environment variables.
All machines start `distributed_kmeans.py` in server mode for the slice of the dataset they are responsible for.
In addition, machine #0 also starts the client.
The client knows who the other servers are via the variable `SLURM_JOB_NODELIST`.
It connects to all the servers and performs the clustering.
The output should look like [this gist](https://gist.github.com/mdouze/8d25e89fb4af5093057cae0f917da6cd).
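The rank/size logic each process runs boils down to something like this sketch (the dataset size `nv` is a placeholder):

```python
import os

nproc = int(os.environ["SLURM_NPROCS"])  # total number of started processes
rank = int(os.environ["SLURM_PROCID"])   # 0-based id of this process

# each server is responsible for one contiguous slice of the training set
nv = 10**9  # placeholder: total number of training vectors
i0 = nv * rank // nproc
i1 = nv * (rank + 1) // nproc
print(f"server {rank}/{nproc} handles vectors {i0}:{i1}")
if rank == 0:
    pass  # machine #0 additionally starts the client
```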
### Run used for deep1B
For the real run, we run the clustering on 50M vectors to 1M centroids.
This is just a matter of using as many machines / GPUs as possible and setting the output centroids with the `--out filename` option.
Then run
```
bash run_on_cluster.bash deep1b_clustering
```
The last lines of output read like:
```
Iteration 19 (898.92 s, search 875.71 s): objective=1.33601e+07 imbalance=1.303 nsplit=0
0: writing centroids to /checkpoint/matthijs/ondisk_distributed/1M_centroids.npy
```
This means that the total training time was 899s, of which 876s were used for computation.
However, the computation includes the I/O overhead to the assignment servers.
In this implementation, the overhead of transmitting the data is non-negligible and so is the centroid computation stage.
This is due to the inefficient Python implementation and the RPC protocol that is not optimized for broadcast / gather (like MPI).
However, it is a simple implementation that should run on most clusters.
## Making the trained index
After the centroids are obtained, an empty trained index must be constructed.
This is done by:
- applying a pre-processing stage (a random rotation) to balance the dimensions of the vectors. This can be done after clustering; the centroids are simply rotated as well.
- wrapping the centroids into a HNSW index to speed up the CPU-based assignment of vectors
- training the 6-bit scalar quantizer used to encode the vectors
This is performed by the script [`make_trained_index.py`](make_trained_index.py).
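A minimal sketch of these three steps (the paths, the HNSW fanout of 32 and the seed are illustrative, `xt` is assumed to be a float32 training sample, and the actual script may differ):
```python
import faiss
import numpy as np

centroids = np.ascontiguousarray(
    np.load('1M_centroids.npy'), dtype='float32')
nlist, d = centroids.shape

# random rotation that balances the dimensions; applying it after
# clustering is fine, the centroids are simply rotated too
rrot = faiss.RandomRotationMatrix(d, d)
rrot.init(1234)

# wrap the rotated centroids in an HNSW index for fast CPU assignment
quantizer = faiss.IndexHNSWFlat(d, 32)
quantizer.add(rrot.apply_py(centroids))

# IVF index that encodes the vectors with a 6-bit scalar quantizer
index = faiss.IndexIVFScalarQuantizer(
    quantizer, d, nlist, faiss.ScalarQuantizer.QT_6bit)
index_pre = faiss.IndexPreTransform(rrot, index)

# the coarse quantizer is already populated, so this trains only the
# scalar quantizer (the rotation is applied internally)
index_pre.train(xt)
faiss.write_index(index_pre, 'trained.faissindex')
```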
## Building the index by slices
We call the slices "vslices" as they are vertical slices of the big matrix, see explanation in the wiki section [Split across database partitions](https://github.com/facebookresearch/faiss/wiki/Indexing-1T-vectors#split-across-database-partitions).
The script [make_index_vslice.py](make_index_vslice.py) makes an index for a subset of the vectors of the input data and stores it as an independent index.
There are 200 slices of 5M vectors each for Deep1B.
It can be run in a brute-force parallel fashion; there is no constraint on ordering.
To run the script in parallel on a slurm cluster, use:
```
bash run_on_cluster.bash make_index_vslices
```
For a real dataset, the data would be read from a DBMS.
In that case, reading the data and indexing it in parallel is worthwhile because reading is very slow.
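A hedged sketch of what one slice job does (the 5M slice size and paths follow the Deep1B setup above; `x` is assumed to be the memory-mapped database):
```python
import faiss
import numpy as np

def make_vslice(slice_no, x, workdir):
    # each job adds one 5M-vector slice of the memory-mapped database
    # to a fresh copy of the empty trained index and stores it separately
    index = faiss.read_index(workdir + 'trained.faissindex')
    i0, i1 = slice_no * 5 * 10**6, (slice_no + 1) * 5 * 10**6
    index.add(np.ascontiguousarray(x[i0:i1], dtype='float32'))
    faiss.write_index(
        index, '%s/vslices/slice%d.faissindex' % (workdir, slice_no))
```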
## Splitting across inverted lists
The 200 slices need to be merged together.
This is done with the script [merge_to_ondisk.py](merge_to_ondisk.py), which memory-maps the 200 vertical slice indexes, extracts a subset of the inverted lists and writes them to a contiguous horizontal slice.
We slice the inverted lists into 50 horizontal slices.
This is run with
```
bash run_on_cluster.bash make_index_hslices
```
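In outline, the merge follows the standard Faiss on-disk pattern (a sketch assuming a `vslice_fnames` list; the real script additionally restricts the copy to the inverted lists of one horizontal slice):
```python
import faiss

# memory-map the vertical slices and collect their inverted lists
ilv = faiss.InvertedListsPtrVector()
indexes = []                      # keep references alive for the mmaps
for fname in vslice_fnames:       # the 200 vertical slice files
    index = faiss.read_index(
        fname, faiss.IO_FLAG_MMAP | faiss.IO_FLAG_READ_ONLY)
    indexes.append(index)
    ilv.push_back(faiss.extract_index_ivf(index).invlists)

# copy the collected lists into a single on-disk file
ivf0 = faiss.extract_index_ivf(indexes[0])
ondisk = faiss.OnDiskInvertedLists(
    ivf0.nlist, ivf0.code_size, 'hslice.ivfdata')  # placeholder name
ondisk.merge_from(ilv.data(), ilv.size())
```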
## Querying the index
At this point the index is ready.
The horizontal slices need to be loaded in the right order and combined into an index to be usable.
This is done in the [combined_index.py](combined_index.py) script.
It provides a `CombinedIndexDeep1B` object that contains an index object that can be searched.
To test, run:
```
python combined_index.py
```
The output should look like:
```
(faiss_1.5.2) matthijs@devfair0144:~/faiss_versions/faiss_1Tcode/faiss/benchs/distributed_ondisk$ python combined_index.py
reading /checkpoint/matthijs/ondisk_distributed//hslices/slice49.faissindex
loading empty index /checkpoint/matthijs/ondisk_distributed/trained.faissindex
replace invlists
loaded index of size 1000000000
nprobe=1 1-recall@1=0.2904 t=12.35s
nprobe=10 1-recall@1=0.6499 t=17.67s
nprobe=100 1-recall@1=0.8673 t=29.23s
nprobe=1000 1-recall@1=0.9132 t=129.58s
```
i.e. searching is a lot slower than from RAM.
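When used as a module rather than a script, the object can be driven directly (a minimal sketch; `xq` is assumed to be a float32 query matrix):
```python
import combined_index

ci = combined_index.CombinedIndexDeep1B()  # loads the 50 horizontal slices
ci.set_nprobe(100)        # number of inverted lists to visit per query
D, I = ci.search(xq, 10)  # distances and ids of the 10 nearest neighbors
```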
## Distributed query
To reduce the bandwidth required from the machine that does the queries, it is possible to split the search across several search servers.
This way, only the final search results are returned to the main machine.
The search client and server are implemented in [`search_server.py`](search_server.py).
It can be used as a script to start a search server for `CombinedIndexDeep1B` or as a module to load the clients.
The search servers can be started with
```
bash run_on_cluster.bash run_search_servers
```
(adjust the number of servers to what is available).
An example of a search client is [`distributed_query_demo.py`](distributed_query_demo.py).
It connects to the servers and assigns subsets of inverted lists to visit to each of them.
A typical output is [this gist](https://gist.github.com/mdouze/1585b9854a9a2437d71f2b2c3c05c7c5).
The number in MiB indicates the amount of data that is read from disk to perform the search.
In this case, the scale of the dataset is too small for the distributed search to have much impact, but on datasets > 10x larger, the difference becomes more significant.
## Conclusion
This code contains the core components to make an index that scales up to 1T vectors.
There are a few simplifications with respect to the index that was effectively used in [Indexing 1T vectors](https://github.com/facebookresearch/faiss/wiki/Indexing-1T-vectors).
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import faiss
import numpy as np
class CombinedIndex:
"""
combines a set of inverted lists into a hstack
masks part of those lists
adds these inverted lists to an empty index that contains
the info on how to perform searches
"""
def __init__(self, invlist_fnames, empty_index_fname,
masked_index_fname=None):
self.indexes = indexes = []
ilv = faiss.InvertedListsPtrVector()
for fname in invlist_fnames:
if os.path.exists(fname):
print('reading', fname, end='\r', flush=True)
index = faiss.read_index(fname)
indexes.append(index)
il = faiss.extract_index_ivf(index).invlists
else:
raise AssertionError
ilv.push_back(il)
print()
self.big_il = faiss.VStackInvertedLists(ilv.size(), ilv.data())
if masked_index_fname:
self.big_il_base = self.big_il
print('loading', masked_index_fname)
self.masked_index = faiss.read_index(
masked_index_fname,
faiss.IO_FLAG_MMAP | faiss.IO_FLAG_READ_ONLY)
self.big_il = faiss.MaskedInvertedLists(
faiss.extract_index_ivf(self.masked_index).invlists,
self.big_il_base)
print('loading empty index', empty_index_fname)
self.index = faiss.read_index(empty_index_fname)
ntotal = self.big_il.compute_ntotal()
print('replace invlists')
index_ivf = faiss.extract_index_ivf(self.index)
index_ivf.replace_invlists(self.big_il, False)
index_ivf.ntotal = self.index.ntotal = ntotal
        index_ivf.parallel_mode = 1  # 1 = parallelize over inverted lists rather than over queries; seems reasonable to do this all the time
quantizer = faiss.downcast_index(index_ivf.quantizer)
quantizer.hnsw.efSearch = 1024
############################################################
# Expose fields and functions of the index as methods so that they
# can be called by RPC
def search(self, x, k):
return self.index.search(x, k)
def range_search(self, x, radius):
return self.index.range_search(x, radius)
def transform_and_assign(self, xq):
index = self.index
if isinstance(index, faiss.IndexPreTransform):
assert index.chain.size() == 1
vt = index.chain.at(0)
xq = vt.apply_py(xq)
# perform quantization
index_ivf = faiss.extract_index_ivf(index)
quantizer = index_ivf.quantizer
coarse_dis, list_nos = quantizer.search(xq, index_ivf.nprobe)
return xq, list_nos, coarse_dis
def ivf_search_preassigned(self, xq, list_nos, coarse_dis, k):
index_ivf = faiss.extract_index_ivf(self.index)
n, d = xq.shape
assert d == index_ivf.d
n2, d2 = list_nos.shape
assert list_nos.shape == coarse_dis.shape
assert n2 == n
assert d2 == index_ivf.nprobe
D = np.empty((n, k), dtype='float32')
I = np.empty((n, k), dtype='int64')
index_ivf.search_preassigned(
n, faiss.swig_ptr(xq), k,
faiss.swig_ptr(list_nos), faiss.swig_ptr(coarse_dis),
faiss.swig_ptr(D), faiss.swig_ptr(I), False)
return D, I
def ivf_range_search_preassigned(self, xq, list_nos, coarse_dis, radius):
index_ivf = faiss.extract_index_ivf(self.index)
n, d = xq.shape
assert d == index_ivf.d
n2, d2 = list_nos.shape
assert list_nos.shape == coarse_dis.shape
assert n2 == n
assert d2 == index_ivf.nprobe
res = faiss.RangeSearchResult(n)
index_ivf.range_search_preassigned(
n, faiss.swig_ptr(xq), radius,
faiss.swig_ptr(list_nos), faiss.swig_ptr(coarse_dis),
res)
lims = faiss.rev_swig_ptr(res.lims, n + 1).copy()
nd = int(lims[-1])
D = faiss.rev_swig_ptr(res.distances, nd).copy()
I = faiss.rev_swig_ptr(res.labels, nd).copy()
return lims, D, I
def set_nprobe(self, nprobe):
index_ivf = faiss.extract_index_ivf(self.index)
index_ivf.nprobe = nprobe
def set_parallel_mode(self, pm):
index_ivf = faiss.extract_index_ivf(self.index)
index_ivf.parallel_mode = pm
def get_ntotal(self):
return self.index.ntotal
def set_prefetch_nthread(self, nt):
for idx in self.indexes:
il = faiss.downcast_InvertedLists(
faiss.extract_index_ivf(idx).invlists)
            il.prefetch_nthread = nt
def set_omp_num_threads(self, nt):
faiss.omp_set_num_threads(nt)
class CombinedIndexDeep1B(CombinedIndex):
""" loads a CombinedIndex with the data from the big photodna index """
def __init__(self):
# set some paths
workdir = "/checkpoint/matthijs/ondisk_distributed/"
# empty index with the proper quantizer
indexfname = workdir + 'trained.faissindex'
# index that has some invlists that override the big one
masked_index_fname = None
invlist_fnames = [
'%s/hslices/slice%d.faissindex' % (workdir, i)
for i in range(50)
]
CombinedIndex.__init__(self, invlist_fnames, indexfname, masked_index_fname)
def ivecs_read(fname):
a = np.fromfile(fname, dtype='int32')
d = a[0]
return a.reshape(-1, d + 1)[:, 1:].copy()
def fvecs_read(fname):
return ivecs_read(fname).view('float32')
if __name__ == '__main__':
import time
ci = CombinedIndexDeep1B()
print('loaded index of size ', ci.index.ntotal)
deep1bdir = "/datasets01_101/simsearch/041218/deep1b/"
xq = fvecs_read(deep1bdir + "deep1B_queries.fvecs")
gt_fname = deep1bdir + "deep1B_groundtruth.ivecs"
gt = ivecs_read(gt_fname)
for nprobe in 1, 10, 100, 1000:
ci.set_nprobe(nprobe)
t0 = time.time()
D, I = ci.search(xq, 100)
t1 = time.time()
print('nprobe=%d 1-recall@1=%.4f t=%.2fs' % (
nprobe, (I[:, 0] == gt[:, 0]).sum() / len(xq),
t1 - t0
))
#! /usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Simple distributed kmeans implementation. Relies on an abstraction
for the training matrix, which can be sharded over several machines.
"""
import faiss
import time
import numpy as np
import sys
import argparse
from scipy.sparse import csc_matrix
from multiprocessing.dummy import Pool as ThreadPool
import rpc
class DatasetAssign:
"""Wrapper for a matrix that offers a function to assign the vectors
to centroids. All other implementations offer the same interface"""
def __init__(self, x):
self.x = np.ascontiguousarray(x, dtype='float32')
def count(self):
return self.x.shape[0]
def dim(self):
return self.x.shape[1]
def get_subset(self, indices):
return self.x[indices]
def perform_search(self, centroids):
index = faiss.IndexFlatL2(self.x.shape[1])
index.add(centroids)
return index.search(self.x, 1)
def assign_to(self, centroids, weights=None):
D, I = self.perform_search(centroids)
I = I.ravel()
D = D.ravel()
n = len(self.x)
if weights is None:
weights = np.ones(n, dtype='float32')
nc = len(centroids)
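        # sparse (nc, n) matrix with one nonzero per column: entry
        # (I[j], j) = weights[j]; multiplying it by x sums the (weighted)
        # vectors assigned to each centroid in a single pass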
m = csc_matrix((weights, I, np.arange(n + 1)),
shape=(nc, n))
sum_per_centroid = m * self.x
return I, D, sum_per_centroid
class DatasetAssignGPU(DatasetAssign):
""" GPU version of the previous """
def __init__(self, x, gpu_id, verbose=False):
DatasetAssign.__init__(self, x)
index = faiss.IndexFlatL2(x.shape[1])
if gpu_id >= 0:
self.index = faiss.index_cpu_to_gpu(
faiss.StandardGpuResources(),
gpu_id, index)
else:
# -1 -> assign to all GPUs
self.index = faiss.index_cpu_to_all_gpus(index)
def perform_search(self, centroids):
self.index.reset()
self.index.add(centroids)
return self.index.search(self.x, 1)
class DatasetAssignDispatch:
"""dispatches to several other DatasetAssigns and combines the
results"""
def __init__(self, xes, in_parallel):
self.xes = xes
self.d = xes[0].dim()
if not in_parallel:
self.imap = map
else:
self.pool = ThreadPool(len(self.xes))
self.imap = self.pool.imap
self.sizes = list(map(lambda x: x.count(), self.xes))
self.cs = np.cumsum([0] + self.sizes)
def count(self):
return self.cs[-1]
def dim(self):
return self.d
def get_subset(self, indices):
res = np.zeros((len(indices), self.d), dtype='float32')
nos = np.searchsorted(self.cs[1:], indices, side='right')
def handle(i):
mask = nos == i
sub_indices = indices[mask] - self.cs[i]
subset = self.xes[i].get_subset(sub_indices)
res[mask] = subset
list(self.imap(handle, range(len(self.xes))))
return res
def assign_to(self, centroids, weights=None):
src = self.imap(
lambda x: x.assign_to(centroids, weights),
self.xes
)
I = []
D = []
sum_per_centroid = None
for Ii, Di, sum_per_centroid_i in src:
I.append(Ii)
D.append(Di)
if sum_per_centroid is None:
sum_per_centroid = sum_per_centroid_i
else:
sum_per_centroid += sum_per_centroid_i
return np.hstack(I), np.hstack(D), sum_per_centroid
def imbalance_factor(k, assign):
return faiss.imbalance_factor(len(assign), k, faiss.swig_ptr(assign))
def reassign_centroids(hassign, centroids, rs=None):
""" reassign centroids when some of them collapse """
if rs is None:
rs = np.random
k, d = centroids.shape
nsplit = 0
empty_cents = np.where(hassign == 0)[0]
if empty_cents.size == 0:
return 0
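    # perturbation applied when splitting a centroid: even dimensions
    # are scaled slightly up, odd ones slightly down, so the two copies
    # drift apart over the following iterations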
fac = np.ones(d)
fac[::2] += 1 / 1024.
fac[1::2] -= 1 / 1024.
# this is a single pass unless there are more than k/2
# empty centroids
while empty_cents.size > 0:
# choose which centroids to split
probas = hassign.astype('float') - 1
probas[probas < 0] = 0
probas /= probas.sum()
nnz = (probas > 0).sum()
nreplace = min(nnz, empty_cents.size)
cjs = rs.choice(k, size=nreplace, p=probas)
for ci, cj in zip(empty_cents[:nreplace], cjs):
c = centroids[cj]
centroids[ci] = c * fac
centroids[cj] = c / fac
hassign[ci] = hassign[cj] // 2
hassign[cj] -= hassign[ci]
nsplit += 1
empty_cents = empty_cents[nreplace:]
return nsplit
def kmeans(k, data, niter=25, seed=1234, checkpoint=None):
"""Pure python kmeans implementation. Follows the Faiss C++ version
quite closely, but takes a DatasetAssign instead of a training data
matrix. Also redo is not implemented. """
n, d = data.count(), data.dim()
print(("Clustering %d points in %dD to %d clusters, " +
"%d iterations seed %d") % (n, d, k, niter, seed))
rs = np.random.RandomState(seed)
print("preproc...")
t0 = time.time()
# initialization
perm = rs.choice(n, size=k, replace=False)
centroids = data.get_subset(perm)
print(" done")
t_search_tot = 0
obj = []
for i in range(niter):
t0s = time.time()
print('assigning', end='\r', flush=True)
assign, D, sums = data.assign_to(centroids)
print('compute centroids', end='\r', flush=True)
        t_search_tot += time.time() - t0s
err = D.sum()
obj.append(err)
hassign = np.bincount(assign, minlength=k)
fac = hassign.reshape(-1, 1).astype('float32')
fac[fac == 0] = 1 # quiet warning
centroids = sums / fac
nsplit = reassign_centroids(hassign, centroids, rs)
print((" Iteration %d (%.2f s, search %.2f s): "
"objective=%g imbalance=%.3f nsplit=%d") % (
i, (time.time() - t0), t_search_tot,
            err, imbalance_factor(k, assign),
nsplit)
)
if checkpoint is not None:
print('storing centroids in', checkpoint)
np.save(checkpoint, centroids)
return centroids
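# example usage (a sketch, not executed anywhere in this file):
#   x = np.random.rand(10000, 32).astype('float32')
#   centroids = kmeans(100, DatasetAssign(x), niter=10)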
class AssignServer(rpc.Server):
""" Assign version that can be exposed via RPC """
def __init__(self, s, assign, log_prefix=''):
rpc.Server.__init__(self, s, log_prefix=log_prefix)
self.assign = assign
def __getattr__(self, f):
return getattr(self.assign, f)
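# the .bvecs/.ivecs/.fvecs formats store each vector prefixed with its
# dimension as a 4-byte int; the mmap readers below strip that prefix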
def bvecs_mmap(fname):
x = np.memmap(fname, dtype='uint8', mode='r')
d = x[:4].view('int32')[0]
return x.reshape(-1, d + 4)[:, 4:]
def ivecs_mmap(fname):
a = np.memmap(fname, dtype='int32', mode='r')
d = a[0]
return a.reshape(-1, d + 1)[:, 1:]
def fvecs_mmap(fname):
return ivecs_mmap(fname).view('float32')
def do_test(todo):
testdata = '/datasets01_101/simsearch/041218/bigann/bigann_learn.bvecs'
x = bvecs_mmap(testdata)
# bad distribution to stress-test split code
xx = x[:100000].copy()
xx[:50000] = x[0]
if "0" in todo:
# reference C++ run
km = faiss.Kmeans(x.shape[1], 1000, niter=20, verbose=True)
km.train(xx.astype('float32'))
if "1" in todo:
        # using the pure Python implementation
data = DatasetAssign(xx)
kmeans(1000, data, 20)
if "2" in todo:
# use the dispatch object (on local datasets)
data = DatasetAssignDispatch([
DatasetAssign(xx[20000 * i : 20000 * (i + 1)])
for i in range(5)
], False
)
kmeans(1000, data, 20)
if "3" in todo:
# same, with GPU
ngpu = faiss.get_num_gpus()
print('using %d GPUs' % ngpu)
data = DatasetAssignDispatch([
DatasetAssignGPU(xx[100000 * i // ngpu: 100000 * (i + 1) // ngpu], i)
for i in range(ngpu)
], True
)
kmeans(1000, data, 20)
def main():
parser = argparse.ArgumentParser()
def aa(*args, **kwargs):
group.add_argument(*args, **kwargs)
group = parser.add_argument_group('general options')
aa('--test', default='', help='perform tests (comma-separated numbers)')
aa('--k', default=0, type=int, help='nb centroids')
aa('--seed', default=1234, type=int, help='random seed')
aa('--niter', default=20, type=int, help='nb iterations')
aa('--gpu', default=-2, type=int, help='GPU to use (-2:none, -1: all)')
group = parser.add_argument_group('I/O options')
aa('--indata', default='',
       help='data file to load (supported formats: fvecs, bvecs, npy)')
aa('--i0', default=0, type=int, help='first vector to keep')
aa('--i1', default=-1, type=int, help='last vec to keep + 1')
aa('--out', default='', help='file to store centroids')
aa('--store_each_iteration', default=False, action='store_true',
help='store centroid checkpoints')
group = parser.add_argument_group('server options')
aa('--server', action='store_true', default=False, help='run server')
aa('--port', default=12345, type=int, help='server port')
aa('--when_ready', default=None, help='store host:port to this file when ready')
aa('--ipv4', default=False, action='store_true', help='force ipv4')
group = parser.add_argument_group('client options')
aa('--client', action='store_true', default=False, help='run client')
aa('--servers', default='', help='list of server:port separated by spaces')
args = parser.parse_args()
if args.test:
do_test(args.test.split(','))
return
# prepare data matrix (either local or remote)
if args.indata:
print('loading ', args.indata)
if args.indata.endswith('.bvecs'):
x = bvecs_mmap(args.indata)
elif args.indata.endswith('.fvecs'):
x = fvecs_mmap(args.indata)
elif args.indata.endswith('.npy'):
x = np.load(args.indata, mmap_mode='r')
else:
raise AssertionError
if args.i1 == -1:
args.i1 = len(x)
x = x[args.i0:args.i1]
if args.gpu == -2:
data = DatasetAssign(x)
else:
print('moving to GPU')
data = DatasetAssignGPU(x, args.gpu)
elif args.client:
print('connecting to servers')
def connect_client(hostport):
host, port = hostport.split(':')
port = int(port)
print('connecting %s:%d' % (host, port))
client = rpc.Client(host, port, v6=not args.ipv4)
print('client %s:%d ready' % (host, port))
return client
hostports = args.servers.strip().split(' ')
# pool = ThreadPool(len(hostports))
data = DatasetAssignDispatch(
list(map(connect_client, hostports)),
True
)
else:
raise AssertionError
if args.server:
print('starting server')
log_prefix = f"{rpc.socket.gethostname()}:{args.port}"
rpc.run_server(
lambda s: AssignServer(s, data, log_prefix=log_prefix),
args.port, report_to_file=args.when_ready,
v6=not args.ipv4)
else:
print('running kmeans')
centroids = kmeans(args.k, data, niter=args.niter, seed=args.seed,
checkpoint=args.out if args.store_each_iteration else None)
if args.out != '':
print('writing centroids to', args.out)
np.save(args.out, centroids)
if __name__ == '__main__':
main()
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import faiss
import numpy as np
import time
import rpc
import sys
import combined_index
import search_server
hostnames = sys.argv[1:]
print("Load local index")
ci = combined_index.CombinedIndexDeep1B()
print("connect to clients")
clients = []
for host in hostnames:
client = rpc.Client(host, 12012, v6=False)
clients.append(client)
# check if all servers respond
print("sizes seen by servers:", [cl.get_ntotal() for cl in clients])
# aggregate all clients into one index that uses them all for speed
# note that it also requires a local index ci
sindex = search_server.SplitPerListIndex(ci, clients)
sindex.verbose = True
# set reasonable parameters
ci.set_parallel_mode(1)
ci.set_prefetch_nthread(0)
ci.set_omp_num_threads(64)
# initialize params
sindex.set_parallel_mode(1)
sindex.set_prefetch_nthread(0)
sindex.set_omp_num_threads(64)
def ivecs_read(fname):
a = np.fromfile(fname, dtype='int32')
d = a[0]
return a.reshape(-1, d + 1)[:, 1:].copy()
def fvecs_read(fname):
return ivecs_read(fname).view('float32')
deep1bdir = "/datasets01_101/simsearch/041218/deep1b/"
xq = fvecs_read(deep1bdir + "deep1B_queries.fvecs")
gt_fname = deep1bdir + "deep1B_groundtruth.ivecs"
gt = ivecs_read(gt_fname)
for nprobe in 1, 10, 100, 1000:
sindex.set_nprobe(nprobe)
t0 = time.time()
D, I = sindex.search(xq, 100)
t1 = time.time()
print('nprobe=%d 1-recall@1=%.4f t=%.2fs' % (
nprobe, (I[:, 0] == gt[:, 0]).sum() / len(xq),
t1 - t0
))