Unverified Commit 5bf17547 authored by Quan (Andy) Gan, committed by GitHub

[DistTensor] add bitwise or function (#4638)

* first commit

* add test

* fixes

* ah this is how you skip setup

* fix

* ugh

* address comments

* i like black
parent 09ec5fd5
......@@ -112,6 +112,7 @@ class DistTensor:
        self._shape = shape
        self._dtype = dtype
        self._attach = attach
        self._is_gdata = is_gdata
        part_policies = self.kvstore.all_possible_part_policy
        # If a user doesn't provide a partition policy, we should find one based on
......@@ -180,6 +181,19 @@ class DistTensor:
        # TODO(zhengda) how do we want to support broadcast (e.g., G.ndata['h'][idx] = 1).
        self.kvstore.push(name=self._name, id_tensor=idx, data_tensor=val)

    def __or__(self, other):
        """Compute the elementwise OR with another DistTensor and return the
        result as a new DistTensor.
        """
        new_dist_tensor = DistTensor(
            self._shape,
            self._dtype,
            part_policy=self._part_policy,
            persistent=self._persistent,
            is_gdata=self._is_gdata,
            attach=self._attach,
        )
        kvstore = self.kvstore
        kvstore.union(self._name, other._name, new_dist_tensor._name)
        return new_dist_tensor

    def __len__(self):
        return self._shape[0]
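For orientation, here is a minimal usage sketch of the new operator (not part of the diff). It assumes distributed mode has already been initialized and that `g` is a `DistGraph` whose edges carry two boolean masks; the names `mask1`, `mask2`, and `mask3` are illustrative only.

# Illustrative sketch, not part of the commit. Assumes `g` is an initialized
# dgl.distributed.DistGraph with boolean edge masks "mask1" and "mask2".
g.edata["mask3"] = g.edata["mask1"] | g.edata["mask2"]  # invokes DistTensor.__or__
# The `|` creates a new DistTensor and asks the KVStore to fill it with the
# elementwise OR of the two operands (see KVClient.union below).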
......
......@@ -1351,6 +1351,15 @@ class KVClient(object):
        data_tensor = F.cat(seq=[response.data_tensor for response in response_list], dim=0)
        return data_tensor[back_sorted_id]  # return data with original index order
    def union(self, operand1_name, operand2_name, output_name):
        """Compute the elementwise union (bitwise OR) of the two mask arrays named
        ``operand1_name`` and ``operand2_name`` in the KVStore and write the result
        into the tensor named ``output_name``.
        """
        # Each trainer computes its own result from its local storage.
        self._data_store[output_name][:] = (
            self._data_store[operand1_name]
            | self._data_store[operand2_name]
        )

    def _take_id(self, elem):
        """Used for sorting the response list."""
......
......@@ -102,3 +102,10 @@ class KVClient(object):
        the number of nonzero entries in this data.
        """
        return F.count_nonzero(self._data[name])

    def union(self, operand1_name, operand2_name, output_name):
        """Compute the elementwise union (bitwise OR) of two mask arrays in the KVStore.
        """
        self._data[output_name][:] = (
            self._data[operand1_name] | self._data[operand2_name]
        )
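To make the semantics of `union` concrete, here is a small, self-contained sketch that mimics the operation with NumPy arrays standing in for the KVStore's local data shards; the dictionary `_data_store` below is only a stand-in, not the real shared-memory store.

import numpy as np

# Stand-in for a trainer's local data store: name -> local shard of the tensor.
_data_store = {
    "mask1": np.array([True, False, True, False]),
    "mask2": np.array([False, False, True, True]),
    "mask3": np.zeros(4, dtype=bool),  # pre-allocated output tensor
}

# The same in-place elementwise OR that KVClient.union performs on its shard.
_data_store["mask3"][:] = _data_store["mask1"] | _data_store["mask2"]
assert _data_store["mask3"].tolist() == [True, False, True, True]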
......@@ -18,17 +18,13 @@ import backend as F
import math
import unittest
import pickle
from utils import reset_envs, generate_ip_config
from utils import reset_envs, generate_ip_config, create_random_graph
import pytest
if os.name != 'nt':
    import fcntl
    import struct

def create_random_graph(n):
    arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
    return dgl.from_scipy(arr)

def run_server(graph_name, server_id, server_count, num_clients, shared_mem, keep_alive=False):
    g = DistGraphServer(server_id, "kv_ip_config.txt", server_count, num_clients,
                        '/tmp/dist_graph/{}.json'.format(graph_name),
......
import operator
import os
import unittest

import backend as F
import pytest
from utils import create_random_graph, generate_ip_config, reset_envs

import dgl

dist_g = None


def rand_mask(shape, dtype):
    return F.randn(shape) > 0


@unittest.skipIf(
    dgl.backend.backend_name == "tensorflow",
    reason="TF doesn't support some of operations in DistGraph",
)
@unittest.skipIf(
    dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support"
)
def setup_module():
    global dist_g
    reset_envs()
    os.environ["DGL_DIST_MODE"] = "standalone"
    dist_g = create_random_graph(10000)
    # Partition the graph.
    num_parts = 1
    graph_name = "dist_graph_test_3"
    dist_g.ndata["features"] = F.unsqueeze(
        F.arange(0, dist_g.number_of_nodes()), 1
    )
    dist_g.edata["features"] = F.unsqueeze(
        F.arange(0, dist_g.number_of_edges()), 1
    )
    dgl.distributed.partition_graph(
        dist_g, graph_name, num_parts, "/tmp/dist_graph"
    )
    dgl.distributed.initialize("kv_ip_config.txt")
    dist_g = dgl.distributed.DistGraph(
        graph_name, part_config="/tmp/dist_graph/{}.json".format(graph_name)
    )
    dist_g.edata["mask1"] = dgl.distributed.DistTensor(
        (dist_g.num_edges(),), F.bool, init_func=rand_mask
    )
    dist_g.edata["mask2"] = dgl.distributed.DistTensor(
        (dist_g.num_edges(),), F.bool, init_func=rand_mask
    )


def check_binary_op(key1, key2, key3, op):
    for i in range(0, dist_g.num_edges(), 1000):
        i_end = min(i + 1000, dist_g.num_edges())
        assert F.array_equal(
            dist_g.edata[key3][i:i_end],
            op(dist_g.edata[key1][i:i_end], dist_g.edata[key2][i:i_end]),
        )


@unittest.skipIf(
    dgl.backend.backend_name == "tensorflow",
    reason="TF doesn't support some of operations in DistGraph",
)
@unittest.skipIf(
    dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support"
)
def test_op():
    dist_g.edata["mask3"] = dist_g.edata["mask1"] | dist_g.edata["mask2"]
    check_binary_op("mask1", "mask2", "mask3", operator.or_)


@unittest.skipIf(
    dgl.backend.backend_name == "tensorflow",
    reason="TF doesn't support some of operations in DistGraph",
)
@unittest.skipIf(
    dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support"
)
def teardown_module():
    # Since there are two tests in one process, this is needed to make sure
    # the client exits properly.
    dgl.distributed.exit_client()


if __name__ == "__main__":
    setup_module()
    test_op()
    teardown_module()
......@@ -2,6 +2,9 @@
import socket
import os
import random
import scipy.sparse as spsp
import numpy as np
import dgl
def generate_ip_config(file_name, num_machines, num_servers):
......@@ -44,3 +47,7 @@ def reset_envs():
                'DGL_DIST_MODE', 'DGL_NUM_CLIENT', 'DGL_DIST_MAX_TRY_TIMES']:
        if key in os.environ:
            os.environ.pop(key)
def create_random_graph(n):
    """Create a random graph with n nodes and roughly 0.1% edge density."""
    return dgl.rand_graph(n, int(n * n * 0.001))
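For comparison, a quick self-contained sketch (not part of the diff) of what the helper refactor changes: the removed test-local helper sampled a 0.1%-density sparse adjacency with SciPy, while the shared helper above asks `dgl.rand_graph` for `int(n * n * 0.001)` edges directly, so both target roughly the same edge count.

import numpy as np
import scipy.sparse as spsp
import dgl

n = 1000

# Old test-local helper: sample a 0.1%-density adjacency and convert it.
arr = (spsp.random(n, n, density=0.001, format='coo', random_state=100) != 0).astype(np.int64)
g_old = dgl.from_scipy(arr)

# New shared helper: request the expected number of edges directly.
g_new = dgl.rand_graph(n, int(n * n * 0.001))

print(g_old.num_edges(), g_new.num_edges())  # both roughly n * n * 0.001 = 1000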