Unverified Commit 619d735d authored by Hongzhi (Steve), Chen's avatar Hongzhi (Steve), Chen Committed by GitHub
Browse files

[Misc] Replace \xxx with @XXX in structured comment. (#4822)



* param

* brief

* note

* return

* tparam

* brief2

* file

* return2

* return

* blabla

* all
Co-authored-by: default avatarSteve <ubuntu@ip-172-31-34-29.ap-northeast-1.compute.internal>
parent 96297fb8
/*!
* Copyright (c) 2018 by Contributors
* \file graph/traversal.cc
* \brief Graph traversal implementation
* @file graph/traversal.cc
* @brief Graph traversal implementation
*/
#include "./traversal.h"
......@@ -89,7 +89,7 @@ IdArray ComputeMergedSections(const std::vector<std::vector<DType>>& traces) {
} // namespace
/*!
* \brief Class for representing frontiers.
* @brief Class for representing frontiers.
*
* Each frontier is a list of nodes/edges (specified by their ids).
* An optional tag can be specified on each node/edge (represented by an int
......
/*!
* Copyright (c) 2018 by Contributors
* \file graph/traversal.h
* \brief Graph traversal routines.
* @file graph/traversal.h
* @brief Graph traversal routines.
*
* Traversal routines generate frontiers. Frontiers can be node frontiers or
* edge frontiers depending on the traversal function. Each frontier is a list
......@@ -21,7 +21,7 @@ namespace dgl {
namespace traverse {
/*!
* \brief Traverse the graph in a breadth-first-search (BFS) order.
* @brief Traverse the graph in a breadth-first-search (BFS) order.
*
* The queue object must suffice following interface:
* Members:
......@@ -38,12 +38,12 @@ namespace traverse {
* The frontier function must be compatible with following interface:
* void (*make_frontier)(void);
*
* \param graph The graph.
* \param sources Source nodes.
* \param reversed If true, BFS follows the in-edge direction.
* \param queue The queue used to do bfs.
* \param visit The function to call when a node is visited.
* \param make_frontier The function to indicate that a new froniter can be
* @param graph The graph.
* @param sources Source nodes.
* @param reversed If true, BFS follows the in-edge direction.
* @param queue The queue used to do bfs.
* @param visit The function to call when a node is visited.
* @param make_frontier The function to indicate that a new froniter can be
* made.
*/
template <typename Queue, typename VisitFn, typename FrontierFn>
......@@ -82,7 +82,7 @@ void BFSNodes(
}
/*!
* \brief Traverse the graph in a breadth-first-search (BFS) order, returning
* @brief Traverse the graph in a breadth-first-search (BFS) order, returning
* the edges of the BFS tree.
*
* The queue object must suffice following interface:
......@@ -100,13 +100,13 @@ void BFSNodes(
* The frontier function must be compatible with following interface:
* void (*make_frontier)(void);
*
* \param graph The graph.
* \param sources Source nodes.
* \param reversed If true, BFS follows the in-edge direction.
* \param queue The queue used to do bfs.
* \param visit The function to call when a node is visited.
* @param graph The graph.
* @param sources Source nodes.
* @param reversed If true, BFS follows the in-edge direction.
* @param queue The queue used to do bfs.
* @param visit The function to call when a node is visited.
* The argument would be edge ID.
* \param make_frontier The function to indicate that a new frontier can be
* @param make_frontier The function to indicate that a new frontier can be
* made.
*/
template <typename Queue, typename VisitFn, typename FrontierFn>
......@@ -146,7 +146,7 @@ void BFSEdges(
}
/*!
* \brief Traverse the graph in topological order.
* @brief Traverse the graph in topological order.
*
* The queue object must suffice following interface:
* Members:
......@@ -163,11 +163,11 @@ void BFSEdges(
* The frontier function must be compatible with following interface:
* void (*make_frontier)(void);
*
* \param graph The graph.
* \param reversed If true, follows the in-edge direction.
* \param queue The queue used to do bfs.
* \param visit The function to call when a node is visited.
* \param make_frontier The function to indicate that a new froniter can be
* @param graph The graph.
* @param reversed If true, follows the in-edge direction.
* @param queue The queue used to do bfs.
* @param visit The function to call when a node is visited.
* @param make_frontier The function to indicate that a new froniter can be
* made.
*/
template <typename Queue, typename VisitFn, typename FrontierFn>
......@@ -219,7 +219,7 @@ enum DFSEdgeTag {
kNonTree,
};
/*!
* \brief Traverse the graph in a depth-first-search (DFS) order.
* @brief Traverse the graph in a depth-first-search (DFS) order.
*
* The traversal visit edges in its DFS order. Edges have three tags:
* FORWARD(0), REVERSE(1), NONTREE(2).
......@@ -229,11 +229,11 @@ enum DFSEdgeTag {
* edge is in the DFS tree. A NONTREE edge is one in which both `u` and `v` have
* been visisted but the edge is NOT in the DFS tree.
*
* \param source Source node.
* \param reversed If true, DFS follows the in-edge direction.
* \param has_reverse_edge If true, REVERSE edges are included.
* \param has_nontree_edge If true, NONTREE edges are included.
* \param visit The function to call when an edge is visited; the edge id and
* @param source Source node.
* @param reversed If true, DFS follows the in-edge direction.
* @param has_reverse_edge If true, REVERSE edges are included.
* @param has_nontree_edge If true, NONTREE edges are included.
* @param visit The function to call when an edge is visited; the edge id and
* its tag will be given as the arguments.
*/
template <typename VisitFn>
......
/*!
* Copyright (c) 2019 by Contributors
* \file graph/unit_graph.cc
* \brief UnitGraph graph implementation
* @file graph/unit_graph.cc
* @brief UnitGraph graph implementation
*/
#include <dgl/array.h>
#include <dgl/base_heterograph.h>
......@@ -161,17 +161,17 @@ class UnitGraph::COO : public BaseHeteroGraph {
}
/*! \brief Pin the adj_: COOMatrix of the COO graph. */
/*! @brief Pin the adj_: COOMatrix of the COO graph. */
void PinMemory_() {
adj_.PinMemory_();
}
/*! \brief Unpin the adj_: COOMatrix of the COO graph. */
/*! @brief Unpin the adj_: COOMatrix of the COO graph. */
void UnpinMemory_() {
adj_.UnpinMemory_();
}
/*! \brief Record stream for the adj_: COOMatrix of the COO graph. */
/*! @brief Record stream for the adj_: COOMatrix of the COO graph. */
void RecordStream(DGLStreamHandle stream) override {
adj_.RecordStream(stream);
}
......@@ -433,7 +433,7 @@ class UnitGraph::COO : public BaseHeteroGraph {
}
/*!
* \brief Determines whether the graph is "hypersparse", i.e. having significantly more
* @brief Determines whether the graph is "hypersparse", i.e. having significantly more
* nodes than edges.
*/
bool IsHypersparse() const {
......@@ -457,7 +457,7 @@ class UnitGraph::COO : public BaseHeteroGraph {
private:
friend class Serializer;
/*! \brief internal adjacency matrix. Data array is empty */
/*! @brief internal adjacency matrix. Data array is empty */
aten::COOMatrix adj_;
};
......@@ -467,7 +467,7 @@ class UnitGraph::COO : public BaseHeteroGraph {
//
//////////////////////////////////////////////////////////
/*! \brief CSR graph */
/*! @brief CSR graph */
class UnitGraph::CSR : public BaseHeteroGraph {
public:
CSR(GraphPtr metagraph, int64_t num_src, int64_t num_dst,
......@@ -571,17 +571,17 @@ class UnitGraph::CSR : public BaseHeteroGraph {
}
}
/*! \brief Pin the adj_: CSRMatrix of the CSR graph. */
/*! @brief Pin the adj_: CSRMatrix of the CSR graph. */
void PinMemory_() {
adj_.PinMemory_();
}
/*! \brief Unpin the adj_: CSRMatrix of the CSR graph. */
/*! @brief Unpin the adj_: CSRMatrix of the CSR graph. */
void UnpinMemory_() {
adj_.UnpinMemory_();
}
/*! \brief Record stream for the adj_: CSRMatrix of the CSR graph. */
/*! @brief Record stream for the adj_: CSRMatrix of the CSR graph. */
void RecordStream(DGLStreamHandle stream) override {
adj_.RecordStream(stream);
}
......@@ -851,7 +851,7 @@ class UnitGraph::CSR : public BaseHeteroGraph {
private:
friend class Serializer;
/*! \brief internal adjacency matrix. Data array stores edge ids */
/*! @brief internal adjacency matrix. Data array stores edge ids */
aten::CSRMatrix adj_;
};
......
/*!
* Copyright (c) 2019 by Contributors
* \file graph/unit_graph.h
* \brief UnitGraph graph
* @file graph/unit_graph.h
* @brief UnitGraph graph
*/
#ifndef DGL_GRAPH_UNIT_GRAPH_H_
......@@ -27,7 +27,7 @@ class UnitGraph;
typedef std::shared_ptr<UnitGraph> UnitGraphPtr;
/*!
* \brief UnitGraph graph
* @brief UnitGraph graph
*
* UnitGraph graph is a special type of heterograph which
* (1) Have two types of nodes: "Src" and "Dst". All the edges are
......@@ -164,7 +164,7 @@ class UnitGraph : public BaseHeteroGraph {
const std::vector<IdArray>& eids, bool preserve_nodes = false) const override;
// creators
/*! \brief Create a graph with no edges */
/*! @brief Create a graph with no edges */
static HeteroGraphPtr Empty(
int64_t num_vtypes, int64_t num_src, int64_t num_dst,
DGLDataType dtype, DGLContext ctx) {
......@@ -173,7 +173,7 @@ class UnitGraph : public BaseHeteroGraph {
return CreateFromCOO(num_vtypes, num_src, num_dst, row, col);
}
/*! \brief Create a graph from COO arrays */
/*! @brief Create a graph from COO arrays */
static HeteroGraphPtr CreateFromCOO(
int64_t num_vtypes, int64_t num_src, int64_t num_dst,
IdArray row, IdArray col, bool row_sorted = false,
......@@ -183,7 +183,7 @@ class UnitGraph : public BaseHeteroGraph {
int64_t num_vtypes, const aten::COOMatrix& mat,
dgl_format_code_t formats = ALL_CODE);
/*! \brief Create a graph from (out) CSR arrays */
/*! @brief Create a graph from (out) CSR arrays */
static HeteroGraphPtr CreateFromCSR(
int64_t num_vtypes, int64_t num_src, int64_t num_dst,
IdArray indptr, IdArray indices, IdArray edge_ids,
......@@ -193,7 +193,7 @@ class UnitGraph : public BaseHeteroGraph {
int64_t num_vtypes, const aten::CSRMatrix& mat,
dgl_format_code_t formats = ALL_CODE);
/*! \brief Create a graph from (in) CSC arrays */
/*! @brief Create a graph from (in) CSC arrays */
static HeteroGraphPtr CreateFromCSC(
int64_t num_vtypes, int64_t num_src, int64_t num_dst,
IdArray indptr, IdArray indices, IdArray edge_ids,
......@@ -203,15 +203,15 @@ class UnitGraph : public BaseHeteroGraph {
int64_t num_vtypes, const aten::CSRMatrix& mat,
dgl_format_code_t formats = ALL_CODE);
/*! \brief Convert the graph to use the given number of bits for storage */
/*! @brief Convert the graph to use the given number of bits for storage */
static HeteroGraphPtr AsNumBits(HeteroGraphPtr g, uint8_t bits);
/*! \brief Copy the data to another context */
/*! @brief Copy the data to another context */
static HeteroGraphPtr CopyTo(HeteroGraphPtr g, const DGLContext &ctx);
/*!
* \brief Pin the in_csr_, out_scr_ and coo_ of the current graph.
* \note The graph will be pinned inplace. Behavior depends on the current context,
* @brief Pin the in_csr_, out_scr_ and coo_ of the current graph.
* @note The graph will be pinned inplace. Behavior depends on the current context,
* kDGLCPU: will be pinned;
* IsPinned: directly return;
* kDGLCUDA: invalid, will throw an error.
......@@ -220,8 +220,8 @@ class UnitGraph : public BaseHeteroGraph {
void PinMemory_() override;
/*!
* \brief Unpin the in_csr_, out_scr_ and coo_ of the current graph.
* \note The graph will be unpinned inplace. Behavior depends on the current context,
* @brief Unpin the in_csr_, out_scr_ and coo_ of the current graph.
* @note The graph will be unpinned inplace. Behavior depends on the current context,
* IsPinned: will be unpinned;
* others: directly return.
* The context check is deferred to unpinning the NDArray.
......@@ -229,42 +229,42 @@ class UnitGraph : public BaseHeteroGraph {
void UnpinMemory_();
/*!
* \brief Record stream for this graph.
* \param stream The stream that is using the graph
* @brief Record stream for this graph.
* @param stream The stream that is using the graph
*/
void RecordStream(DGLStreamHandle stream) override;
/*!
* \brief Create in-edge CSR format of the unit graph.
* \param inplace if true and the in-edge CSR format does not exist, the created
* @brief Create in-edge CSR format of the unit graph.
* @param inplace if true and the in-edge CSR format does not exist, the created
* format will be cached in this object unless the format is restricted.
* \return Return the in-edge CSR format. Create from other format if not exist.
* @return Return the in-edge CSR format. Create from other format if not exist.
*/
CSRPtr GetInCSR(bool inplace = true) const;
/*!
* \brief Create out-edge CSR format of the unit graph.
* \param inplace if true and the out-edge CSR format does not exist, the created
* @brief Create out-edge CSR format of the unit graph.
* @param inplace if true and the out-edge CSR format does not exist, the created
* format will be cached in this object unless the format is restricted.
* \return Return the out-edge CSR format. Create from other format if not exist.
* @return Return the out-edge CSR format. Create from other format if not exist.
*/
CSRPtr GetOutCSR(bool inplace = true) const;
/*!
* \brief Create COO format of the unit graph.
* \param inplace if true and the COO format does not exist, the created
* @brief Create COO format of the unit graph.
* @param inplace if true and the COO format does not exist, the created
* format will be cached in this object unless the format is restricted.
* \return Return the COO format. Create from other format if not exist.
* @return Return the COO format. Create from other format if not exist.
*/
COOPtr GetCOO(bool inplace = true) const;
/*! \return Return the COO matrix form */
/*! @return Return the COO matrix form */
aten::COOMatrix GetCOOMatrix(dgl_type_t etype) const override;
/*! \return Return the in-edge CSC in the matrix form */
/*! @return Return the in-edge CSC in the matrix form */
aten::CSRMatrix GetCSCMatrix(dgl_type_t etype) const override;
/*! \return Return the out-edge CSR in the matrix form */
/*! @return Return the out-edge CSR in the matrix form */
aten::CSRMatrix GetCSRMatrix(dgl_type_t etype) const override;
SparseFormat SelectFormat(dgl_type_t etype, dgl_format_code_t preferred_formats) const override {
......@@ -272,10 +272,10 @@ class UnitGraph : public BaseHeteroGraph {
}
/*!
* \brief Return the graph in the given format. Perform format conversion if the
* @brief Return the graph in the given format. Perform format conversion if the
* requested format does not exist.
*
* \return A graph in the requested format.
* @return A graph in the requested format.
*/
HeteroGraphPtr GetFormat(SparseFormat format) const;
......@@ -285,19 +285,19 @@ class UnitGraph : public BaseHeteroGraph {
HeteroGraphPtr GetGraphInFormat(dgl_format_code_t formats) const override;
/*! \return Load UnitGraph from stream, using CSRMatrix*/
/*! @return Load UnitGraph from stream, using CSRMatrix*/
bool Load(dmlc::Stream* fs);
/*! \return Save UnitGraph to stream, using CSRMatrix */
/*! @return Save UnitGraph to stream, using CSRMatrix */
void Save(dmlc::Stream* fs) const;
/*! \brief Creat a LineGraph of self */
/*! @brief Creat a LineGraph of self */
HeteroGraphPtr LineGraph(bool backtracking) const;
/*! \return the reversed graph */
/*! @return the reversed graph */
UnitGraphPtr Reverse() const;
/*! \return the simpled (no-multi-edge) graph
/*! @return the simpled (no-multi-edge) graph
* the count recording the number of duplicated edges from the original graph.
* the edge mapping from the edge IDs of original graph to those of the
* returned graph.
......@@ -320,25 +320,25 @@ class UnitGraph : public BaseHeteroGraph {
UnitGraph() {}
/*!
* \brief constructor
* \param metagraph metagraph
* \param in_csr in edge csr
* \param out_csr out edge csr
* \param coo coo
* @brief constructor
* @param metagraph metagraph
* @param in_csr in edge csr
* @param out_csr out edge csr
* @param coo coo
*/
UnitGraph(GraphPtr metagraph, CSRPtr in_csr, CSRPtr out_csr, COOPtr coo,
dgl_format_code_t formats = ALL_CODE);
/*!
* \brief constructor
* \param num_vtypes number of vertex types (1 or 2)
* \param metagraph metagraph
* \param in_csr in edge csr
* \param out_csr out edge csr
* \param coo coo
* \param has_in_csr whether in_csr is valid
* \param has_out_csr whether out_csr is valid
* \param has_coo whether coo is valid
* @brief constructor
* @param num_vtypes number of vertex types (1 or 2)
* @param metagraph metagraph
* @param in_csr in edge csr
* @param out_csr out edge csr
* @param coo coo
* @param has_in_csr whether in_csr is valid
* @param has_out_csr whether out_csr is valid
* @param has_coo whether coo is valid
*/
static HeteroGraphPtr CreateUnitGraphFrom(
int num_vtypes,
......@@ -350,11 +350,11 @@ class UnitGraph : public BaseHeteroGraph {
bool has_coo,
dgl_format_code_t formats = ALL_CODE);
/*! \return Return any existing format. */
/*! @return Return any existing format. */
HeteroGraphPtr GetAny() const;
/*!
* \brief Determine which format to use with a preference.
* @brief Determine which format to use with a preference.
*
* If the storage of unit graph is "locked", i.e. no conversion is allowed, then
* it will return the locked format.
......@@ -364,24 +364,24 @@ class UnitGraph : public BaseHeteroGraph {
*/
SparseFormat SelectFormat(dgl_format_code_t preferred_formats) const;
/*! \return Whether the graph is hypersparse */
/*! @return Whether the graph is hypersparse */
bool IsHypersparse() const;
GraphPtr AsImmutableGraph() const override;
// Graph stored in different format. We use an on-demand strategy: the format is
// only materialized if the operation that suitable for it is invoked.
/*! \brief CSR graph that stores reverse edges */
/*! @brief CSR graph that stores reverse edges */
CSRPtr in_csr_;
/*! \brief CSR representation */
/*! @brief CSR representation */
CSRPtr out_csr_;
/*! \brief COO representation */
/*! @brief COO representation */
COOPtr coo_;
/*!
* \brief Storage format restriction.
* @brief Storage format restriction.
*/
dgl_format_code_t formats_;
/*! \brief which streams have recorded the graph */
/*! @brief which streams have recorded the graph */
std::vector<DGLStreamHandle> recorded_streams;
};
......
/*!
* Copyright (c) 2021 by Contributors
* \file ndarray_partition.h
* \brief Operations on partition implemented in CUDA.
* @file ndarray_partition.h
* @brief Operations on partition implemented in CUDA.
*/
#include <dgl/runtime/device_api.h>
......
/*!
* Copyright (c) 2021 by Contributors
* \file ndarray_partition.cc
* \brief DGL utilities for working with the partitioned NDArrays
* @file ndarray_partition.cc
* @brief DGL utilities for working with the partitioned NDArrays
*/
#include "ndarray_partition.h"
......
/*!
* Copyright (c) 2021 by Contributors
* \file ndarray_partition.h
* \brief DGL utilities for working with the partitioned NDArrays
* @file ndarray_partition.h
* @brief DGL utilities for working with the partitioned NDArrays
*/
#ifndef DGL_PARTITION_NDARRAY_PARTITION_H_
......
/*!
* Copyright (c) 2021 by Contributors
* \file ndarray_partition.h
* \brief DGL utilities for working with the partitioned NDArrays
* @file ndarray_partition.h
* @brief DGL utilities for working with the partitioned NDArrays
*/
#ifndef DGL_PARTITION_PARTITION_OP_H_
......
/*!
* Copyright (c) 2019 by Contributors
* \file random/choice.cc
* \brief Non-uniform discrete sampling implementation
* @file random/choice.cc
* @brief Non-uniform discrete sampling implementation
*/
#include <dgl/array.h>
......
/*!
* Copyright (c) 2019 by Contributors
* \file dgl/sample_utils.h
* \brief Sampling utilities
* @file dgl/sample_utils.h
* @brief Sampling utilities
*/
#ifndef DGL_RANDOM_CPU_SAMPLE_UTILS_H_
#define DGL_RANDOM_CPU_SAMPLE_UTILS_H_
......@@ -20,12 +20,12 @@
namespace dgl {
namespace utils {
/*! \brief Base sampler class */
/*! @brief Base sampler class */
template <typename Idx>
class BaseSampler {
public:
virtual ~BaseSampler() = default;
/*! \brief Draw one integer sample */
/*! @brief Draw one integer sample */
virtual Idx Draw() {
LOG(INFO) << "Not implemented yet.";
return 0;
......
/*!
* Copyright (c) 2017 by Contributors
* \file random.cc
* \brief Random number generator interfaces
* @file random.cc
* @brief Random number generator interfaces
*/
#include <dgl/array.h>
......
/*!
* Copyright (c) 2022 by Contributors
* \file net_type.h
* \brief Base communicator for DGL distributed training.
* @file net_type.h
* @brief Base communicator for DGL distributed training.
*/
#ifndef DGL_RPC_NET_TYPE_H_
#define DGL_RPC_NET_TYPE_H_
......@@ -15,59 +15,59 @@ namespace rpc {
struct RPCBase {
/*!
* \brief Finalize Receiver
* @brief Finalize Receiver
*
* Finalize() is not thread-safe and only one thread can invoke this API.
*/
virtual void Finalize() = 0;
/*!
* \brief Communicator type: 'socket', 'tensorpipe', etc
* @brief Communicator type: 'socket', 'tensorpipe', etc
*/
virtual const std::string &NetType() const = 0;
};
struct RPCSender : RPCBase {
/*!
* \brief Connect to a receiver.
* @brief Connect to a receiver.
*
* When there are multiple receivers to be connected, application will call
* `ConnectReceiver` for each and then call `ConnectReceiverFinalize` to make
* sure that either all the connections are successfully established or some
* of them fail.
*
* \param addr Networking address, e.g., 'tcp://127.0.0.1:50091'
* \param recv_id receiver's ID
* \return True for success and False for fail
* @param addr Networking address, e.g., 'tcp://127.0.0.1:50091'
* @param recv_id receiver's ID
* @return True for success and False for fail
*
* The function is *not* thread-safe; only one thread can invoke this API.
*/
virtual bool ConnectReceiver(const std::string &addr, int recv_id) = 0;
/*!
* \brief Finalize the action to connect to receivers. Make sure that either
* @brief Finalize the action to connect to receivers. Make sure that either
* all connections are successfully established or connection fails.
* \return True for success and False for fail
* @return True for success and False for fail
*
* The function is *not* thread-safe; only one thread can invoke this API.
*/
virtual bool ConnectReceiverFinalize(const int max_try_times) { return true; }
/*!
* \brief Send RPCMessage to specified Receiver.
* \param msg data message
* \param recv_id receiver's ID
* @brief Send RPCMessage to specified Receiver.
* @param msg data message
* @param recv_id receiver's ID
*/
virtual void Send(const RPCMessage &msg, int recv_id) = 0;
};
struct RPCReceiver : RPCBase {
/*!
* \brief Wait for all the Senders to connect
* \param addr Networking address, e.g., 'tcp://127.0.0.1:50051', 'mpi://0'
* \param num_sender total number of Senders
* \param blocking whether wait blockingly
* \return True for success and False for fail
* @brief Wait for all the Senders to connect
* @param addr Networking address, e.g., 'tcp://127.0.0.1:50051', 'mpi://0'
* @param num_sender total number of Senders
* @param blocking whether wait blockingly
* @return True for success and False for fail
*
* Wait() is not thread-safe and only one thread can invoke this API.
*/
......@@ -75,11 +75,11 @@ struct RPCReceiver : RPCBase {
const std::string &addr, int num_sender, bool blocking = true) = 0;
/*!
* \brief Recv RPCMessage from Sender. Actually removing data from queue.
* \param msg pointer of RPCmessage
* \param timeout The timeout value in milliseconds. If zero, wait
* @brief Recv RPCMessage from Sender. Actually removing data from queue.
* @param msg pointer of RPCmessage
* @param timeout The timeout value in milliseconds. If zero, wait
* indefinitely.
* \return RPCStatus: kRPCSuccess or kRPCTimeOut.
* @return RPCStatus: kRPCSuccess or kRPCTimeOut.
*/
virtual RPCStatus Recv(RPCMessage *msg, int timeout) = 0;
};
......
/*!
* Copyright (c) 2019 by Contributors
* \file common.cc
* \brief This file provide basic facilities for string
* @file common.cc
* @brief This file provide basic facilities for string
* to make programming convenient.
*/
#include "common.h"
......
/*!
* Copyright (c) 2019 by Contributors
* \file common.h
* \brief This file provide basic facilities for string
* @file common.h
* @brief This file provide basic facilities for string
* to make programming convenient.
*/
#ifndef DGL_RPC_NETWORK_COMMON_H_
......
/*!
* Copyright (c) 2019 by Contributors
* \file communicator.h
* \brief Communicator for DGL distributed training.
* @file communicator.h
* @brief Communicator for DGL distributed training.
*/
#ifndef DGL_RPC_NETWORK_COMMUNICATOR_H_
#define DGL_RPC_NETWORK_COMMUNICATOR_H_
......@@ -17,7 +17,7 @@ namespace dgl {
namespace network {
/*!
* \brief Network Sender for DGL distributed training.
* @brief Network Sender for DGL distributed training.
*
* Sender is an abstract class that defines a set of APIs for sending binary
* data message over network. It can be implemented by different underlying
......@@ -28,9 +28,9 @@ namespace network {
class Sender : public rpc::RPCSender {
public:
/*!
* \brief Sender constructor
* \param queue_size size (bytes) of message queue.
* \param max_thread_count size of thread pool. 0 for no limit
* @brief Sender constructor
* @param queue_size size (bytes) of message queue.
* @param max_thread_count size of thread pool. 0 for no limit
* Note that, the queue_size parameter is optional.
*/
explicit Sender(int64_t queue_size = 0, int max_thread_count = 0) {
......@@ -43,10 +43,10 @@ class Sender : public rpc::RPCSender {
virtual ~Sender() {}
/*!
* \brief Send data to specified Receiver.
* \param msg data message
* \param recv_id receiver's ID
* \return Status code
* @brief Send data to specified Receiver.
* @param msg data message
* @param recv_id receiver's ID
* @return Status code
*
* (1) The send is non-blocking. There is no guarantee that the message has
* been physically sent out when the function returns. (2) The communicator
......@@ -59,17 +59,17 @@ class Sender : public rpc::RPCSender {
protected:
/*!
* \brief Size of message queue
* @brief Size of message queue
*/
int64_t queue_size_;
/*!
* \brief Size of thread pool. 0 for no limit
* @brief Size of thread pool. 0 for no limit
*/
int max_thread_count_;
};
/*!
* \brief Network Receiver for DGL distributed training.
* @brief Network Receiver for DGL distributed training.
*
* Receiver is an abstract class that defines a set of APIs for receiving binary
* data message over network. It can be implemented by different underlying
......@@ -80,9 +80,9 @@ class Sender : public rpc::RPCSender {
class Receiver : public rpc::RPCReceiver {
public:
/*!
* \brief Receiver constructor
* \param queue_size size of message queue.
* \param max_thread_count size of thread pool. 0 for no limit
* @brief Receiver constructor
* @param queue_size size of message queue.
* @param max_thread_count size of thread pool. 0 for no limit
* Note that, the queue_size parameter is optional.
*/
explicit Receiver(int64_t queue_size = 0, int max_thread_count = 0) {
......@@ -97,12 +97,12 @@ class Receiver : public rpc::RPCReceiver {
virtual ~Receiver() {}
/*!
* \brief Recv data from Sender
* \param msg pointer of data message
* \param send_id which sender current msg comes from
* \param timeout The timeout value in milliseconds. If zero, wait
* @brief Recv data from Sender
* @param msg pointer of data message
* @param send_id which sender current msg comes from
* @param timeout The timeout value in milliseconds. If zero, wait
* indefinitely.
* \return Status code
* @return Status code
*
* (1) The Recv() API is thread-safe.
* (2) Memory allocated by communicator but will not own it after the function
......@@ -111,12 +111,12 @@ class Receiver : public rpc::RPCReceiver {
virtual STATUS Recv(Message* msg, int* send_id, int timeout = 0) = 0;
/*!
* \brief Recv data from a specified Sender
* \param msg pointer of data message
* \param send_id sender's ID
* \param timeout The timeout value in milliseconds. If zero, wait
* @brief Recv data from a specified Sender
* @param msg pointer of data message
* @param send_id sender's ID
* @param timeout The timeout value in milliseconds. If zero, wait
* indefinitely.
* \return Status code
* @return Status code
*
* (1) The RecvFrom() API is thread-safe.
* (2) Memory allocated by communicator but will not own it after the function
......@@ -126,11 +126,11 @@ class Receiver : public rpc::RPCReceiver {
protected:
/*!
* \brief Size of message queue
* @brief Size of message queue
*/
int64_t queue_size_;
/*!
* \brief Size of thread pool. 0 for no limit
* @brief Size of thread pool. 0 for no limit
*/
int max_thread_count_;
};
......
/*!
* Copyright (c) 2019 by Contributors
* \file msg_queue.cc
* \brief Message queue for DGL distributed training.
* @file msg_queue.cc
* @brief Message queue for DGL distributed training.
*/
#include "msg_queue.h"
......
/*!
* Copyright (c) 2019 by Contributors
* \file msg_queue.h
* \brief Message queue for DGL distributed training.
* @file msg_queue.h
* @brief Message queue for DGL distributed training.
*/
#ifndef DGL_RPC_NETWORK_MSG_QUEUE_H_
#define DGL_RPC_NETWORK_MSG_QUEUE_H_
......@@ -23,7 +23,7 @@ namespace network {
typedef int STATUS;
/*!
* \brief Status code of message queue
* @brief Status code of message queue
*/
#define ADD_SUCCESS 3400 // Add message successfully
#define MSG_GT_SIZE 3401 // Message size beyond queue size
......@@ -34,45 +34,45 @@ typedef int STATUS;
#define QUEUE_EMPTY 3406 // Cannot remove when queue is empty
/*!
* \brief Message used by network communicator and message queue.
* @brief Message used by network communicator and message queue.
*/
struct Message {
/*!
* \brief Constructor
* @brief Constructor
*/
Message() {}
/*!
* \brief Constructor
* @brief Constructor
*/
Message(char* data_ptr, int64_t data_size)
: data(data_ptr), size(data_size) {}
/*!
* \brief message data
* @brief message data
*/
char* data;
/*!
* \brief message size in bytes
* @brief message size in bytes
*/
int64_t size;
/*!
* \brief message receiver id
* @brief message receiver id
*/
int receiver_id = -1;
/*!
* \brief user-defined deallocator, which can be nullptr
* @brief user-defined deallocator, which can be nullptr
*/
std::function<void(Message*)> deallocator = nullptr;
};
/*!
* \brief Free memory buffer of message
* @brief Free memory buffer of message
*/
inline void DefaultMessageDeleter(Message* msg) { delete[] msg->data; }
/*!
* \brief Message Queue for network communication.
* @brief Message Queue for network communication.
*
* MessageQueue is FIFO queue that adopts producer/consumer model for data
* message. It supports one or more producer threads and one or more consumer
......@@ -90,93 +90,93 @@ inline void DefaultMessageDeleter(Message* msg) { delete[] msg->data; }
class MessageQueue {
public:
/*!
* \brief MessageQueue constructor
* \param queue_size size (bytes) of message queue
* \param num_producers number of producers, use 1 by default
* @brief MessageQueue constructor
* @param queue_size size (bytes) of message queue
* @param num_producers number of producers, use 1 by default
*/
explicit MessageQueue(
int64_t queue_size /* in bytes */, int num_producers = 1);
/*!
* \brief MessageQueue deconstructor
* @brief MessageQueue deconstructor
*/
~MessageQueue() {}
/*!
* \brief Add message to the queue
* \param msg data message
* \param is_blocking Blocking if cannot add, else return
* \return Status code
* @brief Add message to the queue
* @param msg data message
* @param is_blocking Blocking if cannot add, else return
* @return Status code
*/
STATUS Add(Message msg, bool is_blocking = true);
/*!
* \brief Remove message from the queue
* \param msg pointer of data msg
* \param is_blocking Blocking if cannot remove, else return
* \return Status code
* @brief Remove message from the queue
* @param msg pointer of data msg
* @param is_blocking Blocking if cannot remove, else return
* @return Status code
*/
STATUS Remove(Message* msg, bool is_blocking = true);
/*!
* \brief Signal that producer producer_id will no longer produce anything
* \param producer_id An integer uniquely to identify a producer thread
* @brief Signal that producer producer_id will no longer produce anything
* @param producer_id An integer uniquely to identify a producer thread
*/
void SignalFinished(int producer_id);
/*!
* \return true if queue is empty.
* @return true if queue is empty.
*/
bool Empty() const;
/*!
* \return true if queue is empty and all num_producers have signaled.
* @return true if queue is empty and all num_producers have signaled.
*/
bool EmptyAndNoMoreAdd() const;
protected:
/*!
* \brief message queue
* @brief message queue
*/
std::queue<Message> queue_;
/*!
* \brief Size of the queue in bytes
* @brief Size of the queue in bytes
*/
int64_t queue_size_;
/*!
* \brief Free size of the queue
* @brief Free size of the queue
*/
int64_t free_size_;
/*!
* \brief Used to check all producers will no longer produce anything
* @brief Used to check all producers will no longer produce anything
*/
size_t num_producers_;
/*!
* \brief Store finished producer id
* @brief Store finished producer id
*/
std::set<int /* producer_id */> finished_producers_;
/*!
* \brief Condition when consumer should wait
* @brief Condition when consumer should wait
*/
std::condition_variable cond_not_full_;
/*!
* \brief Condition when producer should wait
* @brief Condition when producer should wait
*/
std::condition_variable cond_not_empty_;
/*!
* \brief Signal for exit wait
* @brief Signal for exit wait
*/
std::atomic<bool> exit_flag_{false};
/*!
* \brief Protect all above data and conditions
* @brief Protect all above data and conditions
*/
mutable std::mutex mutex_;
};
......
/*!
* Copyright (c) 2019 by Contributors
* \file communicator.cc
* \brief SocketCommunicator for DGL distributed training.
* @file communicator.cc
* @brief SocketCommunicator for DGL distributed training.
*/
#include "socket_communicator.h"
......
/*!
* Copyright (c) 2019 by Contributors
* \file communicator.h
* \brief SocketCommunicator for DGL distributed training.
* @file communicator.h
* @brief SocketCommunicator for DGL distributed training.
*/
#ifndef DGL_RPC_NETWORK_SOCKET_COMMUNICATOR_H_
#define DGL_RPC_NETWORK_SOCKET_COMMUNICATOR_H_
......@@ -26,7 +26,7 @@ static constexpr int kTimeOut =
static constexpr int kMaxConnection = 1024; // maximal connection: 1024
/*!
* \breif Networking address
* @breif Networking address
*/
struct IPAddr {
std::string ip;
......@@ -34,59 +34,59 @@ struct IPAddr {
};
/*!
* \brief SocketSender for DGL distributed training.
* @brief SocketSender for DGL distributed training.
*
* SocketSender is the communicator implemented by tcp socket.
*/
class SocketSender : public Sender {
public:
/*!
* \brief Sender constructor
* \param queue_size size of message queue
* \param max_thread_count size of thread pool. 0 for no limit
* @brief Sender constructor
* @param queue_size size of message queue
* @param max_thread_count size of thread pool. 0 for no limit
*/
SocketSender(int64_t queue_size, int max_thread_count)
: Sender(queue_size, max_thread_count) {}
/*!
* \brief Connect to a receiver.
* @brief Connect to a receiver.
*
* When there are multiple receivers to be connected, application will call
* `ConnectReceiver` for each and then call `ConnectReceiverFinalize` to make
* sure that either all the connections are successfully established or some
* of them fail.
*
* \param addr Networking address, e.g., 'tcp://127.0.0.1:50091'
* \param recv_id receiver's ID
* \return True for success and False for fail
* @param addr Networking address, e.g., 'tcp://127.0.0.1:50091'
* @param recv_id receiver's ID
* @return True for success and False for fail
*
* The function is *not* thread-safe; only one thread can invoke this API.
*/
bool ConnectReceiver(const std::string& addr, int recv_id) override;
/*!
* \brief Finalize the action to connect to receivers. Make sure that either
* @brief Finalize the action to connect to receivers. Make sure that either
* all connections are successfully established or connection fails.
* \return True for success and False for fail
* @return True for success and False for fail
*
* The function is *not* thread-safe; only one thread can invoke this API.
*/
bool ConnectReceiverFinalize(const int max_try_times) override;
/*!
* \brief Send RPCMessage to specified Receiver.
* \param msg data message
* \param recv_id receiver's ID
* @brief Send RPCMessage to specified Receiver.
* @param msg data message
* @param recv_id receiver's ID
*/
void Send(const rpc::RPCMessage& msg, int recv_id) override;
/*!
* \brief Finalize TPSender
* @brief Finalize TPSender
*/
void Finalize() override;
/*!
* \brief Communicator type: 'socket'
* @brief Communicator type: 'socket'
*/
const std::string& NetType() const override {
static const std::string net_type = "socket";
......@@ -94,11 +94,11 @@ class SocketSender : public Sender {
}
/*!
* \brief Send data to specified Receiver. Actually pushing message to message
* @brief Send data to specified Receiver. Actually pushing message to message
* queue.
* \param msg data message.
* \param recv_id receiver's ID.
* \return Status code.
* @param msg data message.
* @param recv_id receiver's ID.
* @return Status code.
*
* (1) The send is non-blocking. There is no guarantee that the message has
* been physically sent out when the function returns. (2) The communicator
......@@ -111,31 +111,31 @@ class SocketSender : public Sender {
private:
/*!
* \brief socket for each connection of receiver
* @brief socket for each connection of receiver
*/
std::vector<
std::unordered_map<int /* receiver ID */, std::shared_ptr<TCPSocket>>>
sockets_;
/*!
* \brief receivers' address
* @brief receivers' address
*/
std::unordered_map<int /* receiver ID */, IPAddr> receiver_addrs_;
/*!
* \brief message queue for each thread
* @brief message queue for each thread
*/
std::vector<std::shared_ptr<MessageQueue>> msg_queue_;
/*!
* \brief Independent thread
* @brief Independent thread
*/
std::vector<std::shared_ptr<std::thread>> threads_;
/*!
* \brief Send-loop for each thread
* \param sockets TCPSockets for current thread
* \param queue message_queue for current thread
* @brief Send-loop for each thread
* @param sockets TCPSockets for current thread
* @param queue message_queue for current thread
*
* Note that, the SendLoop will finish its loop-job and exit thread
* when the main thread invokes Signal() API on the message queue.
......@@ -148,26 +148,26 @@ class SocketSender : public Sender {
};
/*!
* \brief SocketReceiver for DGL distributed training.
* @brief SocketReceiver for DGL distributed training.
*
* SocketReceiver is the communicator implemented by tcp socket.
*/
class SocketReceiver : public Receiver {
public:
/*!
* \brief Receiver constructor
* \param queue_size size of message queue.
* \param max_thread_count size of thread pool. 0 for no limit
* @brief Receiver constructor
* @param queue_size size of message queue.
* @param max_thread_count size of thread pool. 0 for no limit
*/
SocketReceiver(int64_t queue_size, int max_thread_count)
: Receiver(queue_size, max_thread_count) {}
/*!
* \brief Wait for all the Senders to connect
* \param addr Networking address, e.g., 'tcp://127.0.0.1:50051', 'mpi://0'
* \param num_sender total number of Senders
* \param blocking whether wait blockingly
* \return True for success and False for fail
* @brief Wait for all the Senders to connect
* @param addr Networking address, e.g., 'tcp://127.0.0.1:50051', 'mpi://0'
* @param num_sender total number of Senders
* @param blocking whether wait blockingly
* @return True for success and False for fail
*
* Wait() is not thread-safe and only one thread can invoke this API.
*/
......@@ -175,21 +175,21 @@ class SocketReceiver : public Receiver {
const std::string& addr, int num_sender, bool blocking = true) override;
/*!
* \brief Recv RPCMessage from Sender. Actually removing data from queue.
* \param msg pointer of RPCmessage
* \param timeout The timeout value in milliseconds. If zero, wait
* @brief Recv RPCMessage from Sender. Actually removing data from queue.
* @param msg pointer of RPCmessage
* @param timeout The timeout value in milliseconds. If zero, wait
* indefinitely.
* \return RPCStatus: kRPCSuccess or kRPCTimeOut.
* @return RPCStatus: kRPCSuccess or kRPCTimeOut.
*/
rpc::RPCStatus Recv(rpc::RPCMessage* msg, int timeout) override;
/*!
* \brief Recv data from Sender. Actually removing data from msg_queue.
* \param msg pointer of data message
* \param send_id which sender current msg comes from
* \param timeout The timeout value in milliseconds. If zero, wait
* @brief Recv data from Sender. Actually removing data from msg_queue.
* @param msg pointer of data message
* @param send_id which sender current msg comes from
* @param timeout The timeout value in milliseconds. If zero, wait
* indefinitely.
* \return Status code
* @return Status code
*
* (1) The Recv() API is thread-safe.
* (2) Memory allocated by communicator but will not own it after the function
......@@ -198,13 +198,13 @@ class SocketReceiver : public Receiver {
STATUS Recv(Message* msg, int* send_id, int timeout = 0) override;
/*!
* \brief Recv data from a specified Sender. Actually removing data from
* @brief Recv data from a specified Sender. Actually removing data from
* msg_queue.
* \param msg pointer of data message.
* \param send_id sender's ID
* \param timeout The timeout value in milliseconds. If zero, wait
* @param msg pointer of data message.
* @param send_id sender's ID
* @param timeout The timeout value in milliseconds. If zero, wait
* indefinitely.
* \return Status code
* @return Status code
*
* (1) The RecvFrom() API is thread-safe.
* (2) Memory allocated by communicator but will not own it after the function
......@@ -213,14 +213,14 @@ class SocketReceiver : public Receiver {
STATUS RecvFrom(Message* msg, int send_id, int timeout = 0) override;
/*!
* \brief Finalize SocketReceiver
* @brief Finalize SocketReceiver
*
* Finalize() is not thread-safe and only one thread can invoke this API.
*/
void Finalize() override;
/*!
* \brief Communicator type: 'socket'
* @brief Communicator type: 'socket'
*/
const std::string& NetType() const override {
static const std::string net_type = "socket";
......@@ -234,24 +234,24 @@ class SocketReceiver : public Receiver {
char* buffer = nullptr;
};
/*!
* \brief number of sender
* @brief number of sender
*/
int num_sender_;
/*!
* \brief server socket for listening connections
* @brief server socket for listening connections
*/
TCPSocket* server_socket_;
/*!
* \brief socket for each client connections
* @brief socket for each client connections
*/
std::vector<std::unordered_map<
int /* Sender (virutal) ID */, std::shared_ptr<TCPSocket>>>
sockets_;
/*!
* \brief Message queue for each socket connection
* @brief Message queue for each socket connection
*/
std::unordered_map<
int /* Sender (virtual) ID */, std::shared_ptr<MessageQueue>>
......@@ -259,20 +259,20 @@ class SocketReceiver : public Receiver {
std::unordered_map<int, std::shared_ptr<MessageQueue>>::iterator mq_iter_;
/*!
* \brief Independent thead
* @brief Independent thead
*/
std::vector<std::shared_ptr<std::thread>> threads_;
/*!
* \brief queue_sem_ semphore to indicate number of messages in multiple
* @brief queue_sem_ semphore to indicate number of messages in multiple
* message queues to prevent busy wait of Recv
*/
runtime::Semaphore queue_sem_;
/*!
* \brief Recv-loop for each thread
* \param sockets client sockets of current thread
* \param queue message queues of current thread
* @brief Recv-loop for each thread
* @param sockets client sockets of current thread
* @param queue message queues of current thread
*
* Note that, the RecvLoop will finish its loop-job and exit thread
* when the main thread invokes Signal() API on the message queue.
......
/*!
* Copyright (c) 2021 by Contributors
* \file socket_pool.cc
* \brief Socket pool of nonblocking sockets for DGL distributed training.
* @file socket_pool.cc
* @brief Socket pool of nonblocking sockets for DGL distributed training.
*/
#include "socket_pool.h"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment