deepspeed_py_aio_handle.h 2.08 KB
Newer Older
aiss's avatar
aiss committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/*
Copyright 2020 The Microsoft DeepSpeed Team
Licensed under the MIT license.

Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
*/

#include <condition_variable>
#include <memory>
#include "deepspeed_aio_thread.h"

// Handle type bundling an aio context, an aio configuration, and a set of
// threads, and exposing read/write entry points that take a torch::Tensor
// buffer and a filename. Only declarations live in this header; every member
// function is defined out of line, so the behavioral notes below that go
// beyond the declared types are marked as assumptions to confirm against the
// implementation file.
struct deepspeed_aio_handle_t {
    // Uniquely-owned aio context (type declared outside this header).
    std::unique_ptr<struct aio_context> _aio_ctxt;
    // The three const members below cannot change after construction.
    // Presumably initialized from the constructor arguments of the same
    // name -- TODO confirm in the constructor definition.
    const bool _single_submit;
    const bool _overlap_events;
    const int _num_threads;
    // Presumably carries block_size / queue_depth (see the getters below) --
    // TODO confirm against deepspeed_aio_config_t.
    deepspeed_aio_config_t _aio_config;

    // Shared-ownership per-thread context objects and the std::thread objects
    // themselves. NOTE(review): presumably one context per thread and
    // _num_threads entries in each vector -- confirm in the constructor.
    std::vector<std::shared_ptr<struct deepspeed_aio_thread_t>> _thread_contexts;
    std::vector<std::thread> _threads;
    // NOTE(review): presumably the number of asynchronous operations that have
    // been scheduled but not yet collected by wait() -- confirm. It is a plain
    // int; this header does not show how access to it is synchronized.
    int _num_pending_ops;

    // Constructs a handle from the I/O block size, queue depth, the
    // single-submit / overlap-events flags, and the thread count.
    deepspeed_aio_handle_t(const int block_size,
                           const int queue_depth,
                           const bool single_submit,
                           const bool overlap_events,
                           const int num_threads);

    // User-declared destructor. NOTE(review): presumably stops and joins the
    // threads (see _stop_threads) -- confirm in the definition.
    ~deepspeed_aio_handle_t();

    // Read-only accessors for the configuration values.
    // NOTE(review): the top-level `const` on these by-value return types has
    // no effect for callers, but it is part of each function's declared type
    // and therefore has to stay in sync with the out-of-line definitions.
    const int get_block_size() const;
    const int get_queue_depth() const;
    const bool get_single_submit() const;
    const bool get_overlap_events() const;
    const int get_thread_count() const;

    // Reads from `filename` into `buffer`; `validate` presumably enables a
    // verification step -- TODO confirm. The meaning of the int return value
    // is not visible from this header.
    int read(torch::Tensor& buffer, const char* filename, const bool validate);

    // Writes `buffer` to `filename`; same caveats as read().
    int write(const torch::Tensor& buffer, const char* filename, const bool validate);

    // Read variant with an extra `async` flag. NOTE(review): presumably the
    // parallel (multi-threaded) path, with async=true returning before
    // completion -- confirm.
    // NOTE(review): `buffer` is a const reference here, whereas read(),
    // sync_pread() and async_pread() take a non-const reference for the same
    // role -- confirm this difference is intended.
    int pread(const torch::Tensor& buffer,
              const char* filename,
              const bool validate,
              const bool async);

    // Write variant with an extra `async` flag; see the notes on pread().
    int pwrite(const torch::Tensor& buffer,
               const char* filename,
               const bool validate,
               const bool async);

    // NOTE(review): the four functions below look like convenience wrappers
    // over pread()/pwrite() with the `async` flag fixed (and no `validate`
    // parameter) -- confirm in the implementation.
    int sync_pread(torch::Tensor& buffer, const char* filename);

    int sync_pwrite(const torch::Tensor& buffer, const char* filename);

    int async_pread(torch::Tensor& buffer, const char* filename);

    int async_pwrite(const torch::Tensor& buffer, const char* filename);

    // NOTE(review): presumably blocks until pending asynchronous operations
    // have completed and returns how many were completed -- confirm.
    int wait();

    // The underscore-prefixed functions below appear to be internal helpers,
    // but since this is a struct they are publicly accessible.

    // NOTE(review): presumably signals the threads in _threads to exit and
    // joins them -- confirm.
    void _stop_threads();

    // Takes a shared pointer to an I/O operation descriptor.
    // NOTE(review): presumably hands it to the threads -- confirm.
    void _schedule_aio_work(std::shared_ptr<struct io_op_desc_t> scheduled_op);

    // Returns a shared pointer to an I/O operation descriptor.
    // NOTE(review): presumably the completed operation -- confirm.
    std::shared_ptr<struct io_op_desc_t> _wait_for_aio_work();

    // Returns whether an operation of `num_bytes` bytes (read_op
    // distinguishing reads from writes) is acceptable for the parallel path.
    // NOTE(review): the exact criterion is in the definition; presumably
    // related to divisibility of num_bytes by the thread count -- confirm.
    bool _is_valid_parallel_aio_op(const bool read_op, const long long int num_bytes);
};