+my_accelerator.name='cuda'
+my_accelerator.communication_backend='nccl'
+my_accelerator.HalfTensor().device=device(type='cuda', index=0)
+my_accelerator.total_memory()=34089730048
+---[output] python test_set.py---------
+'''
diff --git a/azure/README.md b/azure/README.md
index 1cca695bfa7e1ef6a45a5f680134c97b86a46948..df222b9a2759f7e5e16516456c56689a15de1f6b 100644
--- a/azure/README.md
+++ b/azure/README.md
@@ -1,3 +1,3 @@
# Getting Started with DeepSpeed on Azure
-Please see our [Azure tutorial](https://www.deepspeed.ai/tutorials/azure/) to get started with DeepSpeed on Azure!
+The recommended and simplest method to try DeepSpeed on Azure is through [AzureML](https://azure.microsoft.com/en-us/services/machine-learning/). For more details, please see our [Azure tutorial](https://www.deepspeed.ai/tutorials/azure/).
diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..fcb45ab2b68516814a4bfbffebf2e01cbfefd527
--- /dev/null
+++ b/benchmarks/__init__.py
@@ -0,0 +1 @@
+'''Copyright The Microsoft DeepSpeed Team'''
diff --git a/benchmarks/communication/README.md b/benchmarks/communication/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..f760465b5c972f9b6364e7bbcfb11871ac079cf1
--- /dev/null
+++ b/benchmarks/communication/README.md
@@ -0,0 +1,75 @@
+# Running Communication Benchmarks
+
+
+To run benchmarks, there are two options:
+
+1. Run a single communication operation:
+
+For example, run with a single large message size:
+
+deepspeed all_reduce.py
+
+
+Scan across message sizes:
+
+deepspeed all_reduce.py --scan
+
+
+2. Run all available communication benchmarks:
+
+
+deepspeed run_all.py
+
+
+Like the individual benchmarks, `run_all.py` supports scanning arguments for the max message size, bw-unit, etc. Simply pass the desired arguments to `run_all.py` and they'll be propagated to each comm op.
+
+
+usage: ds_bench [-h] [--local_rank LOCAL_RANK] [--trials TRIALS] [--warmups WARMUPS] [--maxsize MAXSIZE] [--async-op] [--bw-unit {Gbps,GBps}] [--backend {nccl,ccl}] [--dist {deepspeed,torch}] [--scan] [--raw] [--all-reduce] [--all-gather] [--all-to-all]
+ [--pt2pt] [--broadcast] [--dtype DTYPE] [--mem-factor MEM_FACTOR] [--debug]
+
+optional arguments:
+ -h, --help show this help message and exit
+ --local_rank LOCAL_RANK
+ --trials TRIALS Number of timed iterations
+ --warmups WARMUPS Number of warmup (non-timed) iterations
+ --maxsize MAXSIZE Max message size as a power of 2
+ --async-op Enables non-blocking communication
+ --bw-unit {Gbps,GBps}
+  --backend {nccl,ccl}  Communication library to use
+ --dist {deepspeed,torch}
+ Distributed DL framework to use
+ --scan Enables scanning all message sizes
+ --raw Print the message size and latency without units
+ --all-reduce Run all_reduce
+ --all-gather Run all_gather
+ --all-to-all Run all_to_all
+ --pt2pt Run pt2pt
+ --broadcast Run broadcast
+ --dtype DTYPE PyTorch tensor dtype
+ --mem-factor MEM_FACTOR
+ Proportion of max available GPU memory to use for single-size evals
+ --debug Enables all_to_all debug prints
+
+
+Note that `ds_bench` is a pre-packaged wrapper around `run_all.py`. Users can pass the same arguments as well:
+
+
+<deepspeed-install-dir>/bin/ds_bench --scan --trials=10
+
+
+Finally, users can choose specific communication operations to run in `run_all.py` or `ds_bench` by passing them as arguments (all operations are run by default). For example:
+
+
+deepspeed run_all.py --scan --all-reduce --all-to-all --broadcast
+
+
+
+# Adding Communication Benchmarks
+
+To add new communication benchmarks, follow this general procedure:
+
+1. Copy a similar benchmark file (e.g. to add `reduce_scatter`, copy `all_reduce.py` as a template)
+2. Add a new bw formula in `utils.get_bw`, a new maximum tensor element formula in `utils.max_numel`, and a new arg in `utils.benchmark_parser`
+3. Replace comm op calls in new file with find-replace
+4. Find a good default `mem_factor` for use in the `run_<comm_op>_single()` function
+5. Add new comm op to `run_all.py`
diff --git a/benchmarks/communication/__init__.py b/benchmarks/communication/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..fcb45ab2b68516814a4bfbffebf2e01cbfefd527
--- /dev/null
+++ b/benchmarks/communication/__init__.py
@@ -0,0 +1 @@
+'''Copyright The Microsoft DeepSpeed Team'''
diff --git a/benchmarks/communication/all_gather.py b/benchmarks/communication/all_gather.py
new file mode 100644
index 0000000000000000000000000000000000000000..dc97267b384020e408739017c3b9051434211b34
--- /dev/null
+++ b/benchmarks/communication/all_gather.py
@@ -0,0 +1,159 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+from benchmarks.communication.utils import *
+from benchmarks.communication.constants import *
+from deepspeed.accelerator import get_accelerator
+
+import time
+
+
+# Run all_gather and print metrics
+def timed_all_gather(input, output, args):
+    if args.dist == 'torch':
+        import torch.distributed as dist
+    elif args.dist == 'deepspeed':
+        import deepspeed.comm as dist
+
+    sync_all()
+    # Warmups, establish connections, etc.
+    for i in range(args.warmups):
+        # use all_gather_base if available
+        if args.dist == 'torch':
+            if hasattr(torch.distributed, "_all_gather_base"):
+                dist._all_gather_base(output, input, group=None, async_op=args.async_op)
+            else:
+                output_tensors = list(
+                    torch.chunk(output,
+                                dist.get_world_size()))
+                dist.all_gather(output_tensors, input, group=None, async_op=args.async_op)
+        elif args.dist == 'deepspeed':
+            dist.allgather_fn(output, input, group=None, async_op=args.async_op)
+    sync_all()
+
+    # time the actual comm op trials times and average it
+    pre = time.perf_counter()
+    for i in range(args.trials):
+        # use all_gather_base if available
+        if args.dist == 'torch':
+            if hasattr(torch.distributed, "_all_gather_base"):
+                dist._all_gather_base(output, input, group=None, async_op=args.async_op)
+            else:
+                output_tensors = list(
+                    torch.chunk(output,
+                                dist.get_world_size()))
+                dist.all_gather(output_tensors, input, group=None, async_op=args.async_op)
+        elif args.dist == 'deepspeed':
+            dist.allgather_fn(output, input, group=None, async_op=args.async_op)
+    sync_all()
+    duration = time.perf_counter() - pre
+
+    # maintain and clean performance data
+    avg_duration = duration / args.trials
+    size = input.element_size() * input.nelement()
+    n = dist.get_world_size()
+    tput, busbw = get_bw('all_gather', size, avg_duration, args)
+    tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
+    desc = f'{input.nelement()}x{input.element_size()}'
+
+    if not args.raw:
+        size = convert_size(size)
+
+    print_rank_0(
+        f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+
+
+def run_all_gather(local_rank, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ # Prepare benchmark header
+ print_header(args, 'all_gather')
+ global_rank = dist.get_rank()
+ world_size = dist.get_world_size()
+
+ if args.scan:
+ # Create list of message sizes
+ M_LIST = []
+ for x in (2**p for p in range(1, args.maxsize)):
+ M_LIST.append(x)
+
+ sync_all()
+ # loop over various tensor sizes
+ for M in M_LIST:
+ global_rank = dist.get_rank()
+ try:
+ mat = torch.ones(world_size,
+ M,
+ dtype=getattr(
+ torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ sync_all()
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ # Delete original mat to avoid OOM
+ del mat
+ get_accelerator().empty_cache()
+ output = torch.zeros(input.nelement() * world_size,
+ dtype=getattr(
+ torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print('WARNING: Ran out of GPU memory. Exiting comm op.')
+ sync_all()
+ break
+ sync_all()
+ timed_all_gather(input, output, args)
+ else:
+ # all_gather_base saves memory
+ if (args.dist == 'torch'
+ and hasattr(torch.distributed,
+ "_all_gather_base")) or (args.dist == 'deepspeed'
+ and dist.has_allgather_base):
+ mem_factor = args.mem_factor + 0.2
+ else:
+ mem_factor = args.mem_factor
+ # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
+ sync_all()
+ elements_per_gpu = max_numel(comm_op='all_gather',
+ dtype=getattr(torch,
+ args.dtype),
+ mem_factor=mem_factor,
+ local_rank=local_rank,
+ args=args)
+ try:
+ mat = torch.ones(elements_per_gpu,
+ dtype=getattr(torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ # multiply each GPU's tensor by the rank to ease debugging
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ # Delete original mat to avoid OOM
+ del mat
+ get_accelerator().empty_cache()
+ output = torch.zeros(
+ elements_per_gpu * world_size,
+ dtype=getattr(torch,
+ args.dtype)).to(get_accelerator().device_name(local_rank))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print(
+ 'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
+ )
+ sync_all()
+ return
+
+ sync_all()
+ timed_all_gather(input, output, args)
+
+
+if __name__ == "__main__":
+ args = benchmark_parser().parse_args()
+ rank = args.local_rank
+ init_processes(local_rank=rank, args=args)
+ run_all_gather(local_rank=rank, args=args)
diff --git a/benchmarks/communication/all_reduce.py b/benchmarks/communication/all_reduce.py
new file mode 100644
index 0000000000000000000000000000000000000000..edc1b99301c06e7c8c4b5807e35ad2afa39bf17b
--- /dev/null
+++ b/benchmarks/communication/all_reduce.py
@@ -0,0 +1,113 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+from benchmarks.communication.utils import *
+from benchmarks.communication.constants import *
+from deepspeed.accelerator import get_accelerator
+
+import time
+
+
+def timed_all_reduce(input, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ sync_all()
+ # Warmups, establish connections, etc.
+ for i in range(args.warmups):
+ dist.all_reduce(input, async_op=args.async_op)
+ sync_all()
+
+ # time the actual comm op trials times and average it
+ pre = time.perf_counter()
+ for i in range(args.trials):
+ dist.all_reduce(input, async_op=args.async_op)
+ sync_all()
+ duration = time.perf_counter() - pre
+
+ # maintain and clean performance data
+ avg_duration = duration / args.trials
+ size = input.element_size() * input.nelement()
+ n = dist.get_world_size()
+ tput, busbw = get_bw('all_reduce', size, avg_duration, args)
+ tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
+ desc = f'{input.nelement()}x{input.element_size()}'
+
+ if not args.raw:
+ size = convert_size(size)
+
+ print_rank_0(
+ f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+
+
+def run_all_reduce(local_rank, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ # Prepare benchmark header
+ print_header(args, 'all_reduce')
+
+ world_size = dist.get_world_size()
+ global_rank = dist.get_rank()
+
+ if args.scan:
+ M_LIST = []
+ for x in (2**p for p in range(1, args.maxsize)):
+ M_LIST.append(x)
+
+ sync_all()
+ # loop over various tensor sizes
+ for M in M_LIST:
+ global_rank = dist.get_rank()
+ try:
+ mat = torch.ones(world_size,
+ M,
+ dtype=getattr(
+ torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ sync_all()
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print('WARNING: Ran out of GPU memory. Exiting comm op.')
+ sync_all()
+ break
+ sync_all()
+ timed_all_reduce(input, args)
+ else:
+ # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
+ # Don't need output tensor, so we double mem_factor
+ elements_per_gpu = max_numel(comm_op='all_reduce',
+ dtype=getattr(torch,
+ args.dtype),
+ mem_factor=args.mem_factor * 2,
+ local_rank=local_rank,
+ args=args)
+ try:
+ mat = torch.ones(elements_per_gpu,
+ dtype=getattr(torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print(
+ 'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
+ )
+ sync_all()
+ return
+ sync_all()
+ timed_all_reduce(input, args)
+
+
+if __name__ == "__main__":
+ args = benchmark_parser().parse_args()
+ rank = args.local_rank
+ init_processes(local_rank=rank, args=args)
+ run_all_reduce(local_rank=rank, args=args)
diff --git a/benchmarks/communication/all_to_all.py b/benchmarks/communication/all_to_all.py
new file mode 100644
index 0000000000000000000000000000000000000000..bd35cf290e4c0f35b2b2ce4b2c4ea17f876e52ec
--- /dev/null
+++ b/benchmarks/communication/all_to_all.py
@@ -0,0 +1,134 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+from benchmarks.communication.utils import *
+from benchmarks.communication.constants import *
+from deepspeed.accelerator import get_accelerator
+
+import time
+
+
+def timed_all_to_all(input, output, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ sync_all()
+ # Warmups, establish connections, etc.
+ for i in range(args.warmups):
+ dist.all_to_all_single(output, input, async_op=args.async_op)
+ sync_all()
+
+ # time the actual comm op trials times and average it
+ pre = time.perf_counter()
+ for i in range(args.trials):
+ dist.all_to_all_single(output, input, async_op=args.async_op)
+ sync_all()
+ duration = time.perf_counter() - pre
+
+ # maintain and clean performance data
+ avg_duration = duration / args.trials
+ size = input.element_size() * input.nelement()
+ n = dist.get_world_size()
+ tput, busbw = get_bw('all_to_all', size, avg_duration, args)
+ tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
+ desc = f'{input.nelement()}x{input.element_size()}'
+
+ if not args.raw:
+ size = convert_size(size)
+
+ print_rank_0(
+ f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+
+
+def run_all_to_all(local_rank, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ world_size = dist.get_world_size()
+ global_rank = dist.get_rank()
+ # Prepare benchmark header
+ print_header(args, 'all_to_all')
+
+ if args.scan:
+ M_LIST = []
+ for x in (2**p for p in range(1, args.maxsize)):
+ M_LIST.append(x)
+
+ sync_all()
+ # loop over various tensor sizes
+ for M in M_LIST:
+ global_rank = dist.get_rank()
+ try:
+ mat = torch.ones(world_size,
+ M,
+ dtype=getattr(
+ torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ assert mat.numel() % world_size == 0, f"tensor cannot be divided in {world_size} chunks"
+ sync_all()
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ output = (mat.clone().view(-1))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print('WARNING: Ran out of GPU memory. Exiting comm op.')
+ sync_all()
+ break
+ sync_all()
+ timed_all_to_all(input, output, args)
+ else:
+ # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
+ elements_per_gpu = max_numel(comm_op='all_to_all',
+ dtype=getattr(torch,
+ args.dtype),
+ mem_factor=args.mem_factor,
+ local_rank=local_rank,
+ args=args)
+ try:
+ mat = torch.ones(elements_per_gpu,
+ dtype=getattr(torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ assert mat.numel() % world_size == 0, f"tensor with {mat.numel()} elements cannot be divided in {world_size} chunks"
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ # Delete original mat to avoid OOM
+ del mat
+ get_accelerator().empty_cache()
+ output = torch.zeros(
+ elements_per_gpu,
+ dtype=getattr(torch,
+ args.dtype)).to(get_accelerator().device_name(local_rank))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print(
+ 'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
+ )
+ sync_all()
+ return
+ sync_all()
+
+ if args.debug:
+ for i in range(world_size):
+ if i == global_rank:
+ print(f"Before AllToAll Input List at rank {global_rank}: {input}")
+ dist.barrier()
+
+ timed_all_to_all(input, output, args)
+
+ if args.debug:
+ for i in range(world_size):
+ if i == global_rank:
+ print(f"AllToAll Results at rank {global_rank}: {output}")
+ dist.barrier()
+
+
+if __name__ == "__main__":
+ args = benchmark_parser().parse_args()
+ rank = args.local_rank
+ init_processes(local_rank=rank, args=args)
+ run_all_to_all(local_rank=rank, args=args)
diff --git a/benchmarks/communication/broadcast.py b/benchmarks/communication/broadcast.py
new file mode 100644
index 0000000000000000000000000000000000000000..633e46638fac775920b89b0fc2ab9b0f4401dc79
--- /dev/null
+++ b/benchmarks/communication/broadcast.py
@@ -0,0 +1,114 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+import torch
+from benchmarks.communication.utils import *
+from benchmarks.communication.constants import *
+from deepspeed.accelerator import get_accelerator
+
+import time
+
+
+def timed_broadcast(input, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ sync_all()
+ # Warmups, establish connections, etc.
+ for i in range(args.warmups):
+ dist.broadcast(input, 0, async_op=args.async_op)
+ sync_all()
+
+ # time the actual comm op trials times and average it
+ pre = time.perf_counter()
+ for i in range(args.trials):
+ dist.broadcast(input, 0, async_op=args.async_op)
+ sync_all()
+ duration = time.perf_counter() - pre
+
+ # maintain and clean performance data
+ avg_duration = duration / args.trials
+ size = input.element_size() * input.nelement()
+ n = dist.get_world_size()
+ tput, busbw = get_bw('broadcast', size, avg_duration, args)
+ tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
+ desc = f'{input.nelement()}x{input.element_size()}'
+
+ if not args.raw:
+ size = convert_size(size)
+
+ print_rank_0(
+ f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+
+
+def run_broadcast(local_rank, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ # Prepare benchmark header
+ print_header(args, 'broadcast')
+
+ world_size = dist.get_world_size()
+ global_rank = dist.get_rank()
+
+ if args.scan:
+ M_LIST = []
+ for x in (2**p for p in range(1, args.maxsize)):
+ M_LIST.append(x)
+
+ sync_all()
+ # loop over various tensor sizes
+ for M in M_LIST:
+ global_rank = dist.get_rank()
+ try:
+ mat = torch.ones(world_size,
+ M,
+ dtype=getattr(
+ torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ sync_all()
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print('WARNING: Ran out of GPU memory. Exiting comm op.')
+ sync_all()
+ break
+ sync_all()
+ timed_broadcast(input, args)
+ else:
+ # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
+ # Don't need output tensor, so we double mem_factor
+ elements_per_gpu = max_numel(comm_op='broadcast',
+ dtype=getattr(torch,
+ args.dtype),
+ mem_factor=args.mem_factor * 2,
+ local_rank=local_rank,
+ args=args)
+ try:
+ mat = torch.ones(elements_per_gpu,
+ dtype=getattr(torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print(
+ 'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
+ )
+ sync_all()
+ return
+ sync_all()
+ timed_broadcast(input, args)
+
+
+if __name__ == "__main__":
+ args = benchmark_parser().parse_args()
+ rank = args.local_rank
+ init_processes(local_rank=rank, args=args)
+ run_broadcast(local_rank=rank, args=args)
diff --git a/benchmarks/communication/constants.py b/benchmarks/communication/constants.py
new file mode 100644
index 0000000000000000000000000000000000000000..935927acd174256fe7cd552a3181977e2dfdb7d8
--- /dev/null
+++ b/benchmarks/communication/constants.py
@@ -0,0 +1,10 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+from deepspeed.accelerator import get_accelerator
+
+DEFAULT_WARMUPS = 5
+DEFAULT_TRIALS = 50
+DEFAULT_TYPE = 'float'
+DEFAULT_BACKEND = get_accelerator().communication_backend_name()
+DEFAULT_UNIT = 'Gbps'
+DEFAULT_DIST = 'deepspeed'
+DEFAULT_MAXSIZE = 24
diff --git a/benchmarks/communication/pt2pt.py b/benchmarks/communication/pt2pt.py
new file mode 100644
index 0000000000000000000000000000000000000000..1c890fc42e93585a067ef815f163a96d069096ef
--- /dev/null
+++ b/benchmarks/communication/pt2pt.py
@@ -0,0 +1,132 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+from benchmarks.communication.utils import *
+from benchmarks.communication.constants import *
+from deepspeed.accelerator import get_accelerator
+
+import time
+
+
+def timed_pt2pt(input, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ sync_all()
+ # Warmups, establish connections, etc.
+ for i in range(args.warmups):
+ if dist.get_rank() == 0:
+ if args.async_op:
+ dist.isend(input, 1)
+ else:
+ dist.send(input, 1)
+ if dist.get_rank() == 1:
+ if args.async_op:
+ dist.irecv(input, src=0)
+ else:
+ dist.recv(input, src=0)
+ sync_all()
+
+ # time the actual comm op trials times and average it
+ pre = time.perf_counter()
+ for i in range(args.trials):
+ if dist.get_rank() == 0:
+ if args.async_op:
+ dist.isend(input, 1)
+ else:
+ dist.send(input, 1)
+ if dist.get_rank() == 1:
+ if args.async_op:
+ dist.irecv(input, src=0)
+ else:
+ dist.recv(input, src=0)
+
+ sync_all()
+ duration = time.perf_counter() - pre
+
+ # maintain and clean performance data
+ avg_duration = duration / args.trials
+ size = input.element_size() * input.nelement()
+ n = dist.get_world_size()
+ tput, busbw = get_bw('pt2pt', size, avg_duration, args)
+ tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
+ desc = f'{input.nelement()}x{input.element_size()}'
+
+ if not args.raw:
+ size = convert_size(size)
+
+ print_rank_0(
+ f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+
+
+def run_pt2pt(local_rank, args):
+ if args.dist == 'torch':
+ import torch.distributed as dist
+ elif args.dist == 'deepspeed':
+ import deepspeed.comm as dist
+
+ # Prepare benchmark header
+ print_header(args, 'pt2pt')
+ global_rank = dist.get_rank()
+ world_size = dist.get_world_size()
+
+ if args.scan:
+ # Create list of message sizes
+ M_LIST = []
+ for x in (2**p for p in range(1, args.maxsize)):
+ M_LIST.append(x)
+
+ sync_all()
+ # loop over various tensor sizes
+ for M in M_LIST:
+ global_rank = dist.get_rank()
+ try:
+ mat = torch.ones(world_size,
+ M,
+ dtype=getattr(
+ torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ sync_all()
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print('WARNING: Ran out of GPU memory. Exiting comm op.')
+ sync_all()
+ break
+ sync_all()
+ timed_pt2pt(input, args)
+ else:
+ # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
+ # Don't need output tensor, so double mem_factor
+ elements_per_gpu = max_numel(comm_op='pt2pt',
+ dtype=getattr(torch,
+ args.dtype),
+ mem_factor=args.mem_factor * 2,
+ local_rank=local_rank,
+ args=args)
+ try:
+ mat = torch.ones(elements_per_gpu,
+ dtype=getattr(torch,
+ args.dtype)).to(
+ get_accelerator().device_name(local_rank))
+ input = ((mat.mul_(float(global_rank))).view(-1))
+ except RuntimeError as e:
+ if 'out of memory' in str(e):
+ if dist.get_rank() == 0:
+ print(
+ 'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
+ )
+ sync_all()
+ return
+ sync_all()
+ timed_pt2pt(input, args)
+
+
+if __name__ == "__main__":
+ args = benchmark_parser().parse_args()
+ rank = args.local_rank
+ init_processes(local_rank=rank, args=args)
+ run_pt2pt(local_rank=rank, args=args)
diff --git a/benchmarks/communication/run_all.py b/benchmarks/communication/run_all.py
new file mode 100644
index 0000000000000000000000000000000000000000..7ec562cc9ae0dcc101477117a6158b54a1f4272a
--- /dev/null
+++ b/benchmarks/communication/run_all.py
@@ -0,0 +1,49 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+from benchmarks.communication.utils import *
+from benchmarks.communication.all_reduce import run_all_reduce
+from benchmarks.communication.all_gather import run_all_gather
+from benchmarks.communication.all_to_all import run_all_to_all
+from benchmarks.communication.pt2pt import run_pt2pt
+from benchmarks.communication.broadcast import run_broadcast
+from benchmarks.communication.constants import *
+
+
+# For importing
+def main(args, rank):
+
+ init_processes(local_rank=rank, args=args)
+
+ ops_to_run = []
+ if args.all_reduce:
+ ops_to_run.append('all_reduce')
+ if args.all_gather:
+ ops_to_run.append('all_gather')
+ if args.broadcast:
+ ops_to_run.append('broadcast')
+ if args.pt2pt:
+ ops_to_run.append('pt2pt')
+ if args.all_to_all:
+ ops_to_run.append('all_to_all')
+
+ if len(ops_to_run) == 0:
+ ops_to_run = ['all_reduce', 'all_gather', 'all_to_all', 'broadcast', 'pt2pt']
+
+ for comm_op in ops_to_run:
+ if comm_op == 'all_reduce':
+ run_all_reduce(local_rank=rank, args=args)
+ if comm_op == 'all_gather':
+ run_all_gather(local_rank=rank, args=args)
+ if comm_op == 'all_to_all':
+ run_all_to_all(local_rank=rank, args=args)
+ if comm_op == 'pt2pt':
+ run_pt2pt(local_rank=rank, args=args)
+ if comm_op == 'broadcast':
+ run_broadcast(local_rank=rank, args=args)
+
+
+# For directly calling benchmark
+if __name__ == "__main__":
+ args = benchmark_parser().parse_args()
+ rank = args.local_rank
+ main(args, rank)
diff --git a/benchmarks/communication/utils.py b/benchmarks/communication/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..b913dda14fe552cd7dc2f9fb46d878f5366d4c2a
--- /dev/null
+++ b/benchmarks/communication/utils.py
@@ -0,0 +1,220 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+import torch
+import os
+import math
+import argparse
+from benchmarks.communication.constants import *
+from deepspeed.accelerator import get_accelerator
+
+global dist
+
+
+def init_torch_distributed(backend):
+ global dist
+ import torch.distributed as dist
+ torch.distributed.init_process_group(backend)
+ local_rank = int(os.environ['LOCAL_RANK'])
+ get_accelerator().set_device(local_rank)
+
+
+def init_deepspeed_comm(backend):
+ global dist
+ import deepspeed
+ import deepspeed.comm as dist
+ deepspeed.init_distributed(dist_backend=backend)
+ local_rank = int(os.environ['LOCAL_RANK'])
+ get_accelerator().set_device(local_rank)
+
+
+def init_processes(local_rank, args):
+ if args.dist == 'deepspeed':
+ init_deepspeed_comm(args.backend)
+ elif args.dist == 'torch':
+ init_torch_distributed(args.backend)
+ else:
+ print_rank_0(f"distributed framework {args.dist} not supported")
+ exit(0)
+
+
+def print_rank_0(message):
+ if dist.get_rank() == 0:
+ print(message)
+
+
+def print_header(args, comm_op):
+ if comm_op == 'pt2pt':
+ world_size = 2
+ else:
+ world_size = dist.get_world_size()
+ tput = f'Throughput ({args.bw_unit})'
+ busbw = f'BusBW ({args.bw_unit})'
+ header = f"\n---- Performance of {comm_op} on {world_size} devices ---------------------------------------------------------\n"
+ duration_str = 'Duration'
+ if args.raw:
+ duration_str += ' (us)'
+ header += f"{'Size (Bytes)':20s} {'Description':25s} {duration_str:20s} {tput:20s} {busbw:20s}\n"
+ header += "----------------------------------------------------------------------------------------------------"
+ print_rank_0(header)
+
+
+def get_bw(comm_op, size, duration, args):
+ n = dist.get_world_size()
+ tput = 0
+ busbw = 0
+ if comm_op == "all_to_all":
+ tput = (size / duration)
+ busbw = (size / duration) * ((n - 1) / n)
+ elif comm_op == "all_gather":
+ size *= n
+ tput = (size / duration)
+ busbw = (size / duration) * ((n - 1) / n)
+ elif comm_op == "all_reduce":
+ tput = (size * 2 / duration)
+ busbw = (size / duration) * (2 * (n - 1) / n)
+ elif comm_op == "pt2pt" or comm_op == "broadcast":
+ tput = (size / duration)
+ busbw = tput
+ else:
+ print_rank_0("wrong comm_op specified")
+ exit(0)
+
+ if args.bw_unit == 'Gbps':
+ tput *= 8
+ busbw *= 8
+
+ return tput, busbw
+
+
+def get_metric_strings(args, tput, busbw, duration):
+ duration_ms = duration * 1e3
+ duration_us = duration * 1e6
+ tput = f'{tput / 1e9:.3f}'
+ busbw = f'{busbw /1e9:.3f}'
+
+ if duration_us < 1e3 or args.raw:
+ duration = f'{duration_us:.3f}'
+ if not args.raw:
+ duration += ' us'
+ else:
+ duration = f'{duration_ms:.3f} ms'
+ return tput, busbw, duration
+
+
+def sync_all():
+ get_accelerator().synchronize()
+ dist.barrier()
+
+
+def max_numel(comm_op, dtype, mem_factor, local_rank, args):
+ dtype_size = _element_size(dtype)
+ max_memory_per_gpu = get_accelerator().total_memory(local_rank) * mem_factor
+ if comm_op == 'all_reduce' or comm_op == 'pt2pt' or comm_op == 'broadcast':
+ elements_per_gpu = int(max_memory_per_gpu // dtype_size)
+ elif comm_op == 'all_gather':
+ # all_gather performance is lower for non-powers of two, and the output buffer size scales with world size
+ # Therefore, divide by world size and round down to nearest power of 2
+ elements_per_gpu = int(max_memory_per_gpu // dtype_size // dist.get_world_size())
+ elements_per_gpu = int(pow(2, int(math.log(elements_per_gpu, 2))))
+ elif comm_op == 'all_to_all':
+ # Number of elements must be divisible by world_size
+ # all_to_all performance is lower for non-powers of two. Round down like all_gather.
+ elements_per_gpu = int(max_memory_per_gpu // dtype_size)
+ elements_per_gpu = int(dist.get_world_size() *
+ round(elements_per_gpu / dist.get_world_size()))
+ elements_per_gpu = int(pow(2, int(math.log(elements_per_gpu, 2))))
+ else:
+ print(f"This communication operation: {comm_op} is not supported yet")
+ exit(0)
+ return elements_per_gpu
+
+
+# Helper function to pretty-print message sizes
+def convert_size(size_bytes):
+ if size_bytes == 0:
+ return "0B"
+ size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
+ i = int(math.floor(math.log(size_bytes, 1024)))
+ p = math.pow(1024, i)
+ s = round(size_bytes / p, 2)
+ return "%s %s" % (s, size_name[i])
+
+
+# Copied from torch. Need to add the func here for old torch compatibility.
+def _element_size(dtype):
+ """
+ Returns the element size for a dtype, in bytes
+ """
+ if not isinstance(dtype, torch.dtype):
+ raise RuntimeError(f'expected torch.dtype, but got {type(dtype)}')
+
+ if dtype.is_complex:
+ return torch.finfo(dtype).bits >> 2
+ elif dtype.is_floating_point:
+ return torch.finfo(dtype).bits >> 3
+ elif dtype == torch.bool:
+ # NOTE: torch.bool is not supported in torch.iinfo()
+ return 1
+ else:
+ return torch.iinfo(dtype).bits >> 3
+
+
+def benchmark_parser():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--local_rank", type=int)
+ parser.add_argument("--trials",
+ type=int,
+ default=DEFAULT_TRIALS,
+ help='Number of timed iterations')
+ parser.add_argument("--warmups",
+ type=int,
+ default=DEFAULT_WARMUPS,
+ help='Number of warmup (non-timed) iterations')
+ parser.add_argument("--maxsize",
+ type=int,
+ default=24,
+ help='Max message size as a power of 2')
+ parser.add_argument("--async-op",
+ action="store_true",
+ help='Enables non-blocking communication')
+ parser.add_argument("--bw-unit",
+ type=str,
+ default=DEFAULT_UNIT,
+ choices=['Gbps',
+ 'GBps'])
+ parser.add_argument("--backend",
+ type=str,
+ default=DEFAULT_BACKEND,
+ choices=['nccl',
+ 'ccl'],
+ help='Communication library to use')
+ parser.add_argument("--dist",
+ type=str,
+ default=DEFAULT_DIST,
+ choices=['deepspeed',
+ 'torch'],
+ help='Distributed DL framework to use')
+ parser.add_argument("--scan",
+ action="store_true",
+ help='Enables scanning all message sizes')
+ parser.add_argument("--raw",
+ action="store_true",
+ help='Print the message size and latency without units')
+ parser.add_argument("--all-reduce", action="store_true", help='Run all_reduce')
+ parser.add_argument("--all-gather", action="store_true", help='Run all_gather')
+ parser.add_argument("--all-to-all", action="store_true", help='Run all_to_all')
+ parser.add_argument("--pt2pt", action="store_true", help='Run pt2pt')
+ parser.add_argument("--broadcast", action="store_true", help='Run broadcast')
+ parser.add_argument("--dtype",
+ type=str,
+ default=DEFAULT_TYPE,
+ help='PyTorch tensor dtype')
+ parser.add_argument(
+ "--mem-factor",
+ type=float,
+ default=.4,
+ help='Proportion of max available GPU memory to use for single-size evals')
+ parser.add_argument("--debug",
+ action="store_true",
+ help='Enables all_to_all debug prints')
+ return parser
diff --git a/benchmarks/inference/bert-bench.py b/benchmarks/inference/bert-bench.py
new file mode 100644
index 0000000000000000000000000000000000000000..9d586d033cd7b375f5e82dfef53445199846630f
--- /dev/null
+++ b/benchmarks/inference/bert-bench.py
@@ -0,0 +1,92 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+import torch
+import time
+import deepspeed
+import argparse
+from transformers import pipeline
+from deepspeed.accelerator import get_accelerator
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--model", "-m", type=str, help="hf model name")
+parser.add_argument("--deepspeed", action="store_true", help="use deepspeed inference")
+parser.add_argument("--dtype", type=str, default="fp16", help="fp16 or fp32")
+parser.add_argument("--max-tokens", type=int, default=50, help="max new tokens")
+parser.add_argument("--local_rank", type=int, default=0, help="local rank")
+parser.add_argument("--trials", type=int, default=30, help="number of trials")
+parser.add_argument("--kernel-inject", action="store_true", help="inject kernels on")
+parser.add_argument("--graphs", action="store_true", help="CUDA Graphs on")
+args = parser.parse_args()
+
+
+def print_latency(latency_set, title, warmup=3):
+ # trim warmup queries
+ latency_set = latency_set[warmup:]
+ count = len(latency_set)
+ if count > 0:
+ latency_set.sort()
+ n50 = (count - 1) * 0.5 + 1
+ n90 = (count - 1) * 0.9 + 1
+ n95 = (count - 1) * 0.95 + 1
+ n99 = (count - 1) * 0.99 + 1
+ n999 = (count - 1) * 0.999 + 1
+
+ avg = sum(latency_set) / count
+ p50 = latency_set[int(n50) - 1]
+ p90 = latency_set[int(n90) - 1]
+ p95 = latency_set[int(n95) - 1]
+ p99 = latency_set[int(n99) - 1]
+ p999 = latency_set[int(n999) - 1]
+
+ print(f"====== latency stats {title} ======")
+ print("\tAvg Latency: {0:8.2f} ms".format(avg * 1000))
+ print("\tP50 Latency: {0:8.2f} ms".format(p50 * 1000))
+ print("\tP90 Latency: {0:8.2f} ms".format(p90 * 1000))
+ print("\tP95 Latency: {0:8.2f} ms".format(p95 * 1000))
+ print("\tP99 Latency: {0:8.2f} ms".format(p99 * 1000))
+ print("\t999 Latency: {0:8.2f} ms".format(p999 * 1000))
+
+
+deepspeed.init_distributed()
+
+print(args.model, args.max_tokens, args.dtype)
+
+if args.dtype.lower() == "fp16":
+ dtype = torch.float16
+else:
+ dtype = torch.float32
+
+pipe = pipeline("fill-mask", model=args.model, framework="pt", device=args.local_rank)
+
+if dtype == torch.half:
+ pipe.model.half()
+
+mask = pipe.tokenizer.mask_token
+
+br = pipe(f"Hello I'm a {mask} model")
+if args.deepspeed:
+ pipe.model = deepspeed.init_inference(pipe.model,
+ dtype=dtype,
+ mp_size=1,
+ replace_with_kernel_inject=args.kernel_inject,
+ enable_cuda_graph=args.graphs)
+ pipe.model.profile_model_time()
+
+responses = []
+times = []
+mtimes = []
+for i in range(args.trials):
+ get_accelerator().synchronize()
+ start = time.time()
+ r = pipe(f"Hello I'm a {mask} model")
+ get_accelerator().synchronize()
+ end = time.time()
+ responses.append(r)
+ times.append((end - start))
+ mtimes += pipe.model.model_times()
+ #print(f"{pipe.model.model_times()=}")
+
+print_latency(times, "e2e latency")
+print_latency(mtimes, "model latency")
+
+print(responses[0:3])
diff --git a/benchmarks/inference/collect_results.py b/benchmarks/inference/collect_results.py
new file mode 100644
index 0000000000000000000000000000000000000000..0e51033114db848d2d2ff14b2f33b009a2090672
--- /dev/null
+++ b/benchmarks/inference/collect_results.py
@@ -0,0 +1,147 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+import os
+import re
+import argparse
+import pandas as pd
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+ "--results-dir",
+ "-r",
+ type=str,
+ default="./results",
+ help="directory containing sweep results",
+)
+parser.add_argument("--version",
+ "-v",
+ type=int,
+ default=0,
+ help="version to be collected")
+parser.add_argument("--gen-text-n",
+ "-n",
+ type=int,
+ default=1,
+ help="expected number of generated text")
+parser.add_argument("--output",
+ "-o",
+ type=str,
+ default="./results.csv",
+ help="output file")
+args = parser.parse_args()
+
+
+def get_branch(file_path):
+ match = re.match(r".*\/(.*)\.log", file_path)
+ if match is None:
+ return False
+ else:
+ return match.groups()[0]
+
+
+def get_benchmark_params(root_dir, file_path):
+ match = re.match(
+ rf"{root_dir}\/(.+?)_(fp\d+)_(true|false)_(true|false)_(\d+)gpus_v(\d+)\/",
+ file_path,
+ )
+ if match is None:
+ return False
+ else:
+ model, dtype, graphs, kernel, gpus, version = match.groups()
+ bool_dict = {"true": True, "false": False}
+ return {
+ "model": model,
+ "dtype": dtype,
+ "graphs": bool_dict[graphs.lower()],
+ "kernel": bool_dict[kernel.lower()],
+ "gpus": int(gpus),
+ "version": int(version),
+ }
+
+
+def get_perf_data(file_content):
+ matches = re.findall(r"\s+(.+?)\sLatency:\s+(\d+\.\d+)\sms", file_content)
+ if not matches:
+ return False
+ else:
+ return {f"latency-{key}": float(val) for key, val in matches}
+
+
+def get_generated_text(file_content, gen_text_n):
+ file_content = file_content.replace("\n", " ")
+ file_content = file_content.replace("\t", " ")
+ matches = re.findall(r"RESPONSE\s(\d+):\s+[-]{30}\s+(.+?)\s+[-]{30}", file_content)
+ if len(matches) != gen_text_n:
+ return False
+ else:
+ return {f"generated-text-{key}": val for key, val in matches}
+
+
+def get_error(file_content):
+ matches = re.findall(r"Error:\s+(.+?)\n", file_content)
+ if not matches:
+ return False
+ else:
+ return {"error": val for val in matches}
+
+
+if __name__ == "__main__":
+ # List to collect data from all benchmarks
+ benchmarks_data = []
+
+ # Walk through directory of results from sweep.sh
+ for root, dirs, files in os.walk(args.results_dir):
+ # Because of how some models are named, the dir structure for results can vary, e.g.:
+ # "EleutherAI/gpt-neo_*/baseline.log" versus "gpt2_*/baseline.log"
+ if dirs:
+ continue
+
+ # Get data from baseline and each tested branch
+ for name in files:
+ file_path = os.path.join(root, name)
+
+ branch = get_branch(file_path)
+ if not branch:
+ print(f"WARNING: Could not detect branch for file {file_path}, skipping")
+ continue
+
+ params = get_benchmark_params(args.results_dir, file_path)
+ if not params:
+ print(
+ f"WARNING: Could not detect benchmark settings for file {file_path}, skipping"
+ )
+ continue
+
+ # Verify that the version matches that which we want to collect
+ if params["version"] != args.version:
+ continue
+
+ with open(file_path, "r") as f:
+ file_content = f.read()
+
+ perf_data = get_perf_data(file_content)
+ if not perf_data:
+ print(
+ f"WARNING: Could not detect benchmark performance data for file {file_path}"
+ )
+
+ generated_text = get_generated_text(file_content, args.gen_text_n)
+ if not generated_text:
+ print(f"WARNING: Could not detect generated text for file {file_path}")
+
+ error = get_error(file_content)
+ if error:
+ print(f"Error found in {file_path}, collecting error info...")
+ benchmarks_data.append({"branch": branch, **params, **error})
+ continue
+
+ benchmarks_data.append({
+ "branch": branch,
+ **params,
+ **perf_data,
+ **generated_text
+ })
+
+ # Convert to a DataFrame and save
+ benchmarks_df = pd.DataFrame(benchmarks_data)
+ benchmarks_df.to_csv(args.output)
diff --git a/benchmarks/inference/gpt-bench.py b/benchmarks/inference/gpt-bench.py
new file mode 100644
index 0000000000000000000000000000000000000000..29578b30cf1faf1638bbd0a84865ee9c283e9443
--- /dev/null
+++ b/benchmarks/inference/gpt-bench.py
@@ -0,0 +1,124 @@
+'''Copyright The Microsoft DeepSpeed Team'''
+
+import os
+import torch
+import time
+import deepspeed
+import argparse
+from transformers import pipeline
+from deepspeed.accelerator import get_accelerator
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--model", "-m", type=str, help="hf model name")
+parser.add_argument("--deepspeed", action="store_true", help="use deepspeed inference")
+parser.add_argument("--dtype",
+ type=str,
+ default="fp16",
+ choices=["fp16",
+ "fp32",
+ "int8"],
+ help="int8, fp16, or fp32")
+parser.add_argument("--graphs", action="store_true", help="CUDA Graphs on")
+parser.add_argument("--kernel-inject", action="store_true", help="inject kernels on")
+parser.add_argument("--max-tokens", type=int, default=50, help="max new tokens")
+parser.add_argument("--local_rank",
+ type=int,
+ default=int(os.getenv("LOCAL_RANK",
+ "0")),
+ help="local rank")
+parser.add_argument("--world_size",
+ type=int,
+ default=int(os.getenv("WORLD_SIZE",
+ "1")),
+ help="world size")
+parser.add_argument("--trials", type=int, default=30, help="number of trials")
+args = parser.parse_args()
+
+
+def print_latency(latency_set, title, warmup=3):
+ # trim warmup queries
+ latency_set = list(latency_set)
+ latency_set = latency_set[warmup:]
+ count = len(latency_set)
+ if count > 0:
+ latency_set.sort()
+ n50 = (count - 1) * 0.5 + 1
+ n90 = (count - 1) * 0.9 + 1
+ n95 = (count - 1) * 0.95 + 1
+ n99 = (count - 1) * 0.99 + 1
+ n999 = (count - 1) * 0.999 + 1
+
+ avg = sum(latency_set) / count
+ p50 = latency_set[int(n50) - 1]
+ p90 = latency_set[int(n90) - 1]
+ p95 = latency_set[int(n95) - 1]
+ p99 = latency_set[int(n99) - 1]
+ p999 = latency_set[int(n999) - 1]
+
+ print(f"====== latency stats {title} ======")
+ print("\tAvg Latency: {0:8.2f} ms".format(avg * 1000))
+ print("\tP50 Latency: {0:8.2f} ms".format(p50 * 1000))
+ print("\tP90 Latency: {0:8.2f} ms".format(p90 * 1000))
+ print("\tP95 Latency: {0:8.2f} ms".format(p95 * 1000))
+ print("\tP99 Latency: {0:8.2f} ms".format(p99 * 1000))
+ print("\t999 Latency: {0:8.2f} ms".format(p999 * 1000))
+
+
+deepspeed.init_distributed()
+
+if args.local_rank == 0:
+ print("BENCHMARK SETTINGS:")
+ print(f"\tMODEL: {args.model}")
+ print(f"\tMAX_TOKENS: {args.max_tokens}")
+ print(f"\tDTYPE: {args.dtype}")
+ print(f"\tCUDA_GRAPHS: {args.graphs}")
+ print(f"\tKERNEL_INJECT: {args.kernel_inject}")
+
+if args.dtype == "int8":
+ dtype = torch.int8
+elif args.dtype == "fp16":
+ dtype = torch.float16
+else:
+ dtype = torch.float32
+
+pipe = pipeline("text-generation",
+ model=args.model,
+ framework="pt",
+ device=args.local_rank)
+
+if dtype == torch.float16:
+ pipe.model.half()
+
+if args.deepspeed:
+ pipe.model = deepspeed.init_inference(
+ pipe.model,
+ dtype=dtype,
+ mp_size=args.world_size,
+ replace_with_kernel_inject=args.kernel_inject,
+ enable_cuda_graph=args.graphs,
+ )
+ pipe.model.profile_model_time()
+
+responses = []
+times = []
+mtimes = []
+for i in range(args.trials):
+ get_accelerator().synchronize()
+ start = time.time()
+ r = pipe("DeepSpeed is", do_sample=False, max_new_tokens=args.max_tokens)
+ get_accelerator().synchronize()
+ end = time.time()
+ responses.append(r)
+ times.append(end - start) # / (args.max_tokens - 3))
+ mtimes.append(sum(pipe.model.model_times()))
+
+if args.local_rank == 0:
+ print_latency(times, "(e2e) latency")
+ print_latency(mtimes, "(model-only) latency")
+ print_latency(map(lambda t: t / (args.max_tokens - 3),
+ times),
+ "(e2e) per token latency")
+ print(f"RESPONSE 0:")
+ print("-" * 30)
+ print(responses[0][0]["generated_text"])
+ print("-" * 30)
diff --git a/benchmarks/inference/requirements.txt b/benchmarks/inference/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..00899dd5f4858229e4115fd2b80b7807636892bd
--- /dev/null
+++ b/benchmarks/inference/requirements.txt
@@ -0,0 +1 @@
+transformers>=4.21.3
diff --git a/benchmarks/inference/run_model.sh b/benchmarks/inference/run_model.sh
new file mode 100644
index 0000000000000000000000000000000000000000..8e5fe3ac0133150a5de05f76da4951a2ead6be58
--- /dev/null
+++ b/benchmarks/inference/run_model.sh
@@ -0,0 +1,36 @@
+set -x
+
+model=$1
+branch1=$2
+branch2=$3
+dtype=$4
+graphs=$5
+kernel=$6
+gpus=$7
+
+version=0
+log_path=results/${model}_${dtype}_${graphs}_${kernel}_${gpus}gpus_v${version}
+mkdir -p ${log_path}
+
+params="--dtype $dtype "
+if [[ "$graphs" == "true" ]]; then
+ params+="--graphs "
+fi
+if [[ "$kernel" == "true" ]]; then
+ params+="--kernel-inject "
+fi
+
+echo "baseline $log_path"
+deepspeed --num_gpus 1 gpt-bench.py -m "${model}" $params &> ${log_path}/baseline.log
+
+cd ../../
+git checkout ${branch1}
+cd -
+echo "ds ${branch1} $log_path"
+deepspeed --num_gpus $gpus gpt-bench.py --deepspeed -m "${model}" $params &> ${log_path}/ds-${branch1}.log
+
+cd ../../
+git checkout ${branch2}
+cd -
+echo "ds ${branch2} $log_path"
+deepspeed --num_gpus $gpus gpt-bench.py --deepspeed -m "${model}" $params &> ${log_path}/ds-${branch2}.log
diff --git a/benchmarks/inference/sweep.sh b/benchmarks/inference/sweep.sh
new file mode 100644
index 0000000000000000000000000000000000000000..aabcb0bfdbd89e2eedd97d3e6afa74c0e50e7803
--- /dev/null
+++ b/benchmarks/inference/sweep.sh
@@ -0,0 +1,41 @@
+set -x
+
+export TRANSFORMERS_CACHE=/tmp/hf-cache
+
+branch1=$1
+branch2=$2
+
+gptneo_models="EleutherAI/gpt-neo-2.7B EleutherAI/gpt-neo-1.3B EleutherAI/gpt-neo-125M"
+gpt2_models="gpt2 gpt2-large gpt2-xl"
+gptj_models="EleutherAI/gpt-j-6B"
+opt_models="facebook/opt-125m facebook/opt-1.3b facebook/opt-2.7b facebook/opt-6.7b facebook/opt-13b"
+bloom_models="bigscience/bloom-560m bigscience/bloom-1b7 bigscience/bloom-3b bigscience/bloom-7b1"
+
+for gpus in `echo "1 2 4 8"`; do
+ for dtype in `echo "fp16 fp32"`; do
+ for graphs in `echo "true false"`; do
+ for kernel in `echo "true false"`; do
+ params="$dtype $graphs $kernel $gpus"
+ for m in `echo "$gptneo_models"`; do
+ bash run_model.sh $m $branch1 $branch2 $params
+ done
+
+ for m in `echo "$gpt2_models"`; do
+ bash run_model.sh $m $branch1 $branch2 $params
+ done
+
+ for m in `echo "$gptj_models"`; do
+ bash run_model.sh $m $branch1 $branch2 $params
+ done
+
+ for m in `echo "$opt_models"`; do
+ bash run_model.sh $m $branch1 $branch2 $params
+ done
+
+ for m in `echo "$bloom_models"`; do
+ bash run_model.sh $m $branch1 $branch2 $params
+ done
+ done
+ done
+ done
+done
diff --git a/bin/deepspeed b/bin/deepspeed
deleted file mode 100644
index 5ec8820db922fcdb284ff18cbe7f21c3b2e4d38b..0000000000000000000000000000000000000000
--- a/bin/deepspeed
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/usr/bin/env python3
-
-from deepspeed.launcher.runner import main
-
-if __name__ == '__main__':
- main()
diff --git a/bin/deepspeed b/bin/deepspeed
new file mode 120000
index 0000000000000000000000000000000000000000..6b768564101983015fd56c8d604e439c2374ad06
--- /dev/null
+++ b/bin/deepspeed
@@ -0,0 +1 @@
+ds
\ No newline at end of file
diff --git a/bin/deepspeed.pt b/bin/deepspeed.pt
deleted file mode 100644
index 5ec8820db922fcdb284ff18cbe7f21c3b2e4d38b..0000000000000000000000000000000000000000
--- a/bin/deepspeed.pt
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/usr/bin/env python3
-
-from deepspeed.launcher.runner import main
-
-if __name__ == '__main__':
- main()
diff --git a/bin/deepspeed.pt b/bin/deepspeed.pt
new file mode 120000
index 0000000000000000000000000000000000000000..6b768564101983015fd56c8d604e439c2374ad06
--- /dev/null
+++ b/bin/deepspeed.pt
@@ -0,0 +1 @@
+ds
\ No newline at end of file
diff --git a/bin/ds b/bin/ds
old mode 100644
new mode 100755
diff --git a/bin/ds_bench b/bin/ds_bench
new file mode 100755
index 0000000000000000000000000000000000000000..bfacbc8e25c8d21958a7cd44572f7177d5ff3cb5
--- /dev/null
+++ b/bin/ds_bench
@@ -0,0 +1,17 @@
+#!/usr/bin/env python3
+
+from benchmarks.communication.run_all import main
+from benchmarks.communication.constants import *
+from benchmarks.communication.utils import *
+import os
+import sys
+
+# Run the same file with the deepspeed launcher. This is required since setuptools will auto-detect python files and insert a python shebang for both 'scripts' and 'entry_points', and these benchmarks require the DS launcher
+required_env = ["RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT", "LOCAL_RANK"]
+if not all(map(lambda v: v in os.environ, required_env)):
+ import subprocess
+ subprocess.run("deepspeed $(which ds_bench) " + " ".join(sys.argv[1:]), shell=True)
+else:
+ args = benchmark_parser().parse_args()
+ rank = args.local_rank
+ main(args, rank)
diff --git a/bin/ds_elastic b/bin/ds_elastic
old mode 100644
new mode 100755
diff --git a/bin/ds_report b/bin/ds_report
old mode 100644
new mode 100755
diff --git a/bin/ds_ssh b/bin/ds_ssh
old mode 100644
new mode 100755
diff --git a/bin/dsr b/bin/dsr
new file mode 120000
index 0000000000000000000000000000000000000000..747bf4722c429a1e845cc1fc7527e249bec6af2c
--- /dev/null
+++ b/bin/dsr
@@ -0,0 +1 @@
+ds_report
\ No newline at end of file
diff --git a/build_win.bat b/build_win.bat
new file mode 100644
index 0000000000000000000000000000000000000000..ec8c8a362a783df677862d8b500ade56896422de
--- /dev/null
+++ b/build_win.bat
@@ -0,0 +1,19 @@
+@echo off
+
+set DS_BUILD_AIO=0
+set DS_BUILD_SPARSE_ATTN=0
+
+echo Administrative permissions required. Detecting permissions...
+
+net session >nul 2>&1
+if %errorLevel% == 0 (
+ echo Success: Administrative permissions confirmed.
+) else (
+ echo Failure: Current permissions inadequate.
+ goto end
+)
+
+
+python setup.py bdist_wheel
+
+:end
diff --git a/csrc/adagrad/cpu_adagrad.cpp b/csrc/adagrad/cpu_adagrad.cpp
index 4f2a9b69ef966599d1bd6664f79e312c9240671b..9f8f95c4a876fdd5883cceea70f809d46544e994 100644
--- a/csrc/adagrad/cpu_adagrad.cpp
+++ b/csrc/adagrad/cpu_adagrad.cpp
@@ -1,16 +1,21 @@
+#ifdef __HIPCC__
+#include "cpu_adagrad_hip.h"
+#else
#include "cpu_adagrad.h"
-#include
-#include
-#include
+#endif
+
#include
#include
#include
#include
#include
+#if defined(__ENABLE_CUDA__)
+#include
#include "cublas_v2.h"
#include "cuda.h"
#include "curand.h"
#include "custom_cuda_layers.h"
+#endif
static std::unordered_map> s_optimizers;
@@ -20,7 +25,7 @@ void Adagrad_Optimizer::Step_1(float* _params,
float* grads,
float* _exp_avg_sq,
size_t _param_size,
- __half* dev_params,
+ ds_half_precision_t* dev_params,
bool half_precision)
{
size_t rounded_size = 0;
@@ -30,17 +35,19 @@ void Adagrad_Optimizer::Step_1(float* _params,
#endif
if (_param_size > rounded_size) {
float step_size = -1 * _alpha;
- __half* grads_cast_h;
- __half* params_cast_h;
+ ds_half_precision_t* grads_cast_h;
+ ds_half_precision_t* params_cast_h;
if (half_precision) {
- grads_cast_h = reinterpret_cast<__half*>(grads);
- params_cast_h = reinterpret_cast<__half*>(_params);
+ grads_cast_h = reinterpret_cast(grads);
+ params_cast_h = reinterpret_cast(_params);
}
for (size_t t = rounded_size; t < _param_size; t += TILE) {
size_t copy_size = TILE;
if ((t + TILE) > _param_size) copy_size = _param_size - t;
size_t offset = copy_size + t;
+#if defined(__ENABLE_CUDA__)
if ((t / TILE) >= 2) { cudaStreamSynchronize(_streams[_buf_index]); }
+#endif
#pragma omp parallel for
for (size_t k = t; k < offset; k++) {
float grad = half_precision ? (float)grads_cast_h[k] : grads[k];
@@ -55,21 +62,24 @@ void Adagrad_Optimizer::Step_1(float* _params,
grad += _eps;
grad = momentum / grad;
param = grad * step_size + param;
+#if defined(__ENABLE_CUDA__)
if (dev_params) _doubled_buffer[_buf_index][k - t] = param;
-
+#endif
if (half_precision)
- params_cast_h[k] = (__half)param;
+ params_cast_h[k] = (ds_half_precision_t)param;
else
_params[k] = param;
// STORE UPDATE TERM TO GRAD'S MEMORY
grads[k] = grad * step_size;
_exp_avg_sq[k] = variance;
}
+#if defined(__ENABLE_CUDA__)
if (dev_params) {
launch_param_update(
_doubled_buffer[_buf_index], dev_params + t, (copy_size), _streams[_buf_index]);
_buf_index = !_buf_index;
}
+#endif
}
}
}
@@ -78,7 +88,7 @@ void Adagrad_Optimizer::Step_4(float* _params,
float* grads,
float* _exp_avg_sq,
size_t _param_size,
- __half* dev_params,
+ ds_half_precision_t* dev_params,
bool half_precision)
{
size_t rounded_size = 0;
@@ -130,7 +140,7 @@ void Adagrad_Optimizer::Step_8(float* _params,
float* grads,
float* _exp_avg_sq,
size_t _param_size,
- __half* dev_params,
+ ds_half_precision_t* dev_params,
bool half_precision)
{
size_t rounded_size = 0;
@@ -170,7 +180,9 @@ int ds_adagrad_step(int optimizer_id,
opt->update_state(lr, epsilon, weight_decay);
opt->Step_8(params_ptr, grads_ptr, exp_avg_sq_ptr, params_c.size(0));
+#if defined(__ENABLE_CUDA__)
opt->SynchronizeStreams();
+#endif
return 0;
}
@@ -184,6 +196,7 @@ int ds_adagrad_step_plus_copy(int optimizer_id,
torch::Tensor& exp_avg_sq,
torch::Tensor& gpu_params)
{
+#if defined(__ENABLE_CUDA__)
auto params_c = params.contiguous();
auto gpu_params_c = gpu_params.contiguous();
auto exp_avg_sq_c = exp_avg_sq.contiguous();
@@ -191,7 +204,7 @@ int ds_adagrad_step_plus_copy(int optimizer_id,
float* params_ptr = (float*)params_c.data_ptr();
float* grads_ptr = (float*)grads_c.data_ptr();
- __half* gpu_params_ptr = (__half*)gpu_params_c.data_ptr();
+ ds_half_precision_t* gpu_params_ptr = (ds_half_precision_t*)gpu_params_c.data_ptr();
float* exp_avg_sq_ptr = (float*)exp_avg_sq_c.data_ptr();
std::shared_ptr opt =
@@ -206,6 +219,9 @@ int ds_adagrad_step_plus_copy(int optimizer_id,
(params.options().dtype() == at::kHalf));
opt->SynchronizeStreams();
+#else
+ assert(false);
+#endif
return 0;
}
diff --git a/csrc/adam/cpu_adam.cpp b/csrc/adam/cpu_adam.cpp
index 727eec8182c12cdcc0dcb0df53a38918bdeae4b8..f17f22535ab8dfd56260daff7a2479e771f376a4 100644
--- a/csrc/adam/cpu_adam.cpp
+++ b/csrc/adam/cpu_adam.cpp
@@ -1,16 +1,18 @@
#include "cpu_adam.h"
-#include
-#include
-#include
#include
+#include
#include
#include
#include
#include
+
+#if defined(__ENABLE_CUDA__)
+#include
#include "cublas_v2.h"
#include "cuda.h"
#include "curand.h"
#include "custom_cuda_layers.h"
+#endif
static std::unordered_map> s_optimizers;
@@ -21,7 +23,7 @@ void Adam_Optimizer::Step_1(float* _params,
float* _exp_avg,
float* _exp_avg_sq,
size_t _param_size,
- __half* dev_params,
+ ds_half_precision_t* dev_params,
bool half_precision)
{
size_t rounded_size = 0;
@@ -41,19 +43,20 @@ void Adam_Optimizer::Step_1(float* _params,
float step_size = -1 * _alpha / _bias_correction1;
float w_decay = -1 * _alpha * _weight_decay;
- __half* grads_cast_h;
- __half* params_cast_h;
+ ds_half_precision_t* grads_cast_h;
+ ds_half_precision_t* params_cast_h;
if (half_precision) {
- grads_cast_h = reinterpret_cast<__half*>(grads);
- params_cast_h = reinterpret_cast<__half*>(_params);
+ grads_cast_h = reinterpret_cast(grads);
+ params_cast_h = reinterpret_cast(_params);
}
for (size_t t = rounded_size; t < _param_size; t += TILE) {
size_t copy_size = TILE;
if ((t + TILE) > _param_size) copy_size = _param_size - t;
size_t offset = copy_size + t;
+#if defined(__ENABLE_CUDA__)
if ((t / TILE) >= 2) { cudaStreamSynchronize(_streams[_buf_index]); }
-
+#endif
#pragma omp parallel for
for (size_t k = t; k < offset; k++) {
float grad = half_precision ? (float)grads_cast_h[k] : grads[k];
@@ -73,21 +76,24 @@ void Adam_Optimizer::Step_1(float* _params,
grad = momentum / grad;
if (_weight_decay > 0 && _adamw_mode) { param += w_decay * param; }
param = grad * step_size + param;
+#if defined(__ENABLE_CUDA__)
if (dev_params) _doubled_buffer[_buf_index][k - t] = param;
-
+#endif
if (half_precision)
- params_cast_h[k] = (__half)param;
+ params_cast_h[k] = (ds_half_precision_t)param;
else
_params[k] = param;
_exp_avg[k] = momentum;
_exp_avg_sq[k] = variance;
}
+#if defined(__ENABLE_CUDA__)
if (dev_params) {
launch_param_update(
_doubled_buffer[_buf_index], dev_params + t, (copy_size), _streams[_buf_index]);
_buf_index = !_buf_index;
}
+#endif
}
}
}
@@ -97,7 +103,7 @@ void Adam_Optimizer::Step_4(float* _params,
float* _exp_avg,
float* _exp_avg_sq,
size_t _param_size,
- __half* dev_params,
+ ds_half_precision_t* dev_params,
bool half_precision)
{
size_t rounded_size = 0;
@@ -166,7 +172,7 @@ void Adam_Optimizer::Step_8(float* _params,
float* _exp_avg,
float* _exp_avg_sq,
size_t _param_size,
- __half* dev_params,
+ ds_half_precision_t* dev_params,
bool half_precision)
{
size_t rounded_size = 0;
@@ -228,7 +234,9 @@ int ds_adam_step(int optimizer_id,
nullptr,
(params.options().dtype() == at::kHalf));
+#if defined(__ENABLE_CUDA__)
opt->SynchronizeStreams();
+#endif
return 0;
}
@@ -246,6 +254,7 @@ int ds_adam_step_plus_copy(int optimizer_id,
torch::Tensor& exp_avg_sq,
torch::Tensor& gpu_params)
{
+#if defined(__ENABLE_CUDA__)
auto params_c = params.contiguous();
auto gpu_params_c = gpu_params.contiguous();
auto exp_avg_c = exp_avg.contiguous();
@@ -254,7 +263,7 @@ int ds_adam_step_plus_copy(int optimizer_id,
float* params_ptr = (float*)params_c.data_ptr();
float* grads_ptr = (float*)grads_c.data_ptr();
- __half* gpu_params_ptr = (__half*)gpu_params_c.data_ptr();
+ ds_half_precision_t* gpu_params_ptr = (ds_half_precision_t*)gpu_params_c.data_ptr();
float* exp_avg_ptr = (float*)exp_avg_c.data_ptr();
float* exp_avg_sq_ptr = (float*)exp_avg_sq_c.data_ptr();
@@ -271,6 +280,9 @@ int ds_adam_step_plus_copy(int optimizer_id,
(params.options().dtype() == at::kHalf));
opt->SynchronizeStreams();
+#else
+ assert(false);
+#endif
return 0;
}
diff --git a/csrc/adam/multi_tensor_adam.cu b/csrc/adam/multi_tensor_adam.cu
index 3cb9763befcea663966347f23a2d9b925a7db8bd..611d9ffbe375306d94356abfefe81b29442454d4 100644
--- a/csrc/adam/multi_tensor_adam.cu
+++ b/csrc/adam/multi_tensor_adam.cu
@@ -12,7 +12,11 @@
#include
+#ifdef __HIPCC__
+#include "multi_tensor_apply_hip.cuh"
+#else
#include "multi_tensor_apply.cuh"
+#endif
#include "type_shim.h"
#define BLOCK_SIZE 512
diff --git a/csrc/aio/common/deepspeed_aio_utils.cpp b/csrc/aio/common/deepspeed_aio_utils.cpp
index 200c7030f120366c2e2a45cb6cc20785ec4518fd..e8bf9de1125907c898e21f52e81cef2cb08b131a 100644
--- a/csrc/aio/common/deepspeed_aio_utils.cpp
+++ b/csrc/aio/common/deepspeed_aio_utils.cpp
@@ -6,6 +6,7 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
*/
#include
+#include
#include "deepspeed_aio_utils.h"
@@ -113,8 +114,8 @@ void* ds_page_aligned_alloc(const size_t size, const bool lock)
auto mlock_ret = mlock(ptr, size);
if (mlock_ret != 0) {
auto mlock_error = errno;
- printf("mlock failed with %d %s\n", mlock_error, strerror(mlock_error));
-
+ std::cerr << "mlock failed to allocate " << size << " bytes with error no " << mlock_error
+ << " msg " << strerror(mlock_error) << std::endl;
free(ptr);
return nullptr;
}
diff --git a/csrc/aio/py_lib/deepspeed_pin_tensor.cpp b/csrc/aio/py_lib/deepspeed_pin_tensor.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..20bdf5b142a63a88e3636d6375d62b30bf4fd841
--- /dev/null
+++ b/csrc/aio/py_lib/deepspeed_pin_tensor.cpp
@@ -0,0 +1,43 @@
+/*
+Copyright 2023 The Microsoft DeepSpeed Team
+Licensed under the MIT license.
+
+Functionality for managing CPU tensors occupying page-locked memory.
+*/
+
+#include "deepspeed_pin_tensor.h"
+
+using namespace std;
+
+deepspeed_pin_tensor_t::~deepspeed_pin_tensor_t()
+{
+ for (auto iter = _locked_tensors.begin(); iter != _locked_tensors.end(); ++iter) {
+ munlock(iter->first, iter->second);
+ }
+ _locked_tensors.clear();
+}
+
+torch::Tensor deepspeed_pin_tensor_t::alloc(const size_t num_elem, const at::ScalarType& elem_type)
+{
+ const auto num_bytes = num_elem * elementSize(elem_type);
+ auto pinned_buffer = ds_page_aligned_alloc(num_bytes, true);
+ assert(nullptr != pinned_buffer);
+
+ _locked_tensors[pinned_buffer] = num_bytes;
+
+ auto options = torch::TensorOptions().dtype(elem_type).device(torch::kCPU);
+
+ return at::from_blob(pinned_buffer, static_cast(num_elem), options);
+}
+
+bool deepspeed_pin_tensor_t::free(torch::Tensor& locked_tensor)
+{
+ auto addr = locked_tensor.data_ptr();
+ if (_locked_tensors.find(addr) != _locked_tensors.end()) {
+ munlock(addr, _locked_tensors[addr]);
+ _locked_tensors.erase(addr);
+ return true;
+ }
+
+ return false;
+}
diff --git a/csrc/aio/py_lib/deepspeed_pin_tensor.h b/csrc/aio/py_lib/deepspeed_pin_tensor.h
new file mode 100644
index 0000000000000000000000000000000000000000..a421bbc8f3b1ac155348a48387f40601dfe7c429
--- /dev/null
+++ b/csrc/aio/py_lib/deepspeed_pin_tensor.h
@@ -0,0 +1,24 @@
+/*
+Copyright 2023 The Microsoft DeepSpeed Team
+Licensed under the MIT license.
+
+Functionality for managing CPU tensors occupying page-locked memory.
+TODO: Implement a full-featured manager that
+ 1. Avoid page-locked memory leaks
+ 2. Minimize page-locked memory usage by reducing internal fragmentation
+*/
+
+#include