/*
* VirtualMultiArray.h
*
* Created on: Feb 1, 2021
* Author: tugrul
*/
/*
 * A virtual array that uses graphics cards as backing store, with an N-way LRU cache per physical graphics card
 * Page: a chunk of data that is traded between RAM and VRAM to keep frequently used content close to the user
 * Page Size: number of elements in a page (= the cache-line size of the LRU)
 * Memory Multiplier: VRAM usage ratio per graphics card; also the number of LRUs and the number of parallel data channels per card
 * Every virtual gpu is produced from a physical gpu and is given 1 LRU cache, 1 OpenCL command queue and a share of VRAM,
 * so that I/O can be overlapped between different cards or between virtual gpus of the same physical gpu
 * Active Page: a chunk of data held in RAM inside an LRU cache (addressed by a computed physical index)
 * Frozen Page: a chunk of data held in VRAM (addressed by a computed virtual index)
 * Number of Active Pages = number of cache lines in each LRU
 *
 *
 * How to hide I/O latency: use more threads than CPU logical cores when accessing elements with any method except the uncached versions
 * (I/O latency hiding is currently supported only on Linux)
 * How to reduce average latency per element for sequential access: use bulk read/write (readOnlyGetN, writeOnlySetN, mappedReadWriteAccess)
 * and allocate more (and larger) active pages (cache lines) when more threads access the array concurrently
 * How to increase throughput for random access: decrease the page size (cache-line size), increase the number of active pages (cache lines)
 * How to decrease latency for random access: use the getUncached/setUncached methods between streamStart/streamStop calls
 *
 */
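/*
 * Minimal usage sketch (illustrative only; GraphicsCardSupplyDepot/requestGpus and the
 * Particle type are assumptions here - any source of std::vector<ClDevice> works):
 *
 * #include "GraphicsCardSupplyDepot.h"
 * #include "VirtualMultiArray.h"
 *
 * struct Particle { float x, y, z; };
 *
 * int main()
 * {
 *     GraphicsCardSupplyDepot depot; // assumed device-enumeration helper
 *     const size_t n = 1024 * 1024; // must be an integer multiple of pageSize
 *     VirtualMultiArray<Particle> arr(n, depot.requestGpus(), 1024, 50);
 *     arr.set(0, Particle{1.0f, 2.0f, 3.0f}); // page is fetched into the LRU as needed
 *     Particle p = arr.get(0); // served from the LRU cache
 *     return 0;
 * }
 */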
#ifndef VIRTUALMULTIARRAY_H_
#define VIRTUALMULTIARRAY_H_
#include<limits>
#include<vector>
#include<memory>
#include<mutex>
#include <stdexcept>
#include <functional>
#include "FunctionRunner.h"
#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32)) && !defined(__CYGWIN__)
// windows
#include<memoryapi.h>
#define __restrict__ __restrict
#else
// linux
#include<sys/mman.h>
#endif
#include<thread>
#include"ClDevice.h"
#include"VirtualArray.h"
template<typename T>
class VirtualMultiArray
{
public:
enum MemMult {
// equal data distribution on all gpu instances
// also uses user-defined values when memMult parameter is not empty
UseDefault=0,
// distribution ratios are equal to vram sizes of cards, (uses integer GB values, 2GB = 2, 1.9 GB = 1, 2.1GB = 2)
UseVramRatios=1,
// distribution ratio that reflects pci-e specs of cards to maximize bandwidth
// not-implemented
UsePcieRatios=2
};
VirtualMultiArray():numDevice(0),pageSize(0),va(nullptr),pageLock(nullptr){}
// creates virtual array on a list of devices
// size: number of array elements (needs to be integer-multiple of pageSize)
// device: list of (physical) graphics cards that are used to generate multiple virtual cards to overlap multiple operations(device to host, host-to-device data copies) on them
// pageSizeP: number of elements per page (should not be too big for pinned-array limitations, should not be too small for pcie bandwidth efficiency)
// numActivePage: number of RAM-backed pages per (virtual) graphics card
// memMult: 1) "number of virtual graphics cards per physical card". {1,2,3} means first card will be just physical, second card will be dual v-cards, last one becomes 3 v-cards
// 2) "ratio of VRAM usage per physical card". {1,2,3} means a 6GB array will be distributed as 1GB,2GB,3GB on 3 cards
// 3) "average multithreaded usage limit per physical card". {1,2,3} means no data-copy-overlap on first card, 2 copies in-flight on second card, 3 copies concurrently on 3rd card
// 4) to disable a card, give it 0. {1,0,5} means first card is physical, second card is not used, third card will have intense pcie activity and VRAM consumption
// 5) every unit in the vector adds RAM usage (one more set of active pages: numActivePage * pageSize * sizeof(T))
// 6) "number of LRU cache per physical card" ==> each virtual card caches every k-th page in interleaved form with other virtual cards
// for example: {1,1,1} with numActivePage=5 means pages are cached in this order: (c=cache) c1 c2 c3 c1 c2 c3 c1 c2 c3 c1 c2 c3 c1 c2 c3
// c1: LRU cache that serves pages at index%3 = 0
// c2: LRU cache at index%3 = 1
// c3: LRU cache at index%3 = 2
// default: {4,4,...,4} all cards are given 4 data channels so total RAM usage becomes this: { nGpu * 4 * pageSize * sizeof(T) * numActivePage }
// which also means 4 independent LRU caches per physical card: c1_1 c2_1 c3_1 c1_2 c2_2 c3_2 .... c1_4 c2_4 c3_4 c1_1 c2_1 ...
// mem:
// UseDefault = uses user-input values from "memMult" parameter,
// UseVramRatios=allocates from gpus in tune with their vram sizes to maximize array capacity,
// UsePcieRatios(not implemented)=allocation ratio for pcie specs to maximize bandwidth
// usePinnedArraysOnly: pins all active-page buffers so the OS cannot page them out during gpu copies (pageable buffers are slower but need fewer resources)
// useLRUdebugging: true=uses an LRU implementation that keeps cache hit/miss statistics for querying (the performance difference is negligible)
// to query the hit ratio, call getTotalCacheHitRatio() and other related methods
VirtualMultiArray(size_t size, std::vector<ClDevice> device, size_t pageSizeP=1024, int numActivePage=50,
std::vector<int> memMult=std::vector<int>(), MemMult mem=MemMult::UseDefault, const bool usePinnedArraysOnly=true,
const bool useLRUdebugging=false){
int numPhysicalCard = device.size();
int nDevice = 0;
std::vector<int> gpuCloneMult;
if(MemMult::UseDefault==mem)
{
if(memMult.size()==0)
{
// auto distribution rate (all graphics cards take equal data)
for(int i=0;i<numPhysicalCard;i++)
{
gpuCloneMult.push_back(4); // every card becomes 4 virtual cards (for overlapping data transfers of opposite directions, performance)
nDevice+=4;
}
}
else
{
// user-defined memory usage per card, by multiplier-per-card
for(int i=0;i<numPhysicalCard;i++)
{
if(i<(int)memMult.size())
{
gpuCloneMult.push_back(memMult[i]); // card i becomes memMult[i] virtual cards (for overlapping data transfers of opposite directions, performance)
nDevice += memMult[i];
}
else
{
gpuCloneMult.push_back(4); // unspecified cards default to 4 virtual cards
nDevice += 4;
}
}
}
}
else if (MemMult::UseVramRatios==mem)
{
if(memMult.size()==0)
{
for(int i=0;i<numPhysicalCard;i++)
{
int vram = device[i].vramSize();
gpuCloneMult.push_back(vram);
nDevice+=vram;
}
}
else
{
// user-defined memory usage per card; a 0 in memMult disables that card
for(int i=0;i<numPhysicalCard;i++)
{
int vram = device[i].vramSize();
// user-unspecified gpus are assumed enabled; excess memMult elements are ignored
const bool enabled = (i >= (int)memMult.size()) || (memMult[i] > 0);
gpuCloneMult.push_back(enabled ? vram : 0);
nDevice += (enabled ? vram : 0);
}
}
}
else
{
throw std::invalid_argument("Not implemented: MemMult::UsePcieRatios");
}
if((size/pageSizeP)*pageSizeP !=size)
{
throw std::invalid_argument(std::string("Error :number of elements(")+std::to_string(size)+std::string(") need to be integer-multiple of page size(")+std::to_string(pageSizeP)+std::string(")."));
}
if(nDevice>size/pageSizeP)
{
auto info = std::string("Error :number of pages(")+std::to_string(size/pageSizeP)+std::string(") must be equal to or greater than number of virtual gpu instances(")+std::to_string(nDevice)+std::string(").");
auto debug3 = std::string("Total devices: ")+std::to_string(nDevice)+std::string("\r\n");
auto debug4 = std::string("Total pages: ")+std::to_string(size/pageSizeP)+std::string("\r\n");
throw std::invalid_argument(debug3+debug4+info);
}
numDevice=nDevice;
pageSize=pageSizeP;
va = std::shared_ptr<VirtualArray<T>>( new VirtualArray<T>[numDevice],[&](VirtualArray<T> * ptr){
delete [] ptr;
});
pageLock = std::shared_ptr<LMutex>(new LMutex[numDevice],[](LMutex * ptr){delete [] ptr;});
size_t numPage = size/pageSize;
size_t numInterleave = (numPage/nDevice) + 1;
size_t extraAllocDeviceIndex = numPage%nDevice; // 0: all equal, 1: first device extra allocation, 2: second device, ...
openclChannels = gpuCloneMult;
{
int total = 0;
for(const auto& e:openclChannels)
{
total+=e;
}
if(total * numActivePage > size/pageSizeP)
{
auto info = std::string("Error: total number of active pages (")+std::to_string(total * numActivePage)+std::string(")")+
std::string(" required to be less than or equal to total pages(")+std::to_string(size/pageSizeP)+std::string(")");
auto debug1 = std::string("Active page per virtual gpu: ")+std::to_string(numActivePage)+std::string("\r\n");
auto debug2 = std::string("Virtual gpus: ")+std::to_string(total)+std::string("\r\n");
auto debug3 = std::string("Total active pages: ")+std::to_string(total * numActivePage )+std::string("\r\n");
auto debug4 = std::string("Total pages: ")+std::to_string(size/pageSizeP)+std::string("\r\n");
throw std::invalid_argument(debug1+debug2+debug3+debug4+info);
}
}
int ctr = 0;
std::vector<int> actuallyUsedPhysicalGpuIndex;
actuallyUsedPhysicalGpuIndex.resize(numPhysicalCard);
int ctrPhysicalCard =0;
for(int i=0;i<numPhysicalCard;i++)
{
if(gpuCloneMult[i]>0)
{
actuallyUsedPhysicalGpuIndex[i]=ctr;
va.get()[ctr]=VirtualArray<T>( ((extraAllocDeviceIndex>=ctr)?numInterleave:(numInterleave-1)) * pageSize,device[i],pageSize,numActivePage,usePinnedArraysOnly,useLRUdebugging);
ctr++;
gpuCloneMult[i]--;
ctrPhysicalCard++;
}
}
bool work = true;
while(work)
{
ctrPhysicalCard = 0;
work=false;
for(int i=0;i<numPhysicalCard;i++)
{
if(gpuCloneMult[i]>0)
{
int index = actuallyUsedPhysicalGpuIndex[i];
va.get()[ctr]=VirtualArray<T>( ((extraAllocDeviceIndex>= ctr)?numInterleave:(numInterleave-1)) * pageSize,va.get()[index].getContext(),device[i],pageSize,numActivePage,usePinnedArraysOnly,useLRUdebugging);
ctr++;
gpuCloneMult[i]--;
ctrPhysicalCard++;
work=true;
}
}
}
funcRun = std::make_shared<Prefetcher<VirtualMultiArray<T>>>(*this);
}
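// Illustrative constructor configurations (a sketch; "devices" stands for a
// std::vector<ClDevice> obtained elsewhere, sizes are arbitrary):
//
// // default: every physical card gets 4 virtual cards / 4 LRU caches
// VirtualMultiArray<float> a(1000000, devices, 1000, 20);
//
// // custom channels: card 0 gets 1 channel, card 1 is disabled, card 2 gets 5
// VirtualMultiArray<float> b(1000000, devices, 1000, 20, {1,0,5});
//
// // distribute by VRAM size instead of user-given multipliers
// VirtualMultiArray<float> c(1000000, devices, 1000, 20, {}, VirtualMultiArray<float>::MemMult::UseVramRatios);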
int totalGpuChannels()
{
int result=0;
for(int e:openclChannels)
{
result += e;
}
return result;
}
// get data at index
// index: minimum value=0, maximum value=size-1; bounds are not checked
T get(const size_t & index) const{
const size_t selectedPage = index/pageSize;
const size_t numInterleave = selectedPage/numDevice;
const size_t selectedVirtualArray = selectedPage%numDevice;
const size_t selectedElement = numInterleave*pageSize + (index%pageSize);
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
return va.get()[selectedVirtualArray].get(selectedElement);
}
// loads the page that holds the element at "index" from video memory into the LRU cache (or refreshes its position in the LRU)
void prefetch(const size_t & index) const
{
funcRun->push(index);
}
// returns the average cache-hit ratio over all LRUs of the array
double getTotalCacheHitRatio()
{
double result = 0.0;
for(int i=0;i<numDevice;i++)
{
std::unique_lock<std::mutex> lock(pageLock.get()[i].m);
result += va.get()[i].getCacheHitRatio();
}
return result/numDevice;
}
// reset cache hit ratios
void resetTotalCacheHitRatio() const
{
for(int i=0;i<numDevice;i++)
{
std::unique_lock<std::mutex> lock(pageLock.get()[i].m);
va.get()[i].resetCacheHitRatio();
}
}
// put data to index
// index: minimum value=0, maximum value=size-1; bounds are not checked
void set(const size_t & index, const T & val) const{
const size_t selectedPage = index/pageSize;
const size_t numInterleave = selectedPage/numDevice;
const size_t selectedVirtualArray = selectedPage%numDevice;
const size_t selectedElement = numInterleave*pageSize + (index%pageSize);
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
va.get()[selectedVirtualArray].set(selectedElement,val);
}
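// Latency-hiding sketch (illustrative; OpenMP is just one way to oversubscribe,
// not something this header requires). Using more threads than logical cores lets
// one thread's page I/O overlap another thread's work, as noted at the top of
// this file:
//
// #pragma omp parallel for num_threads(64)
// for(long long i=0; i<1000000; i++)
// {
//     arr.set(i, arr.get(i) + 1.0f); // page-level locks keep element access safe
// }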
// read N values starting at index
// do not use this concurrently with writes if you need all N elements to be consistent with each other,
// because atomicity is only guaranteed within a page, not across all N elements
// otherwise it is safe to mix this with concurrent writes
// always safe to use with other reads
// for N>pageSize it is also safe with writes as long as the user explicitly orders operations with a thread-safe pattern
std::vector<T> readOnlyGetN(const size_t & index, const size_t & n) const
{
std::vector<T> result;
const size_t selectedPage = index/pageSize;
const size_t numInterleave = selectedPage/numDevice;
const size_t selectedVirtualArray = selectedPage%numDevice;
const size_t modIdx = index%pageSize;
const size_t selectedElement = numInterleave*pageSize + modIdx;
if(n == 0) return result; // avoid size_t underflow below
if(modIdx + n <= pageSize)
{
// full read possible
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
return va.get()[selectedVirtualArray].getN(selectedElement,n);
}
else
{
size_t nToRead = n;
size_t maxThisPage = pageSize - modIdx;
size_t toBeCopied = ((nToRead<maxThisPage)? nToRead: maxThisPage);
// read this page
{
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
std::vector<T> part = va.get()[selectedVirtualArray].getN(selectedElement,toBeCopied);
std::move(part.begin(),part.end(),std::back_inserter(result));
nToRead -= toBeCopied;
}
// repeat for other pages recursively
if(nToRead>0)
{
std::vector<T> part = readOnlyGetN(index+toBeCopied, nToRead);
std::move(part.begin(),part.end(),std::back_inserter(result));
}
return result;
}
}
// write N values starting at index
// do not use this concurrently with reads if you need all N elements to be consistent with each other,
// because atomicity is only guaranteed within a page, not across all N elements
// otherwise it is safe (apart from consistency) to mix this with concurrent reads/writes as long as the user provides thread-safety
// thread-safety is page-level only (which covers all elements within a page), so it is not N-level thread-safe when N>pageSize
// val: element values to write
// valIndex: index of first element (in val) to copy
// nVal: number of elements to copy starting at index valIndex
void writeOnlySetN(const size_t & index, const std::vector<T> & val, const size_t & valIndex=0, const size_t & nVal=(size_t)-1) const
{
const size_t n = ((nVal==(size_t)-1)?val.size():nVal);
const size_t selectedPage = index/pageSize;
const size_t numInterleave = selectedPage/numDevice;
const size_t selectedVirtualArray = selectedPage%numDevice;
const size_t modIdx = index%pageSize;
const size_t selectedElement = numInterleave*pageSize + modIdx;
if(n == 0) return; // avoid size_t underflow below
if(modIdx + n <= pageSize)
{
// full write possible
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
va.get()[selectedVirtualArray].setN(selectedElement,val,valIndex,n);
}
else
{
size_t nToWrite = n;
size_t maxThisPage = pageSize - modIdx;
size_t toBeCopied = ((nToWrite<maxThisPage)? nToWrite: maxThisPage);
// write this page
{
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
va.get()[selectedVirtualArray].setN(selectedElement,val,valIndex, toBeCopied);
nToWrite -= toBeCopied;
}
// repeat for other pages recursively
if(nToWrite>0)
{
writeOnlySetN(index+toBeCopied, val, valIndex+toBeCopied, nToWrite);
}
}
}
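// Bulk-access sketch (illustrative): sequential chunks amortize per-page locking
// and pcie transfer overhead compared to element-wise get/set:
//
// std::vector<float> chunk = arr.readOnlyGetN(0, 10000); // may span multiple pages
// for(auto & e : chunk) e *= 2.0f;
// arr.writeOnlySetN(0, chunk); // writes all 10000 elements back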
// gpu --> aligned buffer ---> user function ---> gpu (user needs to take care of thread-safety of the whole mapped region)
// index: start element of buffer (which is 4096-aligned)
// range: size of aligned buffer that receives data from the virtual array
// f: user-defined function (lambda or other callable) that processes the buffer
// the pointer given to the user function is not zero-based (it is offset so that buffer indices match the virtual array's indices)
// pinBuffer: true=pins buffer so the OS cannot page it out (probably faster data copies), false=no pinning (probably less latency to start the function)
// read: true=latest data from virtual array is read into the buffer (+latency). default = true
// write: true=latest data from the aligned buffer is written to the virtual array (+latency). default = true
// userPtr: lets the user avoid allocation/free latency on every mapping call
// if userPtr == nullptr (default), an internal allocation is made
// if userPtr != nullptr, userPtr is used for the internal data copies; it is not the same address value that is given to the user function
// array.mappedReadWriteAccess(...[&](T * ptr){ ..compute using ptr[some_index] but not myRawPointer[some_index].. },myRawPointer);
// userPtr needs to be valid for the range [userPtr, userPtr + range)
void mappedReadWriteAccess(const size_t index, const size_t range,
std::function<void(T * const)> f,
const bool pinBuffer=false,
const bool read=true,
const bool write=true,
T * const userPtr=nullptr) const
{
struct TempMem
{
TempMem():buf(nullptr){ }
TempMem(T * __restrict__ const mem):buf(mem){}
T * __restrict__ const buf;
};
std::unique_ptr<T,void(*)(void *)> arr(nullptr,free);
if(nullptr == userPtr)
{
#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32)) && !defined(__CYGWIN__)
// windows
arr = std::unique_ptr<T,void(*)(void *)>((T *)_aligned_malloc(/* (index*sizeof(T))%4096*/ sizeof(T)*range,4096),_aligned_free);
#else
// linux
arr = std::unique_ptr<T,void(*)(void *)>((T *)aligned_alloc(/* (index*sizeof(T))%4096*/ 4096,sizeof(T)*range),free);
#endif
}
// temporarily allocate aligned buffer (for SIMD operations, faster copies to some devices)
const TempMem mem((nullptr == userPtr)?(arr.get()):userPtr);
// lock the buffer so that it will not be paged out by OS during function
if(pinBuffer)
{
#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32)) && !defined(__CYGWIN__)
// windows (VirtualLock takes the length in bytes)
if(0==VirtualLock(mem.buf,range*sizeof(T)))
{
throw std::invalid_argument("Error: memory pinning failed.");
}
#else
// linux (mlock takes the length in bytes, returns 0 on success and -1 on error)
if(0!=mlock(mem.buf,range*sizeof(T)))
{
throw std::invalid_argument("Error: memory pinning failed.");
}
#endif
}
// get data from gpu
if(read)
{
size_t indexStartPage = index / pageSize;
size_t indexEndPage = (index+range)/pageSize;
size_t currentIndex = index;
size_t remainingRange = range;
size_t remainingPageElm = pageSize;
size_t currentRange = 0;
size_t currentBufElm = 0;
for(size_t selectedPage = indexStartPage; selectedPage<=indexEndPage; selectedPage++)
{
remainingPageElm = pageSize - (currentIndex % pageSize);
currentRange = ((remainingRange<remainingPageElm)? remainingRange: remainingPageElm);
const size_t selectedVirtualArray = selectedPage%numDevice;
const size_t numInterleave = selectedPage/numDevice;
const size_t selectedElement = numInterleave*pageSize + (currentIndex%pageSize);
if(currentRange>0)
{
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
va.get()[selectedVirtualArray].copyToBuffer(selectedElement, currentRange, mem.buf+currentBufElm);
}
else
{
break;
}
currentBufElm += currentRange;
currentIndex += currentRange;
remainingRange -= currentRange;
}
}
// execute function
// -index offset matches origins of buffer and virtual array such that buf[i] = va[i] for all elements
f(mem.buf-index);
// get data to gpu
if(write)
{
size_t indexStartPage = index / pageSize;
size_t indexEndPage = (index+range)/pageSize;
size_t currentIndex = index;
size_t remainingRange = range;
size_t remainingPageElm = pageSize;
size_t currentRange = 0;
size_t currentBufElm = 0;
for(size_t selectedPage = indexStartPage; selectedPage<=indexEndPage; selectedPage++)
{
remainingPageElm = pageSize - (currentIndex % pageSize);
currentRange = ((remainingRange<remainingPageElm)? remainingRange: remainingPageElm);
const size_t selectedVirtualArray = selectedPage%numDevice;
const size_t numInterleave = selectedPage/numDevice;
const size_t selectedElement = numInterleave*pageSize + (currentIndex%pageSize);
if(currentRange>0)
{
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
va.get()[selectedVirtualArray].copyFromBuffer(selectedElement, currentRange, mem.buf+currentBufElm);
}
else
{
break;
}
currentBufElm += currentRange;
currentIndex += currentRange;
remainingRange -= currentRange;
}
}
// unlock pinning
if(pinBuffer)
{
#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32)) && !defined(__CYGWIN__)
// windows
VirtualUnlock(mem.buf,range*sizeof(T));
#else
// linux
munlock(mem.buf,range*sizeof(T));
#endif
}
}
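// Mapped-access sketch (illustrative): the buffer pointer is pre-offset by "index",
// so buf[i] aliases arr[i] for the mapped range:
//
// arr.mappedReadWriteAccess(300, 1000, [](float * const buf){
//     for(size_t i=300; i<1300; i++)
//         buf[i] = i * 0.5f;
// }, true /* pinBuffer */);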
// get data directly from vram, bypassing LRU cache
// if streamStart() was not called before the uncached stream-read commands, the data read is not guaranteed to be up-to-date
// not thread-safe for overlapping regions
T getUncached(const size_t index) const
{
const size_t selectedPage = index/pageSize;
const size_t numInterleave = selectedPage/numDevice;
const size_t selectedVirtualArray = selectedPage%numDevice;
const size_t selectedElement = numInterleave*pageSize + (index%pageSize);
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
return va.get()[selectedVirtualArray].getUncached(selectedElement);
}
// set data directly to vram, bypassing LRU cache
// if streamStop() is not called after the uncached stream-write commands, subsequent cached reads are not guaranteed to see the updated data
// not thread-safe for overlapping regions
void setUncached(const size_t index, const T& val ) const
{
const size_t selectedPage = index/pageSize;
const size_t numInterleave = selectedPage/numDevice;
const size_t selectedVirtualArray = selectedPage%numDevice;
const size_t selectedElement = numInterleave*pageSize + (index%pageSize);
std::unique_lock<std::mutex> lock(pageLock.get()[selectedVirtualArray].m);
va.get()[selectedVirtualArray].setUncached(selectedElement,val);
}
// writes all edited active pages to vram
// resets all active pages
// use this before a series of uncached reads/writes
void streamStart()
{
std::vector<std::thread> parallel;
for(int i=0;i<numDevice;i++)
{
parallel.push_back(std::thread([&,i]()
{
std::unique_lock<std::mutex> lock(pageLock.get()[i].m);
size_t nump = va.get()[i].getNumP();
for(size_t pg = 0;pg<nump;pg++)
{
va.get()[i].flushPage(pg);
}
}));
}
for(int i=0;i<numDevice;i++)
{
if(parallel[i].joinable())
{
parallel[i].join();
}
}
}
// reloads all edited frozen pages from vram to active pages (LRU cache)
// resets all active pages
// overwrites any cached writes happened before
// use this after a series of uncached reads/writes
void streamStop()
{
std::vector<std::thread> parallel;
for(int i=0;i<numDevice;i++)
{
parallel.push_back(std::thread([&,i]()
{
std::unique_lock<std::mutex> lock(pageLock.get()[i].m);
size_t nump = va.get()[i].getNumP();
for(size_t pg = 0;pg<nump;pg++)
{
va.get()[i].reloadPage(pg);
}
}));
}
for(int i=0;i<numDevice;i++)
{
if(parallel[i].joinable())
{
parallel[i].join();
}
}
}
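// Uncached-streaming sketch (illustrative; compute() is a hypothetical helper).
// getUncached/setUncached bypass the LRU, so the streamStart/streamStop pair is
// needed to keep the cached and vram views consistent:
//
// arr.streamStart(); // flush edited active pages to vram
// for(size_t i=0; i<1000000; i++)
//     arr.setUncached(i, compute(i));
// arr.streamStop(); // reload pages so cached reads see the new data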
// using gpu compute power, finds elements whose given member matches the given value, returns their indices
// obj: object instance
// member: the member (within obj) whose value is searched for
// indexListMaxSize: maximum number of indices returned
template<typename S>
std::vector<size_t> find(T & obj, S & member, const int indexListMaxSize=1)
{
size_t adrObj = (size_t) &obj;
size_t adrMem = (size_t) &member;
size_t offset = adrMem - adrObj; // byte offset of the member within the object
std::vector<size_t> results;
// 1) flush all active pages, expect user not to touch any element during search
// 2) run search on all gpus on all of their data
// 3) return index
std::vector<std::thread> parallel;
std::mutex mGlobal;
for(int i=0;i<numDevice;i++)
{
parallel.push_back(std::thread([&,i]()
{
std::unique_lock<std::mutex> lock(pageLock.get()[i].m);
size_t nump = va.get()[i].getNumP();
for(size_t pg = 0;pg<nump;pg++)
{
va.get()[i].flushPage(pg);
}
std::vector<size_t> resultI = va.get()[i].find(offset,member,i,indexListMaxSize);
{
const size_t szResultI = resultI.size();
for(size_t k = 0; k<szResultI; k++)
{
size_t gpuPage = (resultI[k]/pageSize);
size_t realPage = (gpuPage * numDevice) + i;
size_t realIndex = (realPage * pageSize) + (resultI[k]%pageSize);
resultI[k]=realIndex;
}
if(szResultI>0)
{
std::unique_lock<std::mutex> lock(mGlobal);
std::move(resultI.begin(),resultI.end(),std::back_inserter(results));
}
}
}));
}
for(int i=0;i<numDevice;i++)
{
if(parallel[i].joinable())
{
parallel[i].join();
}
}
return results;
}
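// Find sketch (illustrative; Obj is a hypothetical element type). The member
// reference must point into the given object so the byte offset can be computed:
//
// struct Obj { int id; float data; };
// VirtualMultiArray<Obj> objects(/*...*/);
// Obj key; key.id = 42; // search for elements whose id equals 42
// std::vector<size_t> hits = objects.find(key, key.id, 10); // up to 10 matches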
class SetterGetter
{
public:
SetterGetter():idx(0),ptr(nullptr){}
SetterGetter(size_t i, VirtualMultiArray<T> * p):idx(i),ptr(p){}
void operator = (T val) { ptr->set(idx,val); }
operator T(){ return ptr->get(idx); }
void operator = (SetterGetter sg){ ptr->set(idx,sg.ptr->get(sg.idx)); }
private:
size_t idx;
VirtualMultiArray<T> * ptr;
};
SetterGetter operator [](const size_t id) { return SetterGetter(id,this); }
T operator [](const size_t id) const { return get(id); }
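// Subscript sketch: the non-const operator[] returns a proxy that forwards
// assignment to set() and reads (via conversion to T) to get():
//
// arr[5] = 3.14f; // proxy assignment --> set(5, 3.14f)
// float v = arr[5]; // proxy conversion --> get(5)
// arr[6] = arr[5]; // proxy-to-proxy copy duplicates one element between indices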
~VirtualMultiArray(){}
private:
size_t numDevice;
size_t pageSize;
std::shared_ptr<VirtualArray<T>> va;
std::shared_ptr<LMutex> pageLock;
std::vector<int> openclChannels;
std::shared_ptr<Prefetcher<VirtualMultiArray<T>>> funcRun;
};
#endif /* VIRTUALMULTIARRAY_H_ */