ipc_socket.h 4.17 KB
Newer Older
1
2
#pragma once

3
#include <type_traits>
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/un.h>

#include "base.h"
#include "net_utils.h"
#include "socket.h"
#include "thread_pool.h"

namespace sccl {
namespace hardware {
namespace net {
namespace ipc_socket {

24
constexpr int SCCL_IPC_SOCKNAME_LEN = 64;
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

// 定义IPC套接字结构体
struct scclIpcSocketHandle {
    int fd;                                 // 文件描述符
    char socketName[SCCL_IPC_SOCKNAME_LEN]; // 套接字名称
    volatile uint32_t* abortFlag;           // 用于中止操作的标志
};

// 封装发送数据,包括rank信息和实际数据的引用
struct DataPackage {
    int rank;
    char data[]; // 灵活数组成员,用于存储实际数据
};

//////////////////////////////////////////////////////////////////////////////////////////////////////
class scclIpcSocket {
public:
    // 构造函数和析构函数
43
    scclIpcSocket(int localRank, int nlocalRanks, uint64_t hash, volatile uint32_t* abortFlag = nullptr);
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
    virtual ~scclIpcSocket();

    // 设置 abortFlag 的函数
    scclResult_t setAbortFlag(volatile uint32_t* flag);
    // 获取 abortFlag 的函数
    volatile uint32_t* getAbortFlag() const;
    // 设置IPC套接字的超时时间
    scclResult_t setTimeout(int timeout_ms);
    // 获取线程池指针
    ThreadPool* getPthreadPool();

    //////////////////////////////////////////////////////////////////////////////////////////////////////
    /*
    并行计算时,不同的进程可能需要访问相同的文件或网络资源。通过发送文件描述符,可以避免多个进程重复打开相同的文件或建立相同的网络连接,从而节省资源和时间。
    */
59
    // 发送/接收文件描述符
60
61
62
    scclResult_t scclIpcSocketSendFd(const int sendFd, int dst_rank);
    scclResult_t scclIpcSocketRecvFd(int* fd);

63
    // 通过Unix域套接字发送/接收数据到指定目标
64
65
66
    scclResult_t scclIpcSocketSendData(const void* data, size_t dataLen, int dst_rank);
    scclResult_t scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen);

67
68
69
70
71
    // 通过Unix域套接字发送/接收数据到指定目标,并发送ack确保发送成功
    scclResult_t scclIpcSocketSendDataWithAck(const void* data, size_t dataLen, int dst_rank);
    scclResult_t scclIpcSocketRecvDataAndSendAck(void* buffer, size_t bufferLen, size_t* receivedLen, int src_rank);

    //////////////////////////////////////////////////////////////////////////////////////////////////////
72
    // local rank内的allgather操作。保证接收顺序
73
    scclResult_t scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen);
74

75
    // local rank内的allgather操作。为了性能,不保证接收顺序,所以发送的信息中需要添加进程ID
76
    scclResult_t scclIpcSocketAllgatherSync(const void* sendData, void* recvData, size_t dataLen);
77
78

    // local rank内的broadcast操作
79
    scclResult_t scclIpcSocketBroadcast(void* data, size_t dataLen, int root);
80
81
82
83
84

private:
    // 初始化IPC套接字
    scclResult_t scclIpcSocketInit(volatile uint32_t* abortFlag);
    scclResult_t getScclIpcSocknameStr(int rank, uint64_t hash, char* out_str, int* out_len);
85
86
87
88
89
90
91
92
93
94
95
96
97

private:
    // 定义并初始化一个 scclIpcSocket 结构体,用于处理 IPC 套接字连接
    struct scclIpcSocketHandle* handle = nullptr;
    // 定义一个 sockaddr_un 结构体,用于存储客户端地址信息
    struct sockaddr_un my_cliaddr;

    // 用于生成唯一套接字名称的hash值
    const uint64_t ipc_hash;
    // 非阻塞套接字设置
    const volatile uint32_t* my_abortFlag;

    // 进程id信息
98
99
    int localRank   = -1;
    int nlocalRanks = 0;
100
101
102

    // 线程池指针
    ThreadPool* pthread_pool = nullptr;
103
    // 设置超时时间为无限长
104
105
106
    int timeoutMs = -1;

    // 各种数据大小的固定值
107
    static constexpr int ACK_SIZE = 8;
108
109
    // 假设 CHUNK_SIZE 是一个合适的块大小,例如 64KB
    static constexpr size_t CHUNK_SIZE = 64 * 1024;
110
111
112
113
114
115
};

} // namespace ipc_socket
} // namespace net
} // namespace hardware
} // namespace sccl