-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtorch_task.py
More file actions
132 lines (111 loc) · 4.74 KB
/
torch_task.py
File metadata and controls
132 lines (111 loc) · 4.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from __future__ import absolute_import

import os
from ctypes import CDLL, RTLD_GLOBAL

import torch
import torch.distributed as dist

import bytescheduler
from bytescheduler.common import get_ext_suffix
from bytetask import ByteTask
# Load the ByteScheduler PyTorch C extension (``c_lib`` + platform extension
# suffix). RTLD_GLOBAL makes its symbols available to shared libraries loaded
# afterwards. The default location is resolved from the installed
# ``bytescheduler`` package (``<package>/pytorch/``) rather than from a
# machine-specific absolute path; set BYTESCHEDULER_C_LIB to point at a
# different build of the library.
dll_path = os.environ.get(
    "BYTESCHEDULER_C_LIB",
    os.path.join(os.path.dirname(bytescheduler.__file__), "pytorch",
                 'c_lib' + get_ext_suffix()),
)
BYTESCHEDULER_LIB = CDLL(dll_path, RTLD_GLOBAL)
class TorchTask(ByteTask):
    """ByteScheduler task that allreduces (slices of) a PyTorch tensor.

    Relies on attributes presumably provided by ``ByteTask`` (not visible in
    this file): ``self._comm``, ``self._tensor``, ``self.name``,
    ``self.kwargs``, ``self.parent``, ``self._logger``, ``self.notify_ready``
    and ``self.notify_finish``.
    """

    def _do(self):
        """Start an asynchronous allreduce of ``self._tensor``.

        The communicator's ``(handle, ctx)`` pair is pushed onto
        ``self._comm.event_queue`` together with the parameter, the tensor and
        ``self._finish_callback``. Presumably the queue consumer calls the
        callback once the allreduce completes (not visible in this file).
        """
        handle, ctx = self._comm.allreduce_grad_async(self._tensor, self.name)
        self._comm.event_queue.put(
            (self.kwargs['parameter'], self._tensor, handle, ctx,
             self._finish_callback))

    def _finish_callback(self):
        """Notify Core about completion of a tensor.

        Returns:
            For a subtask (``self.parent`` is set), the value returned by
            ``self.notify_finish()``, i.e. whether the parent task is finished.
            For a task without a parent, always True.
        """
        # Evaluate the parent check before notifying, as the original control
        # flow did, in case notify_finish() changes self.parent.
        has_parent = self.parent is not None
        parent_finish = self.notify_finish()
        return parent_finish if has_parent else True

    def _prepare(self):
        """Queue a readiness check for ``self._tensor``.

        A fresh ``torch.cuda.Event`` is put on ``self._comm.event_queue``,
        tagged ``"READYEVENT"``, with ``self.notify_ready`` as the callback.
        The event is not recorded here. Presumably the queue consumer records
        and queries it to detect when backward propagation has produced the
        gradient (TODO confirm against the consumer).
        """
        handle = torch.cuda.Event()
        self._comm.event_queue.put(
            (self.kwargs['parameter'], self._tensor, handle, "READYEVENT",
             self.notify_ready))

    def _tensor_size(self):
        """Return the total number of elements in ``self._tensor``."""
        return self._tensor.numel()

    def _partition_tensor_v2(self, partition_list):
        """Split ``self._tensor`` along dim 0 according to relative weights.

        Arguments:
            partition_list: a list of integers giving the relative number of
                rows for each partition; entries equal to -1 are ignored.

        Returns:
            A list of row slices of ``self._tensor``, one per remaining weight.
            Each unit of weight gets ``rows // sum(weights)`` rows and the last
            partition absorbs any remainder. When the weights sum to more than
            the number of rows, each unit gets one row, so trailing partitions
            may be empty. If the weights sum to zero (or none remain) or the
            tensor has no rows, the whole tensor is returned as one partition.
        """
        assert partition_list is not None, "partition list can not be None."
        # -1 entries are placeholders and do not take part in the split.
        partition_list = [key for key in partition_list if key != -1]
        self._logger.debug(
            "call v2 partition func for key {}, partition list: {}".format(self.name, partition_list))
        rows = self._tensor.shape[0]
        number = sum(partition_list)
        if number > rows:
            self._logger.warning(
                "The number of tensor rows (with shape {}) is smaller than partition number {}.".format(self._tensor.shape, number))
            number = rows
        if number <= 0:
            # Nothing to divide by (empty/zero weights or zero rows): keep the
            # tensor whole instead of raising ZeroDivisionError.
            return [self._tensor]
        num_per_partition = rows // number
        last = len(partition_list) - 1
        partitions = []
        start = 0
        for i, weight in enumerate(partition_list):
            end = start + num_per_partition * weight
            # The last partition takes everything that is left.
            partitions.append(self._tensor[start:] if i == last else self._tensor[start:end])
            start = end
        return partitions

    def _partition_tensor(self, size):
        """Partition a tensor into near-equal row slices.

        Arguments:
            size: a positive integer target for the number of elements per
                partition. The tensor is split into ``ceil(numel / size)``
                partitions, capped at the number of rows. Partitions are
                whole-row slices, so one can exceed ``size`` when rows do not
                divide evenly or a single row holds more than ``size`` elements.

        Returns:
            A list of row slices of ``self._tensor``. Sizes differ by at most
            one row; the trailing partitions get the extra rows. A tensor with
            no rows or no elements is returned whole as a single partition.
        """
        rows = self._tensor.shape[0]
        number = (self._tensor_size() - 1) // size + 1
        if number > rows:
            self._logger.warning(
                "The number of tensor rows (with shape {}) is smaller than partition number {}.".format(self._tensor.shape, number))
            number = rows
        if number <= 0:
            # Empty tensor: nothing to split, avoid ZeroDivisionError below.
            return [self._tensor]
        num_per_partition, partitions_with_extra = divmod(rows, number)
        first_with_extra = number - partitions_with_extra
        partitions = []
        start = 0
        for i in range(number):
            length = num_per_partition + (1 if i >= first_with_extra else 0)
            partitions.append(self._tensor[start:start + length])
            start += length
        return partitions

    def _immediate_do(self):
        """Start the allreduce right away by delegating to ``_do``."""
        self._do()