Unverified Commit bfdd1eaa authored by Da Zheng's avatar Da Zheng Committed by GitHub
Browse files

[Feature] Create shared memory graph store. (#468)

* accelerate gcn_ns.

* add timing.

* run infer with whole graph.

* distributed gcn_ns.

* reconstruct gcn_ns.

* minor fix.

* change graphsage_cv for numa.

* fix #OMP threads.

* accelerate graphsage_cv.

* fix a weird bug.

* add profiler in graphsage_cv.

* accelerate graphsage_cv.

manually aggregate neighbors' embeddings with pull.

* load csr directly in gcn_ns_sc.

* parallel sort for graph index.

* Revert "parallel sort for graph index."

This reverts commit 86fe2c7117fe5e56b0d481b39849c258b166945b.

* run gcn_ns_sc on GPUs.

* acc gcn_cv_sc.

* change gcn_cv for numa.

* fix gcn_cv to use numa and gpu.

* improve graphsage_cv to use numa and gpu.

* improve gcn_ns.

* improve graphsage_cv.

* init shared memory graph store.

* fix.

* enable init ndata.

* improve tests.

* add bidirectional communication.

* link to rt.

* fix compilation error.

* fix shared memory init.

* use MessageQueue for inter-process communication.

* reconstruct immutable graph csr.

* fix gcn.

* load csr to shared memory.

* fix minor bugs.

* add comments.

* refactor SharedMemory.

* fix bugs in ImmutableGraph.

* create CSR graph from shared memory.

* add more test for loading a csr graph.

* terminate graph store properly.

* allow initializing ndata in the graph store server.

* use RPC for inter-process communication.

* a script for loading a graph.

* allow customizing port.

* list all ndata and edata.

* support dtype.

* reorganize SharedMemoryGraphStore.

* fix ndata shape.

* reconstruct gcn_ns.

* print info.

* set omp in gcn_ns.

* reset sampling examples.

* fix lint.

* fix lint.

* reset gcn.

* disable shared memory in windows.

* fix.

* fix.

* reset changes.

* revert nodeflow changes.

* fix cmake.

* fix test.

* fix test.

* fix test.

* fix test.

* add comments.

* fix test.

* move vector out.

* fix lint.

* fix lint.

* move SharedMemory.

* update cmake.

* update comment.

* fix comments.

* Revert "update cmake."

This reverts commit 592445e37077f70a6e3f2e5245f9a3d086b04f3b.

* update cmake.

* add comments.

* rename.

* change the comment.

* fix a bug.

* rename.

* add comments.

* add comments.

* add init_edata.

* rewrite memory alloc.

* move vector to CSR.

* fix.

* init data.

* Revert "init data."

This reverts commit 2b217b9553911b7dd84a9f1d9b68430b5aa18e23.

* init data.

* init new columns correctly.
parent 9565db6f
...@@ -19,7 +19,11 @@ include_directories("third_party/dmlc-core/include") ...@@ -19,7 +19,11 @@ include_directories("third_party/dmlc-core/include")
# initial variables # initial variables
set(DGL_LINKER_LIBS "") set(DGL_LINKER_LIBS "")
if(MSVC OR CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(DGL_RUNTIME_LINKER_LIBS "") set(DGL_RUNTIME_LINKER_LIBS "")
else(MSVC OR CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(DGL_RUNTIME_LINKER_LIBS "rt")
endif(MSVC OR CMAKE_SYSTEM_NAME STREQUAL "Darwin")
# Generic compilation options # Generic compilation options
if(MSVC) if(MSVC)
......
...@@ -318,7 +318,9 @@ class Graph: public GraphInterface { ...@@ -318,7 +318,9 @@ class Graph: public GraphInterface {
* \return the successor vector * \return the successor vector
*/ */
DGLIdIters SuccVec(dgl_id_t vid) const { DGLIdIters SuccVec(dgl_id_t vid) const {
return DGLIdIters(adjlist_[vid].succ.begin(), adjlist_[vid].succ.end()); auto data = adjlist_[vid].succ.data();
auto size = adjlist_[vid].succ.size();
return DGLIdIters(data, data + size);
} }
/*! /*!
...@@ -327,7 +329,9 @@ class Graph: public GraphInterface { ...@@ -327,7 +329,9 @@ class Graph: public GraphInterface {
* \return the out edge id vector * \return the out edge id vector
*/ */
DGLIdIters OutEdgeVec(dgl_id_t vid) const { DGLIdIters OutEdgeVec(dgl_id_t vid) const {
return DGLIdIters(adjlist_[vid].edge_id.begin(), adjlist_[vid].edge_id.end()); auto data = adjlist_[vid].edge_id.data();
auto size = adjlist_[vid].edge_id.size();
return DGLIdIters(data, data + size);
} }
/*! /*!
...@@ -336,7 +340,9 @@ class Graph: public GraphInterface { ...@@ -336,7 +340,9 @@ class Graph: public GraphInterface {
* \return the predecessor vector * \return the predecessor vector
*/ */
DGLIdIters PredVec(dgl_id_t vid) const { DGLIdIters PredVec(dgl_id_t vid) const {
return DGLIdIters(reverse_adjlist_[vid].succ.begin(), reverse_adjlist_[vid].succ.end()); auto data = reverse_adjlist_[vid].succ.data();
auto size = reverse_adjlist_[vid].succ.size();
return DGLIdIters(data, data + size);
} }
/*! /*!
...@@ -345,8 +351,9 @@ class Graph: public GraphInterface { ...@@ -345,8 +351,9 @@ class Graph: public GraphInterface {
* \return the in edge id vector * \return the in edge id vector
*/ */
DGLIdIters InEdgeVec(dgl_id_t vid) const { DGLIdIters InEdgeVec(dgl_id_t vid) const {
return DGLIdIters(reverse_adjlist_[vid].edge_id.begin(), auto data = reverse_adjlist_[vid].edge_id.data();
reverse_adjlist_[vid].edge_id.end()); auto size = reverse_adjlist_[vid].edge_id.size();
return DGLIdIters(data, data + size);
} }
/*! /*!
......
...@@ -32,17 +32,16 @@ const dgl_id_t DGL_INVALID_ID = static_cast<dgl_id_t>(-1); ...@@ -32,17 +32,16 @@ const dgl_id_t DGL_INVALID_ID = static_cast<dgl_id_t>(-1);
* but it doesn't own data itself. instead, it only references data in std::vector. * but it doesn't own data itself. instead, it only references data in std::vector.
*/ */
class DGLIdIters { class DGLIdIters {
std::vector<dgl_id_t>::const_iterator begin_, end_; const dgl_id_t *begin_, *end_;
public: public:
DGLIdIters(std::vector<dgl_id_t>::const_iterator begin, DGLIdIters(const dgl_id_t *begin, const dgl_id_t *end) {
std::vector<dgl_id_t>::const_iterator end) {
this->begin_ = begin; this->begin_ = begin;
this->end_ = end; this->end_ = end;
} }
std::vector<dgl_id_t>::const_iterator begin() const { const dgl_id_t *begin() const {
return this->begin_; return this->begin_;
} }
std::vector<dgl_id_t>::const_iterator end() const { const dgl_id_t *end() const {
return this->end_; return this->end_;
} }
dgl_id_t operator[](int64_t i) const { dgl_id_t operator[](int64_t i) const {
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <cstdint> #include <cstdint>
#include <utility> #include <utility>
#include <tuple> #include <tuple>
#include <algorithm>
#include "runtime/ndarray.h" #include "runtime/ndarray.h"
#include "graph_interface.h" #include "graph_interface.h"
...@@ -32,42 +33,221 @@ class ImmutableGraph: public GraphInterface { ...@@ -32,42 +33,221 @@ class ImmutableGraph: public GraphInterface {
dgl_id_t edge_id; dgl_id_t edge_id;
}; };
// Edge list indexed by edge id; struct EdgeList;
struct EdgeList {
typedef std::shared_ptr<EdgeList> Ptr;
std::vector<dgl_id_t> src_points;
std::vector<dgl_id_t> dst_points;
EdgeList(int64_t len, dgl_id_t val) { struct CSR {
src_points.resize(len, val); typedef std::shared_ptr<CSR> Ptr;
dst_points.resize(len, val);
/*
* This vector provides interfaces similar to std::vector.
* The main difference is that the memory used by the vector can be allocated
* outside the vector. The main use case is that the vector can use the shared
* memory that is created by another process. In this way, we can access the
* graph structure loaded in another process.
*/
template<class T>
class vector {
public:
vector() {
this->arr = nullptr;
this->capacity = 0;
this->curr = 0;
this->own = false;
} }
void register_edge(dgl_id_t eid, dgl_id_t src, dgl_id_t dst) { /*
CHECK_LT(eid, src_points.size()) << "Invalid edge id " << eid; * Create a vector whose memory is allocated outside.
src_points[eid] = src; * Here there are no elements in the vector.
dst_points[eid] = dst; */
vector(T *arr, size_t size) {
this->arr = arr;
this->capacity = size;
this->curr = 0;
this->own = false;
} }
static EdgeList::Ptr FromCSR( /*
const std::vector<int64_t>& indptr, * Create a vector whose memory is allocated by the vector.
const std::vector<dgl_id_t>& indices, * Here there are no elements in the vector.
const std::vector<dgl_id_t>& edge_ids, */
bool in_csr); explicit vector(size_t size) {
}; this->arr = static_cast<T *>(malloc(size * sizeof(T)));
this->capacity = size;
this->curr = 0;
this->own = true;
}
struct CSR { ~vector() {
typedef std::shared_ptr<CSR> Ptr; // If the memory is allocated by the vector, it should be free'd.
std::vector<int64_t> indptr; if (this->own) {
std::vector<dgl_id_t> indices; free(this->arr);
std::vector<dgl_id_t> edge_ids; }
}
vector(const vector &other) = delete;
/*
* Initialize the vector whose memory is allocated outside.
* There are no elements in the vector.
*/
void init(T *arr, size_t size) {
CHECK(this->arr == nullptr);
this->arr = arr;
this->capacity = size;
this->curr = 0;
this->own = false;
}
/*
* Initialize the vector whose memory is allocated outside.
* There are elements in the vector.
*/
void init(T *arr, size_t capacity, size_t size) {
CHECK(this->arr == nullptr);
CHECK_LE(size, capacity);
this->arr = arr;
this->capacity = capacity;
this->curr = size;
this->own = false;
}
/* Similar to std::vector::push_back. */
void push_back(T val) {
// If the vector doesn't own the memory, it can't adjust its memory size.
if (!this->own) {
CHECK_LT(curr, capacity);
} else if (curr == capacity) {
this->capacity = this->capacity * 2;
this->arr = static_cast<T *>(realloc(this->arr, this->capacity * sizeof(T)));
CHECK(this->arr) << "can't allocate memory for a larger vector.";
}
this->arr[curr++] = val;
}
/*
* This inserts multiple elements to the back of the vector.
*/
void insert_back(const T* val, size_t len) {
if (!this->own) {
CHECK_LE(curr + len, capacity);
} else if (curr + len > capacity) {
this->capacity = curr + len;
this->arr = static_cast<T *>(realloc(this->arr, this->capacity * sizeof(T)));
CHECK(this->arr) << "can't allocate memory for a larger vector.";
}
std::copy(val, val + len, this->arr + curr);
curr += len;
}
/*
* Similar to std::vector::[].
* It checks the boundary of the vector.
*/
T &operator[](size_t idx) {
CHECK_LT(idx, curr);
return this->arr[idx];
}
/*
* Similar to std::vector::[].
* It checks the boundary of the vector.
*/
const T &operator[](size_t idx) const {
CHECK_LT(idx, curr);
return this->arr[idx];
}
CSR(int64_t num_vertices, int64_t expected_num_edges) { /* Similar to std::vector::size. */
indptr.resize(num_vertices + 1); size_t size() const {
indices.reserve(expected_num_edges); return this->curr;
edge_ids.reserve(expected_num_edges);
} }
/* Similar to std::vector::resize. */
void resize(size_t new_size) {
if (!this->own) {
CHECK_LE(new_size, capacity);
} else if (new_size > capacity) {
this->capacity = new_size;
this->arr = static_cast<T *>(realloc(this->arr, this->capacity * sizeof(T)));
CHECK(this->arr) << "can't allocate memory for a larger vector.";
}
for (size_t i = this->curr; i < new_size; i++)
this->arr[i] = 0;
this->curr = new_size;
}
/* Similar to std::vector::clear. */
void clear() {
this->curr = 0;
}
/* Similar to std::vector::data. */
const T *data() const {
return this->arr;
}
/* Similar to std::vector::data. */
T *data() {
return this->arr;
}
/*
* This is to simulate begin() of std::vector.
* However, it returns the raw pointer instead of iterator.
*/
const T *begin() const {
return this->arr;
}
/*
* This is to simulate begin() of std::vector.
* However, it returns the raw pointer instead of iterator.
*/
T *begin() {
return this->arr;
}
/*
* This is to simulate end() of std::vector.
* However, it returns the raw pointer instead of iterator.
*/
const T *end() const {
return this->arr + this->curr;
}
/*
* This is to simulate end() of std::vector.
* However, it returns the raw pointer instead of iterator.
*/
T *end() {
return this->arr + this->curr;
}
private:
/*
* \brief the raw array that contains elements of type T.
*
* The vector may or may not own the memory of the raw array.
*/
T *arr;
/* \brief the memory size of the raw array. */
size_t capacity;
/* \brief the number of elements in the array. */
size_t curr;
/* \brief whether the vector owns the memory. */
bool own;
};
vector<int64_t> indptr;
vector<dgl_id_t> indices;
vector<dgl_id_t> edge_ids;
CSR(int64_t num_vertices, int64_t expected_num_edges);
CSR(IdArray indptr, IdArray indices, IdArray edge_ids);
CSR(IdArray indptr, IdArray indices, IdArray edge_ids,
const std::string &shared_mem_name);
CSR(const std::string &shared_mem_name, size_t num_vertices, size_t num_edges);
bool HasVertex(dgl_id_t vid) const { bool HasVertex(dgl_id_t vid) const {
return vid < NumVertices(); return vid < NumVertices();
} }
...@@ -103,7 +283,8 @@ class ImmutableGraph: public GraphInterface { ...@@ -103,7 +283,8 @@ class ImmutableGraph: public GraphInterface {
void ReadAllEdges(std::vector<Edge> *edges) const; void ReadAllEdges(std::vector<Edge> *edges) const;
CSR::Ptr Transpose() const; CSR::Ptr Transpose() const;
std::pair<CSR::Ptr, IdArray> VertexSubgraph(IdArray vids) const; std::pair<CSR::Ptr, IdArray> VertexSubgraph(IdArray vids) const;
std::pair<CSR::Ptr, IdArray> EdgeSubgraph(IdArray eids, EdgeList::Ptr edge_list) const; std::pair<CSR::Ptr, IdArray> EdgeSubgraph(IdArray eids,
std::shared_ptr<EdgeList> edge_list) const;
/* /*
* Construct a CSR from a list of edges. * Construct a CSR from a list of edges.
* *
...@@ -112,6 +293,35 @@ class ImmutableGraph: public GraphInterface { ...@@ -112,6 +293,35 @@ class ImmutableGraph: public GraphInterface {
* which is specified by `sort_on`. * which is specified by `sort_on`.
*/ */
static CSR::Ptr FromEdges(std::vector<Edge> *edges, int sort_on, uint64_t num_nodes); static CSR::Ptr FromEdges(std::vector<Edge> *edges, int sort_on, uint64_t num_nodes);
private:
#ifndef _WIN32
std::shared_ptr<runtime::SharedMemory> mem;
#endif // _WIN32
};
// Edge list indexed by edge id;
struct EdgeList {
typedef std::shared_ptr<EdgeList> Ptr;
std::vector<dgl_id_t> src_points;
std::vector<dgl_id_t> dst_points;
EdgeList(int64_t len, dgl_id_t val) {
src_points.resize(len, val);
dst_points.resize(len, val);
}
void register_edge(dgl_id_t eid, dgl_id_t src, dgl_id_t dst) {
CHECK_LT(eid, src_points.size()) << "Invalid edge id " << eid;
src_points[eid] = src;
dst_points[eid] = dst;
}
static EdgeList::Ptr FromCSR(
const CSR::vector<int64_t>& indptr,
const CSR::vector<dgl_id_t>& indices,
const CSR::vector<dgl_id_t>& edge_ids,
bool in_csr);
}; };
/*! \brief Construct an immutable graph from the COO format. */ /*! \brief Construct an immutable graph from the COO format. */
......
...@@ -394,6 +394,29 @@ DGL_DLL int DGLArrayAlloc(const dgl_index_t* shape, ...@@ -394,6 +394,29 @@ DGL_DLL int DGLArrayAlloc(const dgl_index_t* shape,
int device_id, int device_id,
DGLArrayHandle* out); DGLArrayHandle* out);
/*!
* \brief Allocate a nd-array's with shared memory,
* including space of shape, of given spec.
*
* \param the name of the shared memory
* \param shape The shape of the array, the data content will be copied to out
* \param ndim The number of dimension of the array.
* \param dtype_code The type code of the dtype
* \param dtype_bits The number of bits of dtype
* \param dtype_lanes The number of lanes in the dtype.
* \param is_create whether the shared memory is created
* \param out The output handle.
* \return 0 when success, -1 when failure happens
*/
int DGLArrayAllocSharedMem(const char *mem_name,
const dgl_index_t *shape,
int ndim,
int dtype_code,
int dtype_bits,
int dtype_lanes,
bool is_create,
DGLArrayHandle* out);
/*! /*!
* \brief Free the DGL Array. * \brief Free the DGL Array.
* \param handle The array handle to be freed. * \param handle The array handle to be freed.
......
...@@ -6,11 +6,13 @@ ...@@ -6,11 +6,13 @@
#ifndef DGL_RUNTIME_NDARRAY_H_ #ifndef DGL_RUNTIME_NDARRAY_H_
#define DGL_RUNTIME_NDARRAY_H_ #define DGL_RUNTIME_NDARRAY_H_
#include <string>
#include <atomic> #include <atomic>
#include <vector> #include <vector>
#include <utility> #include <utility>
#include "c_runtime_api.h" #include "c_runtime_api.h"
#include "serializer.h" #include "serializer.h"
#include "shared_mem.h"
namespace dgl { namespace dgl {
namespace runtime { namespace runtime {
...@@ -147,6 +149,24 @@ class NDArray { ...@@ -147,6 +149,24 @@ class NDArray {
DGL_DLL static NDArray Empty(std::vector<int64_t> shape, DGL_DLL static NDArray Empty(std::vector<int64_t> shape,
DLDataType dtype, DLDataType dtype,
DLContext ctx); DLContext ctx);
/*!
* \brief Create an empty NDArray with shared memory.
* \param name The name of shared memory.
* \param shape The shape of the new array.
* \param dtype The data type of the new array.
* \param ctx The context of the Array.
* \param is_create whether to create shared memory.
* \return The created Array
*/
DGL_DLL static NDArray EmptyShared(const std::string &name,
std::vector<int64_t> shape,
DLDataType dtype,
DLContext ctx,
bool is_create);
/*!
* \brief Get the size of the array in the number of bytes.
*/
size_t GetSize() const;
/*! /*!
* \brief Create a NDArray backed by a dlpack tensor. * \brief Create a NDArray backed by a dlpack tensor.
* *
...@@ -207,6 +227,10 @@ struct NDArray::Container { ...@@ -207,6 +227,10 @@ struct NDArray::Container {
* The head ptr of this struct can be viewed as DLTensor*. * The head ptr of this struct can be viewed as DLTensor*.
*/ */
DLTensor dl_tensor; DLTensor dl_tensor;
#ifndef _WIN32
std::shared_ptr<SharedMemory> mem;
#endif // _WIN32
/*! /*!
* \brief addtional context, reserved for recycling * \brief addtional context, reserved for recycling
* \note We can attach additional content here * \note We can attach additional content here
......
/*!
 * Copyright (c) 2017 by Contributors
 * \file dgl/runtime/shared_mem.h
 * \brief shared memory management.
 */
#ifndef DGL_RUNTIME_SHARED_MEM_H_
#define DGL_RUNTIME_SHARED_MEM_H_

#include <cstddef>
#include <string>

namespace dgl {
namespace runtime {

#ifndef _WIN32
/*
 * \brief This class owns shared memory.
 *
 * When the object is gone, the shared memory will also be destroyed.
 * When the shared memory is destroyed, the file corresponding to
 * the shared memory is removed.
 */
class SharedMemory {
  /*
   * \brief whether the shared memory is owned by the object.
   *
   * If shared memory is created in the object, it'll be owned by the object
   * and will be responsible for deleting it when the object is destroyed.
   */
  bool own;

  /*
   * \brief the name of the object.
   *
   * In Unix, shared memory is identified by a file. Thus, `name` is actually
   * the file name that identifies the shared memory.
   */
  std::string name;
  /* \brief the file descriptor of the shared memory. */
  int fd;
  /* \brief the address of the shared memory. */
  void *ptr;
  /* \brief the size of the shared memory. */
  size_t size;

 public:
  /*
   * \brief constructor of the shared memory.
   * \param name The file corresponding to the shared memory.
   */
  explicit SharedMemory(const std::string &name);
  /*
   * \brief destructor of the shared memory.
   * It deallocates the shared memory and removes the corresponding file.
   */
  ~SharedMemory();
  /*
   * The object owns a file descriptor, a mapped address and possibly the
   * backing file itself; copying it would double-close the descriptor and
   * remove the file twice. Share it via std::shared_ptr instead.
   */
  SharedMemory(const SharedMemory &) = delete;
  SharedMemory &operator=(const SharedMemory &) = delete;
  /*
   * \brief create shared memory.
   * It creates the file and shared memory.
   * \param size the size of the shared memory.
   * \return the address of the shared memory
   */
  void *create_new(size_t size);
  /*
   * \brief allocate shared memory that has been created.
   * \param size the size of the shared memory.
   * \return the address of the shared memory
   */
  void *open(size_t size);
};
#endif  // _WIN32

}  // namespace runtime
}  // namespace dgl

#endif  // DGL_RUNTIME_SHARED_MEM_H_
...@@ -94,6 +94,14 @@ cdef extern from "dgl/runtime/c_runtime_api.h": ...@@ -94,6 +94,14 @@ cdef extern from "dgl/runtime/c_runtime_api.h":
DLDataType dtype, DLDataType dtype,
DLContext ctx, DLContext ctx,
DLTensorHandle* out) DLTensorHandle* out)
int DGLArrayAllocSharedMem(const char *mem_name,
const dgl_index_t *shape,
int ndim,
int dtype_code,
int dtype_bits,
int dtype_lanes,
bool is_create,
DGLArrayHandle* out)
int DGLArrayFree(DLTensorHandle handle) int DGLArrayFree(DLTensorHandle handle)
int DGLArrayCopyFromTo(DLTensorHandle src, int DGLArrayCopyFromTo(DLTensorHandle src,
DLTensorHandle to, DLTensorHandle to,
......
...@@ -113,6 +113,43 @@ def empty(shape, dtype="float32", ctx=context(1, 0)): ...@@ -113,6 +113,43 @@ def empty(shape, dtype="float32", ctx=context(1, 0)):
return _make_array(handle, False) return _make_array(handle, False)
def empty_shared_mem(name, is_create, shape, dtype="float32"):
    """Create an empty array backed by shared memory.

    Parameters
    ----------
    name : string
        The name of the shared memory. It's a file name in Unix.
    is_create : bool
        Whether to create the shared memory or use the one created by somewhere else.
    shape : tuple of int
        The shape of the array
    dtype : type or str
        The data type of the array.

    Returns
    -------
    arr : dgl.nd.NDArray
        The array dgl supported.
    """
    c_name = ctypes.c_char_p(name.encode('utf-8'))
    c_shape = c_array(dgl_shape_index_t, shape)
    c_ndim = ctypes.c_int(len(c_shape))
    arr_type = DGLType(dtype)
    handle = DGLArrayHandle()
    # Ask the C runtime to allocate (or map) the shared-memory ndarray.
    check_call(_LIB.DGLArrayAllocSharedMem(
        c_name, c_shape, c_ndim,
        ctypes.c_int(arr_type.type_code),
        ctypes.c_int(arr_type.bits),
        ctypes.c_int(arr_type.lanes),
        is_create,
        ctypes.byref(handle)))
    return _make_array(handle, False)
def from_dlpack(dltensor): def from_dlpack(dltensor):
"""Produce an array from a DLPack tensor without memory copy. """Produce an array from a DLPack tensor without memory copy.
Retrieves the underlying DLPack tensor's pointer to create an array from the Retrieves the underlying DLPack tensor's pointer to create an array from the
......
from . import sampling from . import sampling
from . import graph_store
import os
import time
import scipy
from xmlrpc.server import SimpleXMLRPCServer
import xmlrpc.client
import numpy as np
from collections.abc import MutableMapping
from ..base import ALL, is_all, DGLError
from .. import backend as F
from ..graph import DGLGraph
from .. import utils
from ..graph_index import GraphIndex, create_graph_index
from .._ffi.ndarray import empty_shared_mem
from .._ffi.function import _init_api
from .. import ndarray as nd
def _get_ndata_path(graph_name, ndata_name):
return "/" + graph_name + "_node_" + ndata_name
def _get_edata_path(graph_name, edata_name):
return "/" + graph_name + "_edge_" + edata_name
def _get_edata_path(graph_name, edata_name):
return "/" + graph_name + "_edge_" + edata_name
def _get_graph_path(graph_name):
return "/" + graph_name
def _move_data_to_shared_mem_array(arr, name):
    """Copy a backend tensor into a newly created shared-memory ndarray.

    Returns a backend tensor that is backed by the shared memory, so other
    processes can map the same data under ``name``.
    """
    src_tensor = nd.from_dlpack(F.zerocopy_to_dlpack(arr))
    shared = empty_shared_mem(name, True, F.shape(arr), np.dtype(F.dtype(arr)).name)
    src_tensor.copyto(shared)
    return F.zerocopy_from_dlpack(shared.to_dlpack())
class NodeDataView(MutableMapping):
    """The data view class when G.nodes[...].data is called.

    See Also
    --------
    dgl.DGLGraph.nodes
    """
    __slots__ = ['_graph', '_nodes', '_graph_name']

    def __init__(self, graph, nodes, graph_name):
        self._graph = graph
        self._nodes = nodes
        self._graph_name = graph_name

    def __getitem__(self, key):
        reprs = self._graph.get_n_repr(self._nodes)
        return reprs[key]

    def __setitem__(self, key, val):
        # Relocate the tensor into shared memory before handing it to the graph.
        shared_val = _move_data_to_shared_mem_array(
            val, _get_ndata_path(self._graph_name, key))
        self._graph.set_n_repr({key : shared_val}, self._nodes)

    def __delitem__(self, key):
        if is_all(self._nodes):
            self._graph.pop_n_repr(key)
        else:
            raise DGLError('Delete feature data is not supported on only a subset'
                           ' of nodes. Please use `del G.ndata[key]` instead.')

    def __len__(self):
        return len(self._graph._node_frame)

    def __iter__(self):
        return iter(self._graph._node_frame)

    def __repr__(self):
        reprs = self._graph.get_n_repr(self._nodes)
        return repr({key : reprs[key] for key in self._graph._node_frame})
class EdgeDataView(MutableMapping):
    """The data view class when G.edges[...].data is called.

    See Also
    --------
    dgl.DGLGraph.edges
    """
    __slots__ = ['_graph', '_edges', '_graph_name']

    def __init__(self, graph, edges, graph_name):
        self._graph = graph
        self._edges = edges
        self._graph_name = graph_name

    def __getitem__(self, key):
        reprs = self._graph.get_e_repr(self._edges)
        return reprs[key]

    def __setitem__(self, key, val):
        # Relocate the tensor into shared memory before handing it to the graph.
        shared_val = _move_data_to_shared_mem_array(
            val, _get_edata_path(self._graph_name, key))
        self._graph.set_e_repr({key : shared_val}, self._edges)

    def __delitem__(self, key):
        if is_all(self._edges):
            self._graph.pop_e_repr(key)
        else:
            raise DGLError('Delete feature data is not supported on only a subset'
                           ' of nodes. Please use `del G.edata[key]` instead.')

    def __len__(self):
        return len(self._graph._edge_frame)

    def __iter__(self):
        return iter(self._graph._edge_frame)

    def __repr__(self):
        reprs = self._graph.get_e_repr(self._edges)
        return repr({key : reprs[key] for key in self._graph._edge_frame})
def _to_csr(graph_data, edge_dir, multigraph):
try:
indptr = graph_data.indptr
indices = graph_data.indices
return indptr, indices
except:
if isinstance(graph_data, scipy.sparse.spmatrix):
csr = graph_data.tocsr()
return csr.indptr, csr.indices
else:
idx = create_graph_index(graph_data=graph_data, multigraph=multigraph, readonly=True)
transpose = (edge_dir != 'in')
csr = idx.adjacency_matrix_scipy(transpose, 'csr')
return csr.indptr, csr.indices
class SharedMemoryStoreServer(object):
    """The graph store server.

    The server loads graph structure and node embeddings and edge embeddings
    and store them in shared memory. The loaded graph can be identified by
    the graph name in the input argument.

    Parameters
    ----------
    graph_data : graph data
        Data to initialize graph. Same as networkx's semantics.
    edge_dir : string
        the edge direction for the graph structure ("in" or "out")
    graph_name : string
        Define the name of the graph, so the client can use the name to access the graph.
    multigraph : bool, optional
        Whether the graph would be a multigraph (default: False)
    num_workers : int
        The number of workers that will connect to the server.
    port : int
        The port that the server listens to.
    """
    def __init__(self, graph_data, edge_dir, graph_name, multigraph, num_workers, port):
        # Build a read-only graph index whose CSR arrays are placed in shared
        # memory under the graph name, so clients can map the same structure.
        graph_idx = GraphIndex(multigraph=multigraph, readonly=True)
        indptr, indices = _to_csr(graph_data, edge_dir, multigraph)
        graph_idx.from_csr_matrix(indptr, indices, edge_dir, _get_graph_path(graph_name))

        self._graph = DGLGraph(graph_idx, multigraph=multigraph, readonly=True)
        self._num_workers = num_workers
        self._graph_name = graph_name
        self._edge_dir = edge_dir

        # RPC command: get the graph information from the graph store server.
        def get_graph_info():
            return self._graph.number_of_nodes(), self._graph.number_of_edges(), \
                    self._graph.is_multigraph, edge_dir

        # RPC command: initialize node embedding in the server.
        def init_ndata(ndata_name, shape, dtype):
            if ndata_name in self._graph.ndata:
                # Already initialized (e.g. by another client); just verify the shape.
                ndata = self._graph.ndata[ndata_name]
                assert np.all(ndata.shape == tuple(shape))
                return 0

            assert self._graph.number_of_nodes() == shape[0]
            # Allocate the embedding in shared memory and attach it to the graph.
            data = empty_shared_mem(_get_ndata_path(graph_name, ndata_name), True, shape, dtype)
            dlpack = data.to_dlpack()
            self._graph.ndata[ndata_name] = F.zerocopy_from_dlpack(dlpack)
            return 0

        # RPC command: initialize edge embedding in the server.
        def init_edata(edata_name, shape, dtype):
            if edata_name in self._graph.edata:
                # Already initialized (e.g. by another client); just verify the shape.
                edata = self._graph.edata[edata_name]
                assert np.all(edata.shape == tuple(shape))
                return 0

            assert self._graph.number_of_edges() == shape[0]
            # Allocate the embedding in shared memory and attach it to the graph.
            data = empty_shared_mem(_get_edata_path(graph_name, edata_name), True, shape, dtype)
            dlpack = data.to_dlpack()
            self._graph.edata[edata_name] = F.zerocopy_from_dlpack(dlpack)
            return 0

        # RPC command: get the names of all node embeddings.
        def list_ndata():
            ndata = self._graph.ndata
            return [[key, F.shape(ndata[key]), np.dtype(F.dtype(ndata[key])).name] for key in ndata]

        # RPC command: get the names of all edge embeddings.
        def list_edata():
            edata = self._graph.edata
            return [[key, F.shape(edata[key]), np.dtype(F.dtype(edata[key])).name] for key in edata]

        # RPC command: notify the server of the termination of the client.
        def terminate():
            self._num_workers -= 1
            return 0

        self.server = SimpleXMLRPCServer(("localhost", port))
        self.server.register_function(get_graph_info, "get_graph_info")
        self.server.register_function(init_ndata, "init_ndata")
        self.server.register_function(init_edata, "init_edata")
        self.server.register_function(terminate, "terminate")
        self.server.register_function(list_ndata, "list_ndata")
        self.server.register_function(list_edata, "list_edata")

    def __del__(self):
        # Dropping the graph releases the shared-memory-backed structure/embeddings.
        self._graph = None

    @property
    def ndata(self):
        """Return the data view of all the nodes.

        DGLGraph.ndata is an abbreviation of DGLGraph.nodes[:].data

        See Also
        --------
        dgl.DGLGraph.nodes
        """
        return NodeDataView(self._graph, ALL, self._graph_name)

    @property
    def edata(self):
        """Return the data view of all the edges.

        DGLGraph.data is an abbreviation of DGLGraph.edges[:].data

        See Also
        --------
        dgl.DGLGraph.edges
        """
        return EdgeDataView(self._graph, ALL, self._graph_name)

    def run(self):
        """Run the graph store server.

        The server runs to process RPC requests from clients.
        """
        # Serve one request at a time until every worker has called `terminate`.
        while self._num_workers > 0:
            self.server.handle_request()
        self._graph = None
class SharedMemoryDGLGraph(DGLGraph):
    """Shared-memory DGLGraph.

    This is a client to access data in the shared-memory graph store that has loads
    the graph structure and node embeddings and edge embeddings to shared memory.
    It provides the DGLGraph interface.

    Parameters
    ----------
    graph_name : string
        Define the name of the graph.
    port : int
        The port that the server listens to.
    """
    def __init__(self, graph_name, port):
        self._graph_name = graph_name
        # NOTE(review): `_pid` is recorded but never read in this class;
        # presumably intended to guard __del__ in forked workers -- confirm.
        self._pid = os.getpid()
        self.proxy = xmlrpc.client.ServerProxy("http://localhost:" + str(port) + "/")
        # Fetch graph metadata over RPC, then map the CSR arrays the server
        # placed in shared memory.
        num_nodes, num_edges, multigraph, edge_dir = self.proxy.get_graph_info()
        graph_idx = GraphIndex(multigraph=multigraph, readonly=True)
        graph_idx.from_shared_mem_csr_matrix(_get_graph_path(graph_name), num_nodes, num_edges, edge_dir)
        super(SharedMemoryDGLGraph, self).__init__(graph_idx, multigraph=multigraph, readonly=True)

        # map all ndata and edata from the server.
        ndata_infos = self.proxy.list_ndata()
        for name, shape, dtype in ndata_infos:
            self._init_ndata(name, shape, dtype)

        edata_infos = self.proxy.list_edata()
        for name, shape, dtype in edata_infos:
            self._init_edata(name, shape, dtype)

        # Set the ndata and edata initializers.
        # so that when a new node/edge embedding is created, it'll be created on the server as well.
        def node_initializer(name, arr):
            # Ask the server to create the embedding, map its shared memory
            # locally, and copy the initial values in.
            shape = F.shape(arr)
            dtype = np.dtype(F.dtype(arr)).name
            self.proxy.init_ndata(name, shape, dtype)
            data = empty_shared_mem(_get_ndata_path(self._graph_name, name),
                                    False, shape, dtype)
            dlpack = data.to_dlpack()
            arr1 = F.zerocopy_from_dlpack(dlpack)
            arr1[:] = arr
            return arr1
        def edge_initializer(name, arr):
            # Same as node_initializer, but for edge embeddings.
            shape = F.shape(arr)
            dtype = np.dtype(F.dtype(arr)).name
            self.proxy.init_edata(name, shape, dtype)
            data = empty_shared_mem(_get_edata_path(self._graph_name, name),
                                    False, shape, dtype)
            dlpack = data.to_dlpack()
            arr1 = F.zerocopy_from_dlpack(dlpack)
            arr1[:] = arr
            return arr1

        self._node_frame.set_remote_initializer(node_initializer)
        self._edge_frame.set_remote_initializer(edge_initializer)
        self._msg_frame.set_remote_initializer(edge_initializer)

    def __del__(self):
        # Best-effort notification if the user never called destroy();
        # destroy() sets the proxy to None so we don't terminate twice.
        if self.proxy is not None:
            self.proxy.terminate()

    def _init_ndata(self, ndata_name, shape, dtype):
        # Map a node embedding created by the server into this process.
        assert self.number_of_nodes() == shape[0]
        data = empty_shared_mem(_get_ndata_path(self._graph_name, ndata_name), False, shape, dtype)
        dlpack = data.to_dlpack()
        self.ndata[ndata_name] = F.zerocopy_from_dlpack(dlpack)

    def _init_edata(self, edata_name, shape, dtype):
        # Map an edge embedding created by the server into this process.
        assert self.number_of_edges() == shape[0]
        data = empty_shared_mem(_get_edata_path(self._graph_name, edata_name), False, shape, dtype)
        dlpack = data.to_dlpack()
        self.edata[edata_name] = F.zerocopy_from_dlpack(dlpack)

    def destroy(self):
        """Destroy the graph store.

        This notifies the server that this client has terminated.
        """
        if self.proxy is not None:
            self.proxy.terminate()
        self.proxy = None
def create_graph_store_server(graph_data, graph_name, store_type, num_workers,
                              multigraph=False, edge_dir='in', port=8000):
    """Create the graph store server.

    The server loads graph structure and node embeddings and edge embeddings.

    Currently, only shared-memory graph store server is supported, so `store_type`
    can only be "shared_mem".

    After the server runs, the graph store clients can access the graph data
    with the specified graph name.

    Parameters
    ----------
    graph_data : graph data
        Data to initialize graph. Same as networkx's semantics.
    graph_name : string
        Define the name of the graph.
    store_type : string
        The type of the graph store. The current option is "shared_mem".
    num_workers : int
        The number of workers that will connect to the server.
    multigraph : bool, optional
        Whether the graph would be a multigraph (default: False)
    edge_dir : string
        the edge direction for the graph structure. The supported options are
        "in" and "out".
    port : int
        The port that the server listens to.

    Returns
    -------
    SharedMemoryStoreServer
        The graph store server

    Raises
    ------
    ValueError
        If `store_type` or `edge_dir` is not a supported option.
    """
    # Validate up front instead of silently ignoring unsupported options.
    if store_type != "shared_mem":
        raise ValueError("Unsupported graph store type: %s. "
                         "Only 'shared_mem' is supported." % store_type)
    if edge_dir not in ('in', 'out'):
        raise ValueError("edge_dir must be 'in' or 'out', got %s" % edge_dir)
    return SharedMemoryStoreServer(graph_data, edge_dir, graph_name, multigraph,
                                   num_workers, port)
def create_graph_from_store(graph_name, store_type, port=8000):
    """Create a client from the graph store.

    The client constructs the graph structure and node embeddings and edge embeddings
    that has been loaded by the graph store server.

    Currently, only shared-memory graph store server is supported, so `store_type`
    can only be "shared_mem".

    Parameters
    ----------
    graph_name : string
        Define the name of the graph.
    store_type : string
        The type of the graph store. The current option is "shared_mem".
    port : int
        The port that the server listens to.

    Returns
    -------
    SharedMemoryDGLGraph
        The shared-memory DGLGraph

    Raises
    ------
    ValueError
        If `store_type` is not a supported option.
    """
    # Validate up front instead of silently ignoring an unsupported store type.
    if store_type != "shared_mem":
        raise ValueError("Unsupported graph store type: %s. "
                         "Only 'shared_mem' is supported." % store_type)
    return SharedMemoryDGLGraph(graph_name, port)
_init_api("dgl.contrib.graph_store")
...@@ -210,6 +210,7 @@ class Frame(MutableMapping): ...@@ -210,6 +210,7 @@ class Frame(MutableMapping):
# If is none, then a warning will be raised # If is none, then a warning will be raised
# in the first call and zero initializer will be used later. # in the first call and zero initializer will be used later.
self._initializers = {} # per-column initializers self._initializers = {} # per-column initializers
self._remote_initializer = None
self._default_initializer = None self._default_initializer = None
def _warn_and_set_initializer(self): def _warn_and_set_initializer(self):
...@@ -251,6 +252,18 @@ class Frame(MutableMapping): ...@@ -251,6 +252,18 @@ class Frame(MutableMapping):
else: else:
self._initializers[column] = initializer self._initializers[column] = initializer
def set_remote_initializer(self, initializer):
    """Set the remote initializer invoked whenever a column is added to the frame.

    The initializer is a callable invoked as ``initializer(name, data)``:
    it receives the column name and the local tensor, and returns the tensor
    that is actually stored (e.g. one backed by a remote server).

    Parameters
    ----------
    initializer : callable
        The initializer.
    """
    self._remote_initializer = initializer
@property @property
def schemes(self): def schemes(self):
"""Return a dictionary of column name to column schemes.""" """Return a dictionary of column name to column schemes."""
...@@ -329,6 +342,10 @@ class Frame(MutableMapping): ...@@ -329,6 +342,10 @@ class Frame(MutableMapping):
initializer = self.get_initializer(name) initializer = self.get_initializer(name)
init_data = initializer((self.num_rows,) + scheme.shape, scheme.dtype, init_data = initializer((self.num_rows,) + scheme.shape, scheme.dtype,
ctx, slice(0, self.num_rows)) ctx, slice(0, self.num_rows))
# If the data is backed by a remote server, we need to move data
# to the remote server.
if self._remote_initializer is not None:
init_data = self._remote_initializer(name, init_data)
self._columns[name] = Column(init_data, scheme) self._columns[name] = Column(init_data, scheme)
def add_rows(self, num_rows): def add_rows(self, num_rows):
...@@ -365,6 +382,10 @@ class Frame(MutableMapping): ...@@ -365,6 +382,10 @@ class Frame(MutableMapping):
data : Column or data convertible to Column data : Column or data convertible to Column
The column data. The column data.
""" """
# If the data is backed by a remote server, we need to move data
# to the remote server.
if self._remote_initializer is not None:
data = self._remote_initializer(name, data)
col = Column.create(data) col = Column.create(data)
if len(col) != self.num_rows: if len(col) != self.num_rows:
raise DGLError('Expected data to have %d rows, got %d.' % raise DGLError('Expected data to have %d rows, got %d.' %
...@@ -372,6 +393,8 @@ class Frame(MutableMapping): ...@@ -372,6 +393,8 @@ class Frame(MutableMapping):
self._columns[name] = col self._columns[name] = col
def _append(self, other): def _append(self, other):
assert self._remote_initializer is None, \
"We don't support append if data in the frame is mapped from a remote server."
# NOTE: `other` can be empty. # NOTE: `other` can be empty.
if self.num_rows == 0: if self.num_rows == 0:
# if no rows in current frame; append is equivalent to # if no rows in current frame; append is equivalent to
...@@ -489,6 +512,18 @@ class FrameRef(MutableMapping): ...@@ -489,6 +512,18 @@ class FrameRef(MutableMapping):
""" """
self._frame.set_initializer(initializer, column=column) self._frame.set_initializer(initializer, column=column)
def set_remote_initializer(self, initializer):
    """Set the remote initializer on the underlying frame.

    The initializer is a callable invoked as ``initializer(name, data)``:
    it receives the column name and the local tensor, and returns the tensor
    that is actually stored (e.g. one backed by a remote server).

    Parameters
    ----------
    initializer : callable
        The initializer.
    """
    self._frame.set_remote_initializer(initializer)
def get_initializer(self, column=None): def get_initializer(self, column=None):
"""Get the initializer for empty values for the given column. """Get the initializer for empty values for the given column.
...@@ -664,11 +699,10 @@ class FrameRef(MutableMapping): ...@@ -664,11 +699,10 @@ class FrameRef(MutableMapping):
True if the update is performed inplacely. True if the update is performed inplacely.
""" """
if self.is_span_whole_column(): if self.is_span_whole_column():
col = Column.create(data)
if self.num_columns == 0: if self.num_columns == 0:
# the frame is empty # the frame is empty
self._index = utils.toindex(slice(0, len(col))) self._index = utils.toindex(slice(0, len(data)))
self._frame[name] = col self._frame[name] = data
else: else:
if name not in self._frame: if name not in self._frame:
ctx = F.context(data) ctx = F.context(data)
......
...@@ -812,6 +812,56 @@ class GraphIndex(object): ...@@ -812,6 +812,56 @@ class GraphIndex(object):
self._init(src, dst, edge_ids, num_nodes) self._init(src, dst, edge_ids, num_nodes)
def from_csr_matrix(self, indptr, indices, edge_dir, shared_mem_name=""):
    """Load a graph from the CSR matrix.

    Parameters
    ----------
    indptr : a 1D tensor
        index pointer in the CSR format
    indices : a 1D tensor
        column index array in the CSR format
    edge_dir : string
        the edge direction. The supported options are "in" and "out".
    shared_mem_name : string
        the name of shared memory. If empty, the graph is not placed in
        shared memory.
    """
    assert self.is_readonly(), "Loading a CSR graph requires a readonly graph index."
    # Validate here: the C API silently treats anything that is not "in"
    # as "out", so a typo would otherwise go unnoticed.
    assert edge_dir in ('in', 'out'), \
        "edge_dir must be 'in' or 'out', got %s" % edge_dir
    indptr = utils.toindex(indptr)
    indices = utils.toindex(indices)
    # Edge ids are assigned in the order the edges appear in `indices`.
    edge_ids = utils.toindex(F.arange(0, len(indices)))
    self._handle = _CAPI_DGLGraphCSRCreate(
        indptr.todgltensor(),
        indices.todgltensor(),
        edge_ids.todgltensor(),
        shared_mem_name,
        self._multigraph,
        edge_dir)
def from_shared_mem_csr_matrix(self, shared_mem_name,
                               num_nodes, num_edges, edge_dir):
    """Load a graph from the shared memory in the CSR format.

    Parameters
    ----------
    shared_mem_name : string
        the name of shared memory
    num_nodes : int
        the number of nodes
    num_edges : int
        the number of edges
    edge_dir : string
        the edge direction. The supported options are "in" and "out".
    """
    assert self.is_readonly(), "Loading a CSR graph requires a readonly graph index."
    # Validate here: the C API silently treats anything that is not "in"
    # as "out", so a typo would otherwise go unnoticed.
    assert edge_dir in ('in', 'out'), \
        "edge_dir must be 'in' or 'out', got %s" % edge_dir
    self._handle = _CAPI_DGLGraphCSRCreateMMap(
        shared_mem_name,
        num_nodes, num_edges,
        self._multigraph,
        edge_dir)
def from_edge_list(self, elist): def from_edge_list(self, elist):
"""Convert from an edge list. """Convert from an edge list.
......
...@@ -96,6 +96,45 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCreate") ...@@ -96,6 +96,45 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCreate")
*rv = ghandle; *rv = ghandle;
}); });
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCSRCreate")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
  // Build an immutable graph directly from CSR arrays.  A non-empty
  // shared-memory name places the CSR arrays in a named shared segment.
  const IdArray csr_indptr = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[0]));
  const IdArray csr_indices = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[1]));
  const IdArray csr_edge_ids = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[2]));
  const std::string mem_name = args[3];
  const bool is_multigraph = static_cast<bool>(args[4]);
  const std::string dir = args[5];
  ImmutableGraph::CSR::Ptr csr;
  if (mem_name.empty()) {
    csr.reset(new ImmutableGraph::CSR(csr_indptr, csr_indices, csr_edge_ids));
  } else {
    csr.reset(new ImmutableGraph::CSR(csr_indptr, csr_indices, csr_edge_ids, mem_name));
  }
  // "in" means the CSR stores in-edges; any other value stores out-edges.
  GraphHandle ghandle = (dir == "in")
      ? new ImmutableGraph(csr, nullptr, is_multigraph)
      : new ImmutableGraph(nullptr, csr, is_multigraph);
  *rv = ghandle;
});
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCSRCreateMMap")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
  // Attach to a CSR graph that another process has already placed in a
  // named shared-memory segment.
  const std::string mem_name = args[0];
  const int64_t n_vertices = args[1];
  const int64_t n_edges = args[2];
  const bool is_multigraph = static_cast<bool>(args[3]);
  const std::string dir = args[4];
  ImmutableGraph::CSR::Ptr csr(
      new ImmutableGraph::CSR(mem_name, n_vertices, n_edges));
  // "in" means the CSR stores in-edges; any other value stores out-edges.
  GraphHandle ghandle = (dir == "in")
      ? new ImmutableGraph(csr, nullptr, is_multigraph)
      : new ImmutableGraph(nullptr, csr, is_multigraph);
  *rv = ghandle;
});
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphFree") DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphFree")
.set_body([] (DGLArgs args, DGLRetValue* rv) { .set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0]; GraphHandle ghandle = args[0];
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
* \brief DGL immutable graph index implementation * \brief DGL immutable graph index implementation
*/ */
#include <string.h>
#include <sys/types.h>
#include <dgl/immutable_graph.h> #include <dgl/immutable_graph.h>
#ifdef _MSC_VER #ifdef _MSC_VER
...@@ -20,6 +22,80 @@ bool binary_search(ForwardIt first, ForwardIt last, const T& value) { ...@@ -20,6 +22,80 @@ bool binary_search(ForwardIt first, ForwardIt last, const T& value) {
return (!(first == last) && !(value < *first)); return (!(first == last) && !(value < *first));
} }
// Construct an empty CSR with room for `expected_num_edges` edges.
// NOTE(review): the member initializers appear to pre-allocate storage in the
// custom CSR::vector type; resize() then fixes indptr's length so per-vertex
// offsets can be filled in place — confirm against CSR::vector's semantics.
ImmutableGraph::CSR::CSR(int64_t num_vertices, int64_t expected_num_edges):
indptr(num_vertices + 1), indices(expected_num_edges), edge_ids(expected_num_edges) {
indptr.resize(num_vertices + 1);
}
// Copy a CSR graph out of DLPack-backed IdArrays into the CSR's own storage.
ImmutableGraph::CSR::CSR(IdArray indptr_arr, IdArray index_arr, IdArray edge_id_arr):
indptr(indptr_arr->shape[0]), indices(index_arr->shape[0]), edge_ids(index_arr->shape[0]) {
size_t num_vertices = indptr_arr->shape[0] - 1;
size_t num_edges = index_arr->shape[0];
const int64_t *indptr_data = static_cast<int64_t*>(indptr_arr->data);
const dgl_id_t *indices_data = static_cast<dgl_id_t*>(index_arr->data);
const dgl_id_t *edge_id_data = static_cast<dgl_id_t*>(edge_id_arr->data);
// A valid CSR index pointer starts at 0 and ends at the total edge count.
CHECK_EQ(indptr_data[0], 0);
CHECK_EQ(indptr_data[num_vertices], num_edges);
// NOTE(review): insert_back appears to append a raw buffer to the custom
// CSR::vector; the member initializers above presumably reserve capacity —
// confirm against the CSR::vector definition.
indptr.insert_back(indptr_data, num_vertices + 1);
indices.insert_back(indices_data, num_edges);
edge_ids.insert_back(edge_id_data, num_edges);
}
// Create a CSR whose three arrays live in one POSIX shared-memory segment
// laid out as [indptr | indices | edge_ids], so other processes can map the
// same graph structure without copying it.  Not supported on Windows.
ImmutableGraph::CSR::CSR(IdArray indptr_arr, IdArray index_arr, IdArray edge_id_arr,
const std::string &shared_mem_name) {
#ifndef _WIN32
size_t num_vertices = indptr_arr->shape[0] - 1;
size_t num_edges = index_arr->shape[0];
CHECK_EQ(num_edges, edge_id_arr->shape[0]);
// indptr holds num_vertices+1 int64 entries; indices and edge_ids each hold
// num_edges dgl_id_t entries — hence sizeof(dgl_id_t) * 2 for the tail.
size_t file_size = (num_vertices + 1) * sizeof(int64_t) + num_edges * sizeof(dgl_id_t) * 2;
auto mem = std::make_shared<runtime::SharedMemory>(shared_mem_name);
auto ptr = mem->create_new(file_size);
// Carve the segment into the three arrays at the offsets described above.
int64_t *addr1 = static_cast<int64_t *>(ptr);
indptr.init(addr1, num_vertices + 1);
void *addr = addr1 + num_vertices + 1;
dgl_id_t *addr2 = static_cast<dgl_id_t *>(addr);
indices.init(addr2, num_edges);
addr = addr2 + num_edges;
dgl_id_t *addr3 = static_cast<dgl_id_t *>(addr);
edge_ids.init(addr3, num_edges);
const int64_t *indptr_data = static_cast<int64_t*>(indptr_arr->data);
const dgl_id_t *indices_data = static_cast<dgl_id_t*>(index_arr->data);
const dgl_id_t *edge_id_data = static_cast<dgl_id_t*>(edge_id_arr->data);
// A valid CSR index pointer starts at 0 and ends at the total edge count.
CHECK_EQ(indptr_data[0], 0);
CHECK_EQ(indptr_data[num_vertices], num_edges);
// Copy the input arrays into the shared-memory-backed vectors.
indptr.insert_back(indptr_data, num_vertices + 1);
indices.insert_back(indices_data, num_edges);
edge_ids.insert_back(edge_id_data, num_edges);
// Keep the shared-memory segment alive for the lifetime of this CSR.
this->mem = mem;
#else
LOG(FATAL) << "ImmutableGraph doesn't support shared memory in Windows yet";
#endif  // _WIN32
}
// Attach to a CSR that another process already created in shared memory.
// The segment layout must match the creating constructor:
// [indptr | indices | edge_ids].  Not supported on Windows.
ImmutableGraph::CSR::CSR(const std::string &shared_mem_name,
size_t num_vertices, size_t num_edges) {
#ifndef _WIN32
size_t file_size = (num_vertices + 1) * sizeof(int64_t) + num_edges * sizeof(dgl_id_t) * 2;
auto mem = std::make_shared<runtime::SharedMemory>(shared_mem_name);
auto ptr = mem->open(file_size);
// NOTE(review): init(addr, capacity, size) with capacity == size appears to
// mark the arrays as already populated (the data exists in the segment) —
// confirm against CSR::vector::init.
int64_t *addr1 = static_cast<int64_t *>(ptr);
indptr.init(addr1, num_vertices + 1, num_vertices + 1);
void *addr = addr1 + num_vertices + 1;
dgl_id_t *addr2 = static_cast<dgl_id_t *>(addr);
indices.init(addr2, num_edges, num_edges);
addr = addr2 + num_edges;
dgl_id_t *addr3 = static_cast<dgl_id_t *>(addr);
edge_ids.init(addr3, num_edges, num_edges);
// Keep the shared-memory segment alive for the lifetime of this CSR.
this->mem = mem;
#else
LOG(FATAL) << "ImmutableGraph doesn't support shared memory in Windows yet";
#endif  // _WIN32
}
ImmutableGraph::EdgeArray ImmutableGraph::CSR::GetEdges(dgl_id_t vid) const { ImmutableGraph::EdgeArray ImmutableGraph::CSR::GetEdges(dgl_id_t vid) const {
CHECK(HasVertex(vid)) << "invalid vertex: " << vid; CHECK(HasVertex(vid)) << "invalid vertex: " << vid;
const int64_t off = this->indptr[vid]; const int64_t off = this->indptr[vid];
...@@ -57,10 +133,12 @@ ImmutableGraph::EdgeArray ImmutableGraph::CSR::GetEdges(IdArray vids) const { ...@@ -57,10 +133,12 @@ ImmutableGraph::EdgeArray ImmutableGraph::CSR::GetEdges(IdArray vids) const {
for (int64_t i = 0; i < len; ++i) { for (int64_t i = 0; i < len; ++i) {
dgl_id_t vid = vid_data[i]; dgl_id_t vid = vid_data[i];
int64_t off = this->indptr[vid]; int64_t off = this->indptr[vid];
const int64_t len = this->GetDegree(vid); const int64_t deg = this->GetDegree(vid);
if (deg == 0)
continue;
const auto *pred = &this->indices[off]; const auto *pred = &this->indices[off];
const auto *eids = &this->edge_ids[off]; const auto *eids = &this->edge_ids[off];
for (int64_t j = 0; j < len; ++j) { for (int64_t j = 0; j < deg; ++j) {
*(src_ptr++) = pred[j]; *(src_ptr++) = pred[j];
*(dst_ptr++) = vid; *(dst_ptr++) = vid;
*(eid_ptr++) = eids[j]; *(eid_ptr++) = eids[j];
...@@ -119,7 +197,7 @@ class HashTableChecker { ...@@ -119,7 +197,7 @@ class HashTableChecker {
* and the source vertex. `col_idx` and `orig_eids` store the collected edges. * and the source vertex. `col_idx` and `orig_eids` store the collected edges.
*/ */
void Collect(const dgl_id_t old_id, const dgl_id_t old_eid, void Collect(const dgl_id_t old_id, const dgl_id_t old_eid,
std::vector<dgl_id_t> *col_idx, ImmutableGraph::CSR::vector<dgl_id_t> *col_idx,
std::vector<dgl_id_t> *orig_eids) { std::vector<dgl_id_t> *orig_eids) {
if (!map.test(old_id)) if (!map.test(old_id))
return; return;
...@@ -147,7 +225,7 @@ class HashTableChecker { ...@@ -147,7 +225,7 @@ class HashTableChecker {
* The collected edges are stored in `new_neigh_idx` and `orig_eids`. * The collected edges are stored in `new_neigh_idx` and `orig_eids`.
*/ */
void CollectOnRow(const dgl_id_t neigh_idx[], const dgl_id_t eids[], size_t row_len, void CollectOnRow(const dgl_id_t neigh_idx[], const dgl_id_t eids[], size_t row_len,
std::vector<dgl_id_t> *new_neigh_idx, ImmutableGraph::CSR::vector<dgl_id_t> *new_neigh_idx,
std::vector<dgl_id_t> *orig_eids) { std::vector<dgl_id_t> *orig_eids) {
// TODO(zhengda) I need to make sure the column index in each row is sorted. // TODO(zhengda) I need to make sure the column index in each row is sorted.
for (size_t j = 0; j < row_len; ++j) { for (size_t j = 0; j < row_len; ++j) {
...@@ -159,9 +237,9 @@ class HashTableChecker { ...@@ -159,9 +237,9 @@ class HashTableChecker {
}; };
ImmutableGraph::EdgeList::Ptr ImmutableGraph::EdgeList::FromCSR( ImmutableGraph::EdgeList::Ptr ImmutableGraph::EdgeList::FromCSR(
const std::vector<int64_t>& indptr, const CSR::vector<int64_t>& indptr,
const std::vector<dgl_id_t>& indices, const CSR::vector<dgl_id_t>& indices,
const std::vector<dgl_id_t>& edge_ids, const CSR::vector<dgl_id_t>& edge_ids,
bool in_csr) { bool in_csr) {
const auto n = indptr.size() - 1; const auto n = indptr.size() - 1;
const auto len = edge_ids.size(); const auto len = edge_ids.size();
...@@ -288,6 +366,9 @@ ImmutableGraph::CSR::Ptr ImmutableGraph::CSR::FromEdges(std::vector<Edge> *edges ...@@ -288,6 +366,9 @@ ImmutableGraph::CSR::Ptr ImmutableGraph::CSR::FromEdges(std::vector<Edge> *edges
void ImmutableGraph::CSR::ReadAllEdges(std::vector<Edge> *edges) const { void ImmutableGraph::CSR::ReadAllEdges(std::vector<Edge> *edges) const {
edges->resize(NumEdges()); edges->resize(NumEdges());
for (size_t i = 0; i < NumVertices(); i++) { for (size_t i = 0; i < NumVertices(); i++) {
// If all the remaining nodes don't have edges.
if (indptr[i] == indptr[NumVertices()])
break;
const dgl_id_t *indices_begin = &indices[indptr[i]]; const dgl_id_t *indices_begin = &indices[indptr[i]];
const dgl_id_t *eid_begin = &edge_ids[indptr[i]]; const dgl_id_t *eid_begin = &edge_ids[indptr[i]];
for (size_t j = 0; j < GetDegree(i); j++) { for (size_t j = 0; j < GetDegree(i); j++) {
......
...@@ -601,9 +601,9 @@ namespace { ...@@ -601,9 +601,9 @@ namespace {
const dgl_id_t *eids, const dgl_id_t *eids,
const std::vector<dgl_id_t> &node_mapping, const std::vector<dgl_id_t> &node_mapping,
const std::vector<int64_t> &actl_layer_sizes, const std::vector<int64_t> &actl_layer_sizes,
std::vector<int64_t> *sub_indptr, ImmutableGraph::CSR::vector<int64_t> *sub_indptr,
std::vector<dgl_id_t> *sub_indices, ImmutableGraph::CSR::vector<dgl_id_t> *sub_indices,
std::vector<dgl_id_t> *sub_eids, ImmutableGraph::CSR::vector<dgl_id_t> *sub_eids,
std::vector<dgl_id_t> *flow_offsets, std::vector<dgl_id_t> *flow_offsets,
std::vector<dgl_id_t> *edge_mapping) { std::vector<dgl_id_t> *edge_mapping) {
/* /*
...@@ -611,7 +611,8 @@ namespace { ...@@ -611,7 +611,8 @@ namespace {
* subgraphs (flows) between consecutive layers. * subgraphs (flows) between consecutive layers.
*/ */
auto n_flows = actl_layer_sizes.size() - 1; auto n_flows = actl_layer_sizes.size() - 1;
sub_indptr->insert(sub_indptr->end(), actl_layer_sizes.front() + 1, 0); for (int64_t i = 0; i < actl_layer_sizes.front() + 1; i++)
sub_indptr->push_back(0);
flow_offsets->push_back(0); flow_offsets->push_back(0);
int64_t first = 0; int64_t first = 0;
for (size_t i = 0; i < n_flows; ++i) { for (size_t i = 0; i < n_flows; ++i) {
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
* \file ndarray.cc * \file ndarray.cc
* \brief NDArray container infratructure. * \brief NDArray container infratructure.
*/ */
#include <string.h>
#include <dmlc/logging.h> #include <dmlc/logging.h>
#include <dgl/runtime/ndarray.h> #include <dgl/runtime/ndarray.h>
#include <dgl/runtime/c_runtime_api.h> #include <dgl/runtime/c_runtime_api.h>
...@@ -46,6 +47,10 @@ struct NDArray::Internal { ...@@ -46,6 +47,10 @@ struct NDArray::Internal {
using dgl::runtime::NDArray; using dgl::runtime::NDArray;
if (ptr->manager_ctx != nullptr) { if (ptr->manager_ctx != nullptr) {
static_cast<NDArray::Container*>(ptr->manager_ctx)->DecRef(); static_cast<NDArray::Container*>(ptr->manager_ctx)->DecRef();
#ifndef _WIN32
} else if (ptr->mem) {
ptr->mem = nullptr;
#endif // _WIN32
} else if (ptr->dl_tensor.data != nullptr) { } else if (ptr->dl_tensor.data != nullptr) {
dgl::runtime::DeviceAPI::Get(ptr->dl_tensor.ctx)->FreeDataSpace( dgl::runtime::DeviceAPI::Get(ptr->dl_tensor.ctx)->FreeDataSpace(
ptr->dl_tensor.ctx, ptr->dl_tensor.data); ptr->dl_tensor.ctx, ptr->dl_tensor.data);
...@@ -112,6 +117,10 @@ struct NDArray::Internal { ...@@ -112,6 +117,10 @@ struct NDArray::Internal {
} }
}; };
size_t NDArray::GetSize() const {
return GetDataSize(data_->dl_tensor);
}
NDArray NDArray::CreateView(std::vector<int64_t> shape, NDArray NDArray::CreateView(std::vector<int64_t> shape,
DLDataType dtype) { DLDataType dtype) {
CHECK(data_ != nullptr); CHECK(data_ != nullptr);
...@@ -135,6 +144,28 @@ DLManagedTensor* NDArray::ToDLPack() const { ...@@ -135,6 +144,28 @@ DLManagedTensor* NDArray::ToDLPack() const {
return Internal::ToDLPack(data_); return Internal::ToDLPack(data_);
} }
NDArray NDArray::EmptyShared(const std::string &name,
std::vector<int64_t> shape,
DLDataType dtype,
DLContext ctx, bool is_create) {
NDArray ret = Internal::Create(shape, dtype, ctx);
// setup memory content
size_t size = GetDataSize(ret.data_->dl_tensor);
#ifndef _WIN32
auto mem = std::make_shared<SharedMemory>(name);
if (is_create) {
ret.data_->dl_tensor.data = mem->create_new(size);
} else {
ret.data_->dl_tensor.data = mem->open(size);
}
ret.data_->mem = mem;
#else
LOG(FATAL) << "Windows doesn't support NDArray with shared memory";
#endif // _WIN32
return ret;
}
NDArray NDArray::Empty(std::vector<int64_t> shape, NDArray NDArray::Empty(std::vector<int64_t> shape,
DLDataType dtype, DLDataType dtype,
DLContext ctx) { DLContext ctx) {
...@@ -210,6 +241,26 @@ int DGLArrayAlloc(const dgl_index_t* shape, ...@@ -210,6 +241,26 @@ int DGLArrayAlloc(const dgl_index_t* shape,
API_END(); API_END();
} }
int DGLArrayAllocSharedMem(const char *mem_name,
const dgl_index_t *shape,
int ndim,
int dtype_code,
int dtype_bits,
int dtype_lanes,
bool is_create,
DGLArrayHandle* out) {
API_BEGIN();
DLDataType dtype;
dtype.code = static_cast<uint8_t>(dtype_code);
dtype.bits = static_cast<uint8_t>(dtype_bits);
dtype.lanes = static_cast<uint16_t>(dtype_lanes);
std::vector<int64_t> shape_vec(shape, shape + ndim);
NDArray arr = NDArray::EmptyShared(mem_name, shape_vec, dtype,
DLContext{kDLCPU, 0}, is_create);
*out = NDArray::Internal::MoveAsDLTensor(arr);
API_END();
}
int DGLArrayFree(DGLArrayHandle handle) { int DGLArrayFree(DGLArrayHandle handle) {
API_BEGIN(); API_BEGIN();
reinterpret_cast<NDArray::Container*>(handle)->DecRef(); reinterpret_cast<NDArray::Container*>(handle)->DecRef();
......
/*!
* Copyright (c) 2019 by Contributors
* \file shared_mem.cc
* \brief Shared memory management.
*/
#ifndef _WIN32
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#endif
#include <stdio.h>
#include <string.h>
#include <dmlc/logging.h>
#include <dgl/runtime/shared_mem.h>
namespace dgl {
namespace runtime {
#ifndef _WIN32
// Remember the segment name; no resource is acquired until create_new()
// or open() is called.
SharedMemory::SharedMemory(const std::string &name)
    : name(name), own(false), fd(-1), ptr(nullptr), size(0) {
}
SharedMemory::~SharedMemory() {
  // Only tear down resources that were actually acquired: the constructor
  // leaves ptr == nullptr and fd == -1 until create_new()/open() is called,
  // and munmap(nullptr, ...)/close(-1) would fail spuriously and set errno.
  if (ptr != nullptr) {
    munmap(ptr, size);
  }
  if (fd != -1) {
    close(fd);
  }
  if (own) {
    LOG(INFO) << "remove " << name << " for shared memory";
    // The creator is responsible for removing the named segment.
    shm_unlink(name.c_str());
  }
}
void *SharedMemory::create_new(size_t size) {
  // This process creates the segment, so it owns it and must shm_unlink it
  // on destruction.
  this->own = true;

  // O_EXCL: fail loudly if a segment with this name already exists instead
  // of silently reusing stale shared memory from a crashed run.
  int flag = O_RDWR|O_EXCL|O_CREAT;
  fd = shm_open(name.c_str(), flag, S_IRUSR | S_IWUSR);
  CHECK_NE(fd, -1) << "fail to open " << name << ": " << strerror(errno);
  // Grow the segment to the requested size before mapping it.
  auto res = ftruncate(fd, size);
  CHECK_NE(res, -1)
      << "Failed to truncate the file. " << strerror(errno);
  ptr = mmap(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
  CHECK_NE(ptr, MAP_FAILED)
      << "Failed to map shared memory. mmap failed with error " << strerror(errno);
  // BUG FIX: record the mapping length in the member (the parameter shadows
  // it); it was previously left at 0, so the destructor called
  // munmap(ptr, 0), which unmaps nothing and leaked the mapping.
  this->size = size;
  return ptr;
}
void *SharedMemory::open(size_t size) {
int flag = O_RDWR;
fd = shm_open(name.c_str(), flag, S_IRUSR | S_IWUSR);
CHECK_NE(fd, -1) << "fail to open " << name << ": " << strerror(errno);
ptr = mmap(NULL, size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
CHECK_NE(ptr, MAP_FAILED)
<< "Failed to map shared memory. mmap failed with error " << strerror(errno);
return ptr;
}
#endif // _WIN32
} // namespace runtime
} // namespace dgl
import os
import backend as F import backend as F
import networkx as nx import networkx as nx
import numpy as np import numpy as np
...@@ -153,8 +154,37 @@ def test_create_graph(): ...@@ -153,8 +154,37 @@ def test_create_graph():
for edge in elist: for edge in elist:
assert g.edge_id(edge[0], edge[1]) == ig.edge_id(edge[0], edge[1]) assert g.edge_id(edge[0], edge[1]) == ig.edge_id(edge[0], edge[1])
def test_load_csr():
    """Load a scipy CSR matrix into a GraphIndex, with and without shared memory."""
    n = 100
    csr = (sp.sparse.random(n, n, density=0.1, format='csr') != 0).astype(np.int64)

    def check_csr_graph(idx):
        # Verify the loaded graph index matches the original scipy CSR matrix.
        assert idx.number_of_nodes() == n
        assert idx.number_of_edges() == csr.nnz
        src, dst, eid = idx.edges()
        src, dst, eid = src.tousertensor(), dst.tousertensor(), eid.tousertensor()
        coo = csr.tocoo()
        assert np.all(F.asnumpy(src) == coo.row)
        assert np.all(F.asnumpy(dst) == coo.col)

    # Load CSR normally.
    idx = dgl.graph_index.GraphIndex(multigraph=False, readonly=True)
    idx.from_csr_matrix(csr.indptr, csr.indices, 'out')
    check_csr_graph(idx)

    # Load CSR to shared memory. Shared memory isn't supported in Windows.
    # BUG FIX: the original used `os.name is not 'nt'`, which compares object
    # identity against a string literal (implementation-defined interning and
    # a SyntaxWarning on modern CPython); use `!=` for value comparison.
    if os.name != 'nt':
        idx = dgl.graph_index.GraphIndex(multigraph=False, readonly=True)
        idx.from_csr_matrix(csr.indptr, csr.indices, 'out', '/test_graph_struct')
        check_csr_graph(idx)
if __name__ == '__main__': if __name__ == '__main__':
test_basics() test_basics()
test_graph_gen() test_graph_gen()
test_node_subgraph() test_node_subgraph()
test_create_graph() test_create_graph()
test_load_csr()
import dgl
import time
import numpy as np
from multiprocessing import Process
from scipy import sparse as spsp
import mxnet as mx
import backend as F
num_nodes = 100
num_edges = int(num_nodes * num_nodes * 0.1)
def worker_func(worker_id):
    """Client process: attach to the shared-memory graph store and verify it.

    The sleeps coordinate the processes: each worker waits for the server to
    publish the graph, and worker 1 waits longer than worker 0 so that worker
    0's writes are visible before worker 1 reads them back.
    """
    # Give the server time to create the store before connecting.
    time.sleep(3)
    print("worker starts")
    np.random.seed(0)
    # Rebuild the same random adjacency the server generated (same seed).
    csr = (spsp.random(num_nodes, num_nodes, density=0.1, format='csr') != 0).astype(np.int64)
    g = dgl.contrib.graph_store.create_graph_from_store("test_graph5", "shared_mem")
    # Verify the graph structure loaded from the shared memory.
    src, dst = g.all_edges()
    coo = csr.tocoo()
    # The server built the store with edge_dir="in": dst is expected to match
    # the COO rows and src the COO columns.
    assert F.array_equal(dst, F.tensor(coo.row))
    assert F.array_equal(src, F.tensor(coo.col))
    # Row 0 of the server-initialized features is arange(10).
    assert F.array_equal(g.ndata['feat'][0], F.tensor(np.arange(10), dtype=np.float32))
    assert F.array_equal(g.edata['feat'][0], F.tensor(np.arange(10), dtype=np.float32))
    # New embeddings created by a client are allocated through the store as well.
    g.ndata['test4'] = mx.nd.zeros((g.number_of_nodes(), 10))
    g.edata['test4'] = mx.nd.zeros((g.number_of_edges(), 10))
    if worker_id == 0:
        # Writer: wait so both workers have created 'test4', then write row 0.
        time.sleep(3)
        g.ndata['test4'][0] = 1
        g.edata['test4'][0] = 2
    else:
        # Reader: wait long enough for worker 0's writes, then check they are
        # visible through shared memory.
        time.sleep(5)
        assert np.all(g.ndata['test4'][0].asnumpy() == 1)
        assert np.all(g.edata['test4'][0].asnumpy() == 2)
    g.destroy()
def server_func(num_workers):
    """Server process: load a random graph into the shared-memory store and serve it."""
    print("server starts")
    np.random.seed(0)
    # Same seed as the workers so everyone agrees on the adjacency.
    adj = (spsp.random(num_nodes, num_nodes, density=0.1, format='csr') != 0).astype(np.int64)
    store = dgl.contrib.graph_store.create_graph_store_server(
        adj, "test_graph5", "shared_mem", num_workers, False, edge_dir="in")
    assert store._graph.number_of_nodes() == num_nodes
    assert store._graph.number_of_edges() == num_edges
    # Publish node/edge features; row i holds arange(i*10, (i+1)*10).
    store.ndata['feat'] = mx.nd.arange(num_nodes * 10).reshape((num_nodes, 10))
    store.edata['feat'] = mx.nd.arange(num_edges * 10).reshape((num_edges, 10))
    store.run()
def test_worker_server():
    """Launch one server and two worker processes and wait for all of them."""
    procs = [
        Process(target=server_func, args=(2,)),
        Process(target=worker_func, args=(0,)),
        Process(target=worker_func, args=(1,)),
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
if __name__ == '__main__':
test_worker_server()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment