ipc_socket.h 3.98 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#pragma once

#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/un.h>

#include "base.h"
#include "net_utils.h"
#include "socket.h"
#include "thread_pool.h"

namespace sccl {
namespace hardware {
namespace net {
namespace ipc_socket {

#define SCCL_IPC_SOCKNAME_LEN 64
#define SCCL_IPC_SOCKNAME_STR "/tmp/sccl-socket-%d-%lx"

// 定义IPC套接字结构体
struct scclIpcSocketHandle {
    int fd;                                 // 文件描述符
    char socketName[SCCL_IPC_SOCKNAME_LEN]; // 套接字名称
    volatile uint32_t* abortFlag;           // 用于中止操作的标志
};

// 封装发送数据,包括rank信息和实际数据的引用
struct DataPackage {
    int rank;
    char data[]; // 灵活数组成员,用于存储实际数据
};

//////////////////////////////////////////////////////////////////////////////////////////////////////
class scclIpcSocket {
public:
    // 构造函数和析构函数
    scclIpcSocket(int localRank, int localRanks, uint64_t hash, volatile uint32_t* abortFlag = nullptr);
    virtual ~scclIpcSocket();

    // 初始化IPC套接字
    scclResult_t scclIpcSocketInit(volatile uint32_t* abortFlag);
    // 设置 abortFlag 的函数
    scclResult_t setAbortFlag(volatile uint32_t* flag);
    // 获取 abortFlag 的函数
    volatile uint32_t* getAbortFlag() const;
    // 设置IPC套接字的超时时间
    scclResult_t setTimeout(int timeout_ms);
    // 获取线程池指针
    ThreadPool* getPthreadPool();

    //////////////////////////////////////////////////////////////////////////////////////////////////////
    /*
    并行计算时,不同的进程可能需要访问相同的文件或网络资源。通过发送文件描述符,可以避免多个进程重复打开相同的文件或建立相同的网络连接,从而节省资源和时间。
    */
    // 发送文件描述符
    scclResult_t scclIpcSocketSendFd(const int sendFd, int dst_rank);
    // 接收文件描述符
    scclResult_t scclIpcSocketRecvFd(int* fd);

    // 通过Unix域套接字发送数据到指定目标,阻塞方式
    scclResult_t scclIpcSocketSendData(const void* data, size_t dataLen, int dst_rank);
    // 通过Unix域套接字接收数据,阻塞方式
    scclResult_t scclIpcSocketRecvData(void* buffer, size_t bufferLen, size_t* receivedLen);
    // 通过Unix域套接字发送数据到指定目标,非阻塞方式
    scclResult_t scclIpcSocketSendDataNonBlocking(const void* data, size_t dataLen, int dst_rank);
    // 通过Unix域套接字接收数据,非阻塞方式
    scclResult_t scclIpcSocketRecvDataNonBlocking(void* buffer, size_t bufferLen, size_t* receivedLen);

    // local rank内的allgather操作。保证接收顺序
    scclResult_t scclIpcSocketAllgather(const void* sendData, void* recvData, size_t dataLen, bool wait = true);
    // local rank内的allgather操作。为了性能,不保证接收顺序,所以发送的信息中需要添加进程ID
    scclResult_t scclIpcSocketAllgatherSync(const void* sendData, void* recvData, size_t dataLen, bool wait = true);

    // local rank内的broadcast操作
    scclResult_t scclIpcSocketBroadcast(const void* sendData, void* recvData, size_t dataLen, int root, bool wait = true);

private:
    // 定义并初始化一个 scclIpcSocket 结构体,用于处理 IPC 套接字连接
    struct scclIpcSocketHandle* handle = nullptr;
    // 定义一个 sockaddr_un 结构体,用于存储客户端地址信息
    struct sockaddr_un my_cliaddr;

    // 用于生成唯一套接字名称的hash值
    const uint64_t ipc_hash;
    // 非阻塞套接字设置
    const volatile uint32_t* my_abortFlag;

    // 进程id信息
    int localRank  = -1;
    int localRanks = 0;

    // 线程池指针
    ThreadPool* pthread_pool = nullptr;
    // 设置超时时间为 10000 毫秒
    int timeoutMs = 10000;
};

} // namespace ipc_socket
} // namespace net
} // namespace hardware
} // namespace sccl