Commit 100ddd06 authored by Chao Ma, committed by GitHub

[KVStore] Bug fix (#1480)



Carry each NDArray's data type through the KVStore messages instead of hard-coding it:

* Serialize the DLDataType of every NDArray in ArrayMeta (network.cc), so the receiver
  rebuilds id/shape/data tensors with their original dtype instead of assuming int64
  ids and float32 data.
* Accept a tuple as well as a list for the shape argument of KVClient.init_data().
* Send the FINAL shutdown message to the servers from client_0 only, so each server
  receives exactly one shutdown signal.
* Extend the KVStore test with int64, float64, and int32 tensors.
* fix lint

Co-authored-by: Da Zheng <zhengda1936@gmail.com>
parent aa17d78a
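Background for the diff below: a DLPack DLDataType is a (type code, bit width, lanes) triple, and the old receive path pinned these values to int64 ids and float32 payloads. A small reference sketch in plain Python (not DGL code), using the type-code values from dlpack.h:

    # (code, bits, lanes) triples as they appear in the C++ changes below.
    KDL_INT, KDL_UINT, KDL_FLOAT = 0, 1, 2    # DLDataTypeCode values from dlpack.h

    DTYPES = {
        'int32':   (KDL_INT,   32, 1),
        'int64':   (KDL_INT,   64, 1),   # previously hard-coded for id/shape tensors
        'float32': (KDL_FLOAT, 32, 1),   # previously hard-coded for data tensors
        'float64': (KDL_FLOAT, 64, 1),
    }

    # After this commit the triple travels inside ArrayMeta, so a pulled int32 or
    # float64 tensor is rebuilt from its own entry instead of the float32 default.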
@@ -820,7 +820,7 @@ class KVClient(object):
         ----------
         name : str
             data name
-        shape : list of int
+        shape : list or tuple of int
             data shape
         dtype : dtype
             data type
@@ -840,7 +840,7 @@ class KVClient(object):
             m_id = machines[idx]
            data_str = self._serialize_shared_tensor(name, dtype)
             data_str = data_str + '|' + target_name
-            partitioned_shape = shape.copy()
+            partitioned_shape = list(shape)
             partitioned_shape[0] = count[idx]
             for n in range(self._group_count):
                 server_id = m_id * self._group_count + n
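The switch from shape.copy() to list(shape) is what makes the relaxed docstring true; a quick plain-Python illustration (the numbers are made up):

    shape = (1000, 16)               # a tuple is now a legal shape argument
    partitioned_shape = list(shape)  # works for lists and tuples alike, and is mutable
    partitioned_shape[0] = 250       # per-machine row count can be overwritten in place
    # The old shape.copy() call only worked for lists: tuples have no copy() method,
    # so passing a tuple raised AttributeError.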
@@ -1149,16 +1149,17 @@ class KVClient(object):
         We usually invoke this API by just one client (e.g., client_0).
         """
-        for server_id in range(self._server_count):
-            msg = KVStoreMsg(
-                type=KVMsgType.FINAL,
-                rank=self._client_id,
-                name=None,
-                id=None,
-                data=None,
-                shape=None,
-                c_ptr=None)
-            _send_kv_msg(self._sender, msg, server_id)
+        if self._client_id == 0:
+            for server_id in range(self._server_count):
+                msg = KVStoreMsg(
+                    type=KVMsgType.FINAL,
+                    rank=self._client_id,
+                    name=None,
+                    id=None,
+                    data=None,
+                    shape=None,
+                    c_ptr=None)
+                _send_kv_msg(self._sender, msg, server_id)
 
     def _get_local_usable_addr(self):
...
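The new client_id guard changes how many FINAL messages reach each server; a back-of-the-envelope check in plain Python (the client and server counts are made up for illustration):

    num_client, num_server = 4, 2
    # Before: every client's shut_down() looped over all servers.
    final_msgs_before = num_client * num_server   # 8 FINAL messages in total
    # After: only the client with rank 0 broadcasts the shutdown signal.
    final_msgs_after = 1 * num_server             # exactly one per server
    assert (final_msgs_before, final_msgs_after) == (8, 2)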
@@ -65,6 +65,8 @@ NDArray CreateNDArrayFromRaw(std::vector<int64_t> shape,
 }
 
 void ArrayMeta::AddArray(const NDArray& array) {
+  // Get data type of current NDArray
+  data_type_.push_back(array->dtype);
   // We first write the ndim to the data_shape_
   data_shape_.push_back(static_cast<int64_t>(array->ndim));
   // Then we write the data shape
@@ -82,6 +84,9 @@ char* ArrayMeta::Serialize(int64_t* size) {
   buffer_size += sizeof(ndarray_count_);
   buffer_size += sizeof(data_shape_.size());
   buffer_size += sizeof(int64_t) * data_shape_.size();
+  // we don't need to write data_type_.size()
+  // because it equals to ndarray_count_ * 3
+  buffer_size += sizeof(DLDataType) * data_type_.size();
 }
 // In the future, we should have a better memory management as
 // allocating a large chunk of memory can be very expensive.
@@ -94,6 +99,11 @@ char* ArrayMeta::Serialize(int64_t* size) {
   // Write ndarray_count_
   *(reinterpret_cast<int*>(pointer)) = ndarray_count_;
   pointer += sizeof(ndarray_count_);
+  // Write data type
+  memcpy(pointer,
+         reinterpret_cast<DLDataType*>(data_type_.data()),
+         sizeof(DLDataType) * data_type_.size());
+  pointer += (sizeof(DLDataType) * data_type_.size());
   // Write size of data_shape_
   *(reinterpret_cast<size_t*>(pointer)) = data_shape_.size();
   pointer += sizeof(data_shape_.size());
@@ -117,6 +127,12 @@ void ArrayMeta::Deserialize(char* buffer, int64_t size) {
   ndarray_count_ = *(reinterpret_cast<int*>(buffer));
   buffer += sizeof(int);
   data_size += sizeof(int);
+  // Read data type
+  data_type_.resize(ndarray_count_);
+  memcpy(data_type_.data(), buffer,
+         ndarray_count_ * sizeof(DLDataType));
+  buffer += ndarray_count_ * sizeof(DLDataType);
+  data_size += ndarray_count_ * sizeof(DLDataType);
   // Read size of data_shape_
   size_t count = *(reinterpret_cast<size_t*>(buffer));
   buffer += sizeof(size_t);
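Taken together, the Serialize()/Deserialize() changes give the ArrayMeta blob the layout sketched below, for the portion visible in these hunks. This is a plain-Python illustration, not DGL code; it assumes a little-endian build, an 8-byte size_t, and DLDataType packing to 4 bytes (uint8 code, uint8 bits, uint16 lanes):

    import struct

    def pack_array_meta(dtypes, shapes):
        """dtypes: list of (code, bits, lanes) triples; shapes: list of dim tuples."""
        buf = struct.pack('<i', len(shapes))             # ndarray_count_
        for code, bits, lanes in dtypes:                 # data_type_, one entry per array (new)
            buf += struct.pack('<BBH', code, bits, lanes)
        flat = []
        for shape in shapes:                             # data_shape_ stores ndim, then the dims
            flat.append(len(shape))
            flat.extend(shape)
        buf += struct.pack('<Q', len(flat))              # data_shape_.size()
        buf += struct.pack('<%dq' % len(flat), *flat)
        return buf

    # One int64 id vector of length 3 plus one float32 (2, 3) payload
    # (kDLInt = 0, kDLFloat = 2 in dlpack.h):
    blob = pack_array_meta([(0, 64, 1), (2, 32, 1)], [(3,), (2, 3)])
    assert len(blob) == 4 + 2 * 4 + 8 + 5 * 8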
@@ -570,7 +586,7 @@ static KVStoreMsg* recv_kv_message(network::Receiver* receiver) {
   CHECK_EQ(meta.data_shape_[0], 1);
   kv_msg->id = CreateNDArrayFromRaw(
     {meta.data_shape_[1]},
-    DLDataType{kDLInt, 64, 1},
+    meta.data_type_[0],
     DLContext{kDLCPU, 0},
     recv_id_msg.data,
     AUTO_FREE);
@@ -588,7 +604,7 @@ static KVStoreMsg* recv_kv_message(network::Receiver* receiver) {
   }
   kv_msg->data = CreateNDArrayFromRaw(
     vec_shape,
-    DLDataType{kDLFloat, 32, 1},
+    meta.data_type_[1],
     DLContext{kDLCPU, 0},
     recv_data_msg.data,
     AUTO_FREE);
@@ -607,7 +623,7 @@ static KVStoreMsg* recv_kv_message(network::Receiver* receiver) {
   }
   kv_msg->shape = CreateNDArrayFromRaw(
     vec_shape,
-    DLDataType{kDLInt, 64, 1},
+    meta.data_type_[0],
     DLContext{kDLCPU, 0},
     recv_shape_msg.data,
     AUTO_FREE);
@@ -772,7 +788,7 @@ DGL_REGISTER_GLOBAL("network._CAPI_FastPull")
   kv_msg.rank = client_id;
   kv_msg.name = name;
   kv_msg.id = CreateNDArrayFromRaw({static_cast<int64_t>(remote_ids[i].size())},
-                                   DLDataType{kDLInt, 64, 1},
+                                   ID->dtype,
                                    DLContext{kDLCPU, 0},
                                    remote_ids[i].data(),
                                    !AUTO_FREE);
@@ -812,7 +828,7 @@ DGL_REGISTER_GLOBAL("network._CAPI_FastPull")
   local_data_shape[0] = ID_size;
   NDArray res_tensor = CreateNDArrayFromRaw(
     local_data_shape,
-    DLDataType{kDLFloat, 32, 1},
+    local_data->dtype,
     DLContext{kDLCPU, 0},
     return_data,
     AUTO_FREE);
...
@@ -67,6 +67,7 @@ enum MessageType {
   kIPIDMsg = 7
 };
 
+
 /*!
  * \brief Meta data for NDArray message
  */
@@ -133,6 +134,11 @@ class ArrayMeta {
   */
   int ndarray_count_;
 
+  /*!
+   * \brief DataType for each NDArray
+   */
+  std::vector<DLDataType> data_type_;
+
   /*!
    * \brief We first write the ndim to data_shape_
    * and then write the data shape.
...
@@ -23,15 +23,27 @@ data_1 = F.zeros((num_entries*2, dim_size), F.float32, F.cpu())
 g2l_1 = F.arange(0, num_entries*2)
 partition_1 = F.zeros(num_entries*2, F.int64, F.cpu())
+data_3 = F.zeros((num_entries, dim_size), F.int64, F.cpu())
+data_4 = F.zeros((num_entries, dim_size), F.float64, F.cpu())
+data_5 = F.zeros((num_entries, dim_size), F.int32, F.cpu())
 
 def start_server():
     my_server = KVServer(server_id=0, server_namebook=server_namebook, num_client=1)
     my_server.set_global2local(name='data_0', global2local=g2l_0)
     my_server.set_global2local(name='data_1', global2local=g2l_1)
+    my_server.set_global2local(name='data_3', global2local=g2l_0)
+    my_server.set_global2local(name='data_4', global2local=g2l_0)
+    my_server.set_global2local(name='data_5', global2local=g2l_0)
     my_server.set_partition_book(name='data_0', partition_book=partition_0)
     my_server.set_partition_book(name='data_1', partition_book=partition_1)
+    my_server.set_partition_book(name='data_3', partition_book=partition_0)
+    my_server.set_partition_book(name='data_4', partition_book=partition_0)
+    my_server.set_partition_book(name='data_5', partition_book=partition_0)
     my_server.init_data(name='data_0', data_tensor=data_0)
     my_server.init_data(name='data_1', data_tensor=data_1)
+    my_server.init_data(name='data_3', data_tensor=data_3)
+    my_server.init_data(name='data_4', data_tensor=data_4)
+    my_server.init_data(name='data_5', data_tensor=data_5)
     my_server.start()
@@ -40,13 +52,17 @@ def start_client():
     my_client = KVClient(server_namebook=server_namebook)
     my_client.connect()
-    my_client.init_data(name='data_2', shape=[num_entries, dim_size], dtype=F.float32, target_name='data_0')
+    my_client.init_data(name='data_2', shape=(num_entries, dim_size), dtype=F.float32, target_name='data_0')
+    print("Init data from client..")
 
     name_list = my_client.get_data_name_list()
-    assert len(name_list) == 3
+    assert len(name_list) == 6
     assert 'data_0' in name_list
     assert 'data_1' in name_list
     assert 'data_2' in name_list
+    assert 'data_3' in name_list
+    assert 'data_4' in name_list
+    assert 'data_5' in name_list
 
     meta_0 = my_client.get_data_meta('data_0')
     assert meta_0[0] == F.float32
@@ -60,8 +76,23 @@ def start_client():
     assert meta_2[0] == F.float32
     assert_array_equal(meta_2[2], partition_0)
 
+    meta_3 = my_client.get_data_meta('data_3')
+    assert meta_3[0] == F.int64
+    assert_array_equal(meta_3[2], partition_0)
+
+    meta_4 = my_client.get_data_meta('data_4')
+    assert meta_4[0] == F.float64
+    assert_array_equal(meta_4[2], partition_0)
+
+    meta_5 = my_client.get_data_meta('data_5')
+    assert meta_5[0] == F.int32
+    assert_array_equal(meta_5[2], partition_0)
+
     my_client.push(name='data_0', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]]))
     my_client.push(name='data_2', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]]))
+    my_client.push(name='data_3', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1,1,1],[2,2,2],[3,3,3]]))
+    my_client.push(name='data_4', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]], F.float64))
+    my_client.push(name='data_5', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1,1,1],[2,2,2],[3,3,3]], F.int32))
 
     target = F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]])
@@ -71,6 +102,21 @@ def start_client():
     res = my_client.pull(name='data_2', id_tensor=F.tensor([0, 1, 2]))
     assert_array_equal(res, target)
 
+    target = F.tensor([[1,1,1],[2,2,2],[3,3,3]])
+    res = my_client.pull(name='data_3', id_tensor=F.tensor([0, 1, 2]))
+    assert_array_equal(res, target)
+
+    target = F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]], F.float64)
+    res = my_client.pull(name='data_4', id_tensor=F.tensor([0, 1, 2]))
+    assert_array_equal(res, target)
+
+    target = F.tensor([[1,1,1],[2,2,2],[3,3,3]], F.int32)
+    res = my_client.pull(name='data_5', id_tensor=F.tensor([0, 1, 2]))
+    assert_array_equal(res, target)
+
     my_client.shut_down()
...