forked from kaigai/pg_strom
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopencl_hashjoin.h
More file actions
986 lines (916 loc) · 29.5 KB
/
opencl_hashjoin.h
File metadata and controls
986 lines (916 loc) · 29.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
/*
* opencl_hashjoin.h
*
* Parallel hash join accelerated by OpenCL device
* --
* Copyright 2011-2014 (C) KaiGai Kohei <kaigai@kaigai.gr.jp>
* Copyright 2014 (C) The PG-Strom Development Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
#ifndef OPENCL_HASHJOIN_H
#define OPENCL_HASHJOIN_H
/*
* Format of kernel hash table; to be prepared
*
* +--------------------+
* | kern_multihash |
* | +------------------+
* | | length | <--- total length of multiple hash-tables; that
* | +------------------+ also means the length to be sent via DMA
* | | ntables (=M) | <--- number of hash-tables
* | +------------------+
* | | htbl_offset[0] o---> htbl_offset[0] is always NULL
* | | htbl_offset[1] o------+
* | | : | |
* | | : | |
* | | htbl_offset[M-1] | |
* +-+------------------+ |
* | : | |
* +--------------------+ |
* | kern_hashtable(0) | |
* | : | |
* +--------------------+ <--+
* | kern_hashtable(1) |
* | : |
* +--------------------+
* | : |
* +--------------------+
* | kern_hashtable(M-1)|
* | : |
* +--------------------+
* | region for each |
* | kern_hashentry |
* | items |
* | |
* | |
* +--------------------+
*
* +--------------------+
* | kern_hashtable |
* | +------------------+
* | | nslots (=N) |
* | +------------------+
* | | nkeys (=M) |
* | +------------------+
* | | colmeta[0] |
* | | colmeta[1] |
* | | : |
* | | colmeta[M-1] |
* | +------------------+
* | | hash_slot[0] |
* | | hash_slot[1] |
* | | : |
* | | hash_slot[N-2] o-------+ single directioned link
* | | hash_slot[N-1] | | from the hash_slot[]
* +-+------------------+ <---+
* | kern_hashentry |
* | +------------------+
* | | next o------------+ If multiple entries
* | +------------------+ | have the same hash value,
* | | hash | | these are linked.
* | +------------------+ |
* | | rowidx | |
* | +------------------+ |
* | | matched | |
* | +------------------+ |
* | | keydata: | |
* | | nullmap[...] | |
* | | values[...] | |
* | | | |
* | | values are put | |
* | | next to nullmap | |
* +-+------------------+ <---+
* | kern_hashentry |
* | +------------------+
* | | next o-----------> NULL
* | +------------------+
* | | hash |
* | +------------------+
* | | : |
* | | : |
* +-+------------------+
*/
/*
 * kern_hashentry - one entry of a hash table; it carries a tuple of the
 * inner relation. Entries are chained through 'next'.
 *
 * NOTE: the field names in the layout diagram earlier in this file
 * (rowidx / matched / keydata) do not match this declaration; the fields
 * below are the ones actually declared.
 */
typedef struct
{
cl_uint next; /* offset of the next entry, added to the head of the
               * owning kern_hashtable; 0 terminates the chain
               * (see KERN_HASH_NEXT_ENTRY) */
cl_uint hash; /* 32-bit hash value */
cl_uint rowid; /* identifier of inner rows */
cl_uint t_len; /* length of the tuple in 'htup'; used by
                * KERN_HASHENTRY_SIZE to compute the entry size */
HeapTupleHeaderData htup; /* tuple of the inner relation */
} kern_hashentry;
/*
 * kern_hashtable - header of a single hash table (one inner relation).
 *
 * The array of hash slots (cl_uint) is not a declared member; it is placed
 * right after colmeta[ncols], at a LONGALIGN'ed offset
 * (see KERN_HASHTABLE_SLOT).
 */
typedef struct
{
cl_uint length; /* length of this hashtable chunk */
cl_uint ncols; /* number of inner relation's columns; also the number
                * of valid colmeta[] items */
cl_uint nslots; /* width of hash slot; the hash value is taken modulo
                 * nslots to pick a slot (see KERN_HASH_FIRST_ENTRY) */
cl_char is_outer; /* true, if outer join (not supported now) */
cl_char __padding__[3]; /* for 64bit alignment */
kern_colmeta colmeta[FLEXIBLE_ARRAY_MEMBER];
} kern_hashtable;
/*
 * kern_multihash - container of all the hash tables used by one join.
 *
 * NOTE: the layout diagram earlier in this file shows 'length' as the
 * first field; the fields below are the ones actually declared.
 */
typedef struct
{
hostptr_t hostptr; /* address of this multihash on the host */
cl_uint pg_crc32_table[256];
/* MEMO: Originally, we put 'pg_crc32_table[]' as a static array
* deployed on __constant memory region, however, a particular
* OpenCL runtime had (has?) a problem on references to values
* on __constant memory. So, we moved the 'pg_crc32_table' into
* __global memory area as a workaround....
*/
cl_uint ntables; /* number of hash tables (= # of inner rels) */
/* byte offset of each kern_hashtable from the head of this structure,
 * indexed by the join depth (see KERN_HASHTABLE) */
cl_uint htable_offset[FLEXIBLE_ARRAY_MEMBER];
} kern_multihash;
/*
 * KERN_HASHTABLE - address of the kern_hashtable for the join depth
 * 'depth'; it is located htable_offset[depth] bytes after the head of
 * 'kmhash'. Note that 'kmhash' is evaluated twice.
 */
#define KERN_HASHTABLE(kmhash, depth) \
((__global kern_hashtable *)((__global char *)(kmhash) + \
(kmhash)->htable_offset[(depth)]))
/*
 * KERN_HASHTABLE_SLOT - address of the hash slot array (cl_uint) of
 * 'khtable'; it begins at the LONGALIGN'ed offset right after
 * colmeta[ncols]. Note that 'khtable' is evaluated twice.
 */
#define KERN_HASHTABLE_SLOT(khtable) \
((__global cl_uint *)((__global char *)(khtable)+ \
LONGALIGN(offsetof(kern_hashtable, \
colmeta[(khtable)->ncols]))))
/*
 * KERN_HASHENTRY_SIZE - LONGALIGN'ed size of a hash entry: the fixed
 * fields in front of 'htup' plus the t_len bytes of the tuple.
 */
#define KERN_HASHENTRY_SIZE(khentry) \
LONGALIGN(offsetof(kern_hashentry, htup) + (khentry)->t_len)
/*
 * KERN_HASH_FIRST_ENTRY
 *
 * Returns the head of the entry chain stored in the slot that 'hash'
 * falls into (hash modulo nslots), or NULL if that slot holds offset 0.
 * A slot value is a byte offset from the head of 'khtable'.
 */
static inline __global kern_hashentry *
KERN_HASH_FIRST_ENTRY(__global kern_hashtable *khtable, cl_uint hash)
{
	__global cl_uint *hslots = KERN_HASHTABLE_SLOT(khtable);
	cl_uint		entry_offset = hslots[hash % khtable->nslots];

	if (entry_offset != 0)
		return (__global kern_hashentry *)
			((__global char *) khtable + entry_offset);
	return NULL;
}
/*
 * KERN_HASH_NEXT_ENTRY
 *
 * Returns the entry chained after 'khentry', or NULL if 'khentry' is the
 * last one (next == 0). 'next' is a byte offset from the head of 'khtable'.
 */
static inline __global kern_hashentry *
KERN_HASH_NEXT_ENTRY(__global kern_hashtable *khtable,
					 __global kern_hashentry *khentry)
{
	cl_uint		next_offset = khentry->next;

	return (next_offset == 0
			? NULL
			: (__global kern_hashentry *)
			  ((__global char *)khtable + next_offset));
}
/*
* Hash-Joining using GPU/MIC acceleration
*
* It packs a kern_parambuf and a kern_resultbuf structure within a contiguous
* memory area, to transfer this (usually) small chunk with one DMA call.
*
*
*
* +-+-----------------+ ---
* | kern_parambuf | ^
* | +-----------------+ | Region to be sent to the m_join device memory
* | | length | |
* | +-----------------+ |
* | | nparams | |
* | +-----------------+ |
* | | poffset[0] | |
* | | poffset[1] | |
* | | : | |
* | | poffset[M-1] | |
* | +-----------------+ |
* | | variable length | |
* | | fields for | |
* | | Param / Const | |
* | | : | |
* +-------------------+ -|----
* | kern_resultbuf | | ^
* |(only fixed fields)| | | Region to be written back from the device
* | +-----------------+ | | memory to the host-side
* | | nrels | | |
* | +-----------------+ | |
* | | nrooms | | |
* | +-----------------+ | |
* | | nitems | | |
* | +-----------------+ | |
* | | errcode | | |
* | +-----------------+ | |
* | | has_recheckes | | |
* | +-----------------+ | |
* | | __padding__[] | | V
* +-+-----------------+ ------
* | kern_row_map | ^ Region to be sent to the m_rowmap device memory,
* | +-----------------+ | on demand.
* | | nvalids | |
* | +-----------------+ |
* | | rindex[0] | |
* | | rindex[1] | |
* | | : | |
* | | rindex[N-1] | V
* +-+-----------------+ ---
*/
/*
 * kern_hashjoin - head of the per-request buffer of a hash join.
 *
 * Only the kern_parambuf is a declared member. The kern_resultbuf and the
 * kern_row_map that follow it are located with the KERN_HASHJOIN_*
 * macros, because their position depends on the parambuf length.
 */
typedef struct
{
kern_parambuf kparams;
} kern_hashjoin;
/* kern_parambuf at the head of the kern_hashjoin */
#define KERN_HASHJOIN_PARAMBUF(khashjoin) \
((__global kern_parambuf *)(&(khashjoin)->kparams))
/* STROMALIGN'ed length of the kern_parambuf, taken from its 'length' field */
#define KERN_HASHJOIN_PARAMBUF_LENGTH(khashjoin) \
STROMALIGN(KERN_HASHJOIN_PARAMBUF(khashjoin)->length)
/* kern_resultbuf; it follows the (aligned) kern_parambuf */
#define KERN_HASHJOIN_RESULTBUF(khashjoin) \
((__global kern_resultbuf *) \
((__global char *)KERN_HASHJOIN_PARAMBUF(khashjoin) + \
KERN_HASHJOIN_PARAMBUF_LENGTH(khashjoin)))
/* STROMALIGN'ed length of the fixed fields of kern_resultbuf only, i.e.
 * everything in front of results[0]; the argument is not referenced */
#define KERN_HASHJOIN_RESULTBUF_LENGTH(khashjoin) \
STROMALIGN(offsetof(kern_resultbuf, results[0]))
/* kern_row_map; it follows the fixed fields of the kern_resultbuf */
#define KERN_HASHJOIN_ROWMAP(khashjoin) \
((__global kern_row_map *) \
((__global char *)KERN_HASHJOIN_RESULTBUF(khashjoin) + \
KERN_HASHJOIN_RESULTBUF_LENGTH(khashjoin)))
/* STROMALIGN'ed length of the kern_row_map. A negative nvalids is handled
 * as a row map that has no rindex[] items (header only). */
#define KERN_HASHJOIN_ROWMAP_LENGTH(khashjoin) \
(KERN_HASHJOIN_ROWMAP(khashjoin)->nvalids < 0 ? \
STROMALIGN(offsetof(kern_row_map, rindex[0])) :\
STROMALIGN(offsetof(kern_row_map, \
rindex[KERN_HASHJOIN_ROWMAP(khashjoin)->nvalids])))
/* Region to send: starts at the kern_parambuf (offset 0) ... */
#define KERN_HASHJOIN_DMA_SENDPTR(khashjoin) \
KERN_HASHJOIN_PARAMBUF(khashjoin)
#define KERN_HASHJOIN_DMA_SENDOFS(khashjoin) 0UL
/* ... and ends where the kern_row_map begins, so it covers the
 * kern_parambuf plus the fixed fields of the kern_resultbuf */
#define KERN_HASHJOIN_DMA_SENDLEN(khashjoin) \
((uintptr_t)KERN_HASHJOIN_ROWMAP(khashjoin) - \
(uintptr_t)KERN_HASHJOIN_PARAMBUF(khashjoin))
/* Region to receive: the fixed fields of the kern_resultbuf, which sit
 * right after the kern_parambuf */
#define KERN_HASHJOIN_DMA_RECVPTR(khashjoin) \
KERN_HASHJOIN_RESULTBUF(khashjoin)
#define KERN_HASHJOIN_DMA_RECVOFS(khashjoin) \
KERN_HASHJOIN_PARAMBUF_LENGTH(khashjoin)
#define KERN_HASHJOIN_DMA_RECVLEN(khashjoin) \
KERN_HASHJOIN_RESULTBUF_LENGTH(khashjoin)
#ifdef OPENCL_DEVICE_CODE
/*
* gpuhashjoin_execute
*
* Main routine of gpuhashjoin; only the prototype is given here, the body
* is expected to be supplied elsewhere (the kernel below refers to
* auto-generated code). It runs the hash-join logic on the supplied
* hash-tables and kds/ktoast pair for the row at 'kds_index', and its
* return value is used by the caller as the number of matches.
*
* kern_gpuhashjoin_main calls it twice per row:
* - first with rbuffer == NULL, just to count the matches;
* - then with 'rbuffer' pointing to the area of the "results" array that
*   the caller already acquired ((n_matches * n_rels) slots), to store
*   the result.
*/
static cl_uint
gpuhashjoin_execute(__private cl_int *errcode,
__global kern_parambuf *kparams,
__global kern_multihash *kmhash,
__local cl_uint *crc32_table,
__global kern_data_store *kds,
__global kern_data_store *ktoast,
size_t kds_index,
__global cl_int *rbuffer);
/*
 * kern_gpuhashjoin_main
 *
 * Entrypoint of the gpuhashjoin kernel. Each work-item handles (at most)
 * one row of the outer kern_data_store, and the job is done in two stages:
 *
 *   1. gpuhashjoin_execute() is called with rbuffer == NULL to count the
 *      number of matched items for the row, then the work-group reserves
 *      that many items on the kern_resultbuf with a single atomic_add().
 *   2. gpuhashjoin_execute() is called again with the reserved position
 *      on the result buffer, to store the hash-join result.
 *
 * If the result buffer does not have sufficient space, the kernel reports
 * StromError_DataStoreNoSpace to inform the host system that this
 * hashjoin needs a larger result buffer.
 *
 * khashjoin - kern_parambuf followed by kern_resultbuf (in/out)
 * kmhash    - hash tables of the inner relations (in)
 * kds       - data store of the outer relation (in)
 * ktoast    - passed through to gpuhashjoin_execute (in)
 * krowmap   - optional map of valid row indexes; NULL or a negative
 *             'nvalids' means every row of kds is a candidate (in)
 */
__kernel void
kern_gpuhashjoin_main(__global kern_hashjoin *khashjoin,
					  __global kern_multihash *kmhash,
					  __global kern_data_store *kds,
					  __global kern_data_store *ktoast,
					  __global kern_row_map *krowmap,
					  KERN_DYNAMIC_LOCAL_WORKMEM_ARG)
{
	__global kern_parambuf *kparams = KERN_HASHJOIN_PARAMBUF(khashjoin);
	__global kern_resultbuf *kresults = KERN_HASHJOIN_RESULTBUF(khashjoin);
	cl_int			errcode = StromError_Success;
	cl_uint			n_matches;
	cl_uint			offset;
	cl_uint			nitems;
	size_t			kds_index;
	size_t			crc_index;
	__local cl_uint	base;
	__local cl_uint	crc32_table[256];

	/* sanity check - kresults must have sufficient width of slots for the
	 * required hash-tables within kern_multihash.
	 */
	if (kresults->nrels != kmhash->ntables + 1)
	{
		errcode = StromError_DataStoreCorruption;
		goto out;
	}

	/* move crc32 table to __local memory from __global memory.
	 *
	 * NOTE: calculation of hash value (based on crc32 in GpuHashJoin) is
	 * the core of calculation workload in the GpuHashJoin implementation.
	 * If we keep the master table in global memory, it will cause massive
	 * amount of computing core stall because of RAM access latency.
	 * So, we try to move them into local shared memory at the beginning.
	 */
	for (crc_index = get_local_id(0);
		 crc_index < 256;
		 crc_index += get_local_size(0))
	{
		crc32_table[crc_index] = kmhash->pg_crc32_table[crc_index];
	}
	barrier(CLK_LOCAL_MEM_FENCE);

	/* In case when kern_row_map (krowmap) is given, it means not all the
	 * items are valid and some of them have to be dealt with like
	 * invisible rows. krowmap is an array of valid row-indexes.
	 *
	 * A negative 'nvalids' is a row map without any rindex[] items (see
	 * KERN_HASHJOIN_ROWMAP_LENGTH), so it is handled like a missing row
	 * map. Without this check the negative value would be converted to a
	 * huge unsigned one by the comparison with get_global_id(0), and
	 * rindex[] would be referenced out of its range.
	 */
	if (!krowmap || krowmap->nvalids < 0)
		kds_index = get_global_id(0);
	else if (get_global_id(0) < krowmap->nvalids)
		kds_index = (size_t) krowmap->rindex[get_global_id(0)];
	else
		kds_index = kds->nitems;	/* ensure this thread is out of range */

	/* 1st-stage: At first, we walk on the hash tables to count the number
	 * of matched hash entries towards the items being in the
	 * kern_data_store; to be acquired later for writing back the results.
	 * Also note that a thread not mapped on a particular valid item in kds
	 * can be simply assumed n_matches == 0.
	 */
	if (kds_index < kds->nitems)
		n_matches = gpuhashjoin_execute(&errcode,
										kparams,
										kmhash,
										crc32_table,
										kds, ktoast,
										kds_index,
										NULL);
	else
		n_matches = 0;

	/*
	 * Calculate the total number of matched tuples found by this
	 * workgroup ('nitems'), and the position of this thread within them
	 * ('offset'), as both values are used below.
	 */
	offset = arithmetic_stairlike_add(n_matches, LOCAL_WORKMEM, &nitems);

	/*
	 * Allocation of result buffer. If no space any more, return an error
	 * code to retry later.
	 *
	 * use atomic_add(&kresults->nitems, nitems) to determine the position
	 * to write. If expected usage is larger than kresults->nrooms, it
	 * exceeds the limitation of result buffer.
	 *
	 * MEMO: we may need to re-define nrooms/nitems using 64bit variables
	 * to avoid overflow issues, but it needs special platform capability
	 * on 64bit atomic-write...
	 */
	if (get_local_id(0) == 0)
	{
		if (nitems > 0)
			base = atomic_add(&kresults->nitems, nitems);
		else
			base = 0;
	}
	barrier(CLK_LOCAL_MEM_FENCE);

	/* In case when (base + nitems) is larger than the nrooms, it means we
	 * don't have enough space to write back hash-join results to the
	 * host-side. So, we have to tell the host code the provided
	 * kern_resultbuf didn't have enough space.
	 */
	if (base + nitems > kresults->nrooms)
	{
		errcode = StromError_DataStoreNoSpace;
		goto out;
	}

	/*
	 * 2nd-stage: we already know how many items shall be generated on
	 * this hash-join. So, all we need to do is to invoke auto-generated
	 * hash-joining function with a certain address on the result-buffer.
	 * Every result item takes 'nrels' slots of the results[] array.
	 */
	if (n_matches > 0 && kds_index < kds->nitems)
	{
		__global cl_int	   *rbuffer
			= kresults->results + kresults->nrels * (base + offset);

		n_matches = gpuhashjoin_execute(&errcode,
										kparams,
										kmhash,
										crc32_table,
										kds, ktoast,
										kds_index,
										rbuffer);
	}
out:
	/* write-back execution status into host-side */
	kern_writeback_error_status(&kresults->errcode, errcode, LOCAL_WORKMEM);
}
/*
* kern_gpuhashjoin_projection_xxx
*
* Prototypes of the helpers used by the projection kernels below. Only
* the declarations are given here; the kernels refer to them as
* auto-generated code.
*
* gpuhashjoin_projection_mapping - for the destination column
*   'dest_colidx', it tells the source of the value: the depth of the
*   source relation (0 is handled as the outer kern_data_store, others as
*   a hash table) into '*src_depth', and the column index within that
*   relation into '*src_colidx'.
*
* gpuhashjoin_projection_datum - called by the slot projection once per
*   source column with the slot arrays of the destination row, the depth
*   and column index of the source, the host-side address computed for the
*   datum, and the datum itself (NULL for a null value).
*/
static void
gpuhashjoin_projection_mapping(cl_int dest_colidx,
__private cl_uint *src_depth,
__private cl_uint *src_colidx);
static void
gpuhashjoin_projection_datum(__private cl_int *errcode,
__global Datum *slot_values,
__global cl_char *slot_isnull,
cl_int depth,
cl_int colidx,
hostptr_t hostaddr,
__global void *datum);
__kernel void
kern_gpuhashjoin_projection_row(__global kern_hashjoin *khashjoin, /* in */
__global kern_multihash *kmhash, /* in */
__global kern_data_store *kds, /* in */
__global kern_data_store *ktoast, /* in */
__global kern_data_store *kds_dest, /* out */
KERN_DYNAMIC_LOCAL_WORKMEM_ARG)
{
__global kern_parambuf *kparams = KERN_HASHJOIN_PARAMBUF(khashjoin);
__global kern_resultbuf *kresults = KERN_HASHJOIN_RESULTBUF(khashjoin);
__global cl_int *rbuffer;
__global void *datum;
cl_uint nrels = kresults->nrels;
cl_bool heap_hasnull = false;
cl_uint t_hoff;
cl_uint required;
cl_uint offset;
cl_uint total_len;
cl_uint usage_head;
cl_uint usage_tail;
__local cl_uint usage_prev;
cl_int errcode = StromError_Success;
/* Case of overflow; it shall be retried or executed by CPU instead,
* so no projection is needed anyway. We quickly exit the kernel.
* No need to set an error code because kern_gpuhashjoin_main()
* should already set it.
*/
if (kresults->nitems > kresults->nrooms ||
kresults->nitems > kds_dest->nrooms)
{
STROM_SET_ERROR(&errcode, StromError_DataStoreNoSpace);
goto out;
}
/* update nitems of kds_dest. note that get_global_id(0) is not always
* called earlier than other thread. So, we should not expect nitems
* of kds_dest is initialized.
*/
if (get_global_id(0) == 0)
kds_dest->nitems = kresults->nitems;
/* Ensure format of the kern_data_store (source/destination) */
if ((kds->format != KDS_FORMAT_ROW &&
kds->format != KDS_FORMAT_ROW_FLAT) ||
kds_dest->format != KDS_FORMAT_ROW_FLAT)
{
STROM_SET_ERROR(&errcode, StromError_DataStoreCorruption);
goto out;
}
/* combination of rows in this join */
rbuffer = kresults->results + nrels * get_global_id(0);
/*
* Step.1 - compute length of the joined tuple
*/
if (get_global_id(0) < kresults->nitems)
{
cl_uint i, ncols = kds_dest->ncols;
cl_uint datalen = 0;
for (i=0; i < ncols; i++)
{
kern_colmeta cmeta = kds_dest->colmeta[i];
cl_uint depth;
cl_uint colidx;
/* ask auto generated code */
gpuhashjoin_projection_mapping(i, &depth, &colidx);
if (depth == 0)
datum = kern_get_datum(kds, ktoast, colidx, rbuffer[0] - 1);
else if (depth < nrels)
{
__global kern_hashtable *khtable;
__global kern_hashentry *kentry;
khtable = KERN_HASHTABLE(kmhash, depth);
kentry = (__global kern_hashentry *)
((__global char *)khtable + rbuffer[depth]);
datum = kern_get_datum_tuple(khtable->colmeta,
&kentry->htup,
colidx);
}
else
datum = NULL;
if (!datum)
heap_hasnull = true;
else
{
/* att_align_datum */
if (cmeta.attlen > 0 || !VARATT_IS_1B(datum))
datalen = TYPEALIGN(cmeta.attalign, datalen);
/* att_addlength_datum */
if (cmeta.attlen > 0)
datalen += cmeta.attlen;
else
datalen += VARSIZE_ANY(datum);
}
}
required = offsetof(HeapTupleHeaderData, t_bits);
if (heap_hasnull)
required += bitmaplen(ncols);
if (kds->tdhasoid)
required += sizeof(cl_uint);
t_hoff = required = MAXALIGN(required);
required += MAXALIGN(datalen);
}
else
required = 0;
/*
* Step.2 - takes advance usage counter of kds_dest->usage
*/
offset = arithmetic_stairlike_add(required, LOCAL_WORKMEM, &total_len);
if (get_local_id(0) == 0)
{
if (total_len > 0)
usage_prev = atomic_add(&kds_dest->usage, total_len);
else
usage_prev = 0;
}
barrier(CLK_LOCAL_MEM_FENCE);
/* Check expected usage of the buffer */
usage_head = (STROMALIGN(offsetof(kern_data_store,
colmeta[kds_dest->ncols])) +
STROMALIGN(sizeof(kern_blkitem) * kds_dest->maxblocks) +
STROMALIGN(sizeof(kern_rowitem) * kresults->nitems));
if (usage_head + usage_prev + total_len > kds_dest->length)
{
errcode = StromError_DataStoreNoSpace;
goto out;
}
/*
* Step.3 - construction of a heap-tuple
*/
if (required > 0)
{
__global HeapTupleHeaderData *htup;
__global kern_rowitem *ritem;
cl_uint htup_offset;
cl_uint i, ncols = kds_dest->ncols;
cl_uint curr;
/* put index of heap-tuple */
htup_offset = kds_dest->length - (usage_prev + offset + required);
ritem = KERN_DATA_STORE_ROWITEM(kds_dest, get_global_id(0));
ritem->htup_offset = htup_offset;
/* build a heap-tuple */
htup = (__global HeapTupleHeaderData *)
((__global char *)kds_dest + htup_offset);
SET_VARSIZE(&htup->t_choice.t_datum, required);
htup->t_choice.t_datum.datum_typmod = kds_dest->tdtypmod;
htup->t_choice.t_datum.datum_typeid = kds_dest->tdtypeid;
htup->t_ctid.ip_blkid.bi_hi = 0;
htup->t_ctid.ip_blkid.bi_lo = 0;
htup->t_ctid.ip_posid = 0;
htup->t_infomask2 = (ncols & HEAP_NATTS_MASK);
htup->t_infomask = (heap_hasnull ? HEAP_HASNULL : 0);
htup->t_hoff = t_hoff;
curr = t_hoff;
for (i=0; i < ncols; i++)
{
kern_colmeta cmeta = kds_dest->colmeta[i];
cl_uint depth;
cl_uint colidx;
/* ask auto generated code again */
gpuhashjoin_projection_mapping(i, &depth, &colidx);
if (depth == 0)
datum = kern_get_datum(kds, ktoast, colidx, rbuffer[0] - 1);
else
{
__global kern_hashtable *khtable;
__global kern_hashentry *kentry;
khtable = KERN_HASHTABLE(kmhash, depth);
kentry = (__global kern_hashentry *)
((__global char *)khtable + rbuffer[depth]);
datum = kern_get_datum_tuple(khtable->colmeta,
&kentry->htup,
colidx);
}
/* put datum on the destination kds */
if (!datum)
htup->t_bits[i >> 3] &= ~(1 << (i & 0x07));
else
{
if (cmeta.attlen > 0)
{
__global char *dest;
while (TYPEALIGN(cmeta.attalign, curr) != curr)
((__global char *)htup)[curr++] = 0;
dest = (__global char *)htup + curr;
switch (cmeta.attlen)
{
case sizeof(cl_char):
*((__global cl_char *) dest)
= *((__global cl_char *) datum);
break;
case sizeof(cl_short):
*((__global cl_short *) dest)
= *((__global cl_short *) datum);
break;
case sizeof(cl_int):
*((__global cl_int *) dest)
= *((__global cl_int *) datum);
break;
case sizeof(cl_long):
*((__global cl_long *) dest)
= *((__global cl_long *) datum);
break;
default:
memcpy(dest, datum, cmeta.attlen);
break;
}
curr += cmeta.attlen;
}
else
{
cl_uint vl_len = VARSIZE_ANY(datum);
/* put 0 and align here, if not a short varlena */
if (!VARATT_IS_1B(datum))
{
while (TYPEALIGN(cmeta.attalign, curr) != curr)
((__global char *)htup)[curr++] = 0;
}
memcpy((__global char *)htup + curr, datum, vl_len);
curr += vl_len;
}
if (heap_hasnull)
htup->t_bits[i >> 3] |= (1 << (i & 0x07));
}
}
}
out:
/* write-back execution status into host-side */
kern_writeback_error_status(&kresults->errcode, errcode, LOCAL_WORKMEM);
}
/*
 * kern_gpuhashjoin_projection_slot
 *
 * Projects the join result in tuple-slot format (KDS_FORMAT_TUPSLOT) on
 * 'kds_dest', according to the result of kern_gpuhashjoin_main. Each
 * work-item is responsible for (at most) one result row: it walks on the
 * tuple of every relation in the join (depth 0 is the outer kds, others
 * are hash entries), extracts all the columns and hands them over to
 * gpuhashjoin_projection_datum(), with the address each datum has on the
 * host-side.
 *
 * It reports StromError_DataStoreNoSpace if either the result buffer or
 * the destination store is too small, and StromError_DataStoreCorruption
 * on unexpected format of the data stores.
 */
__kernel void
kern_gpuhashjoin_projection_slot(__global kern_hashjoin *khashjoin,	/* in */
								 __global kern_multihash *kmhash,	/* in */
								 __global kern_data_store *kds,		/* in */
								 __global kern_data_store *ktoast,	/* in */
								 __global kern_data_store *kds_dest, /* out */
								 KERN_DYNAMIC_LOCAL_WORKMEM_ARG)
{
	__global kern_resultbuf *kresults = KERN_HASHJOIN_RESULTBUF(khashjoin);
	__global cl_int	   *rbuffer;
	__global Datum	   *slot_values;
	__global cl_char   *slot_isnull;
	cl_int			nrels = kresults->nrels;
	cl_int			depth;
	cl_int			errcode = StromError_Success;

	/* Case of overflow; it shall be retried or executed by CPU instead,
	 * so no projection is needed anyway. We quickly exit the kernel,
	 * reporting StromError_DataStoreNoSpace from this kernel as well.
	 */
	if (kresults->nitems > kresults->nrooms ||
		kresults->nitems > kds_dest->nrooms)
	{
		STROM_SET_ERROR(&errcode, StromError_DataStoreNoSpace);
		goto out;
	}

	/* Update the nitems of kds_dest */
	if (get_global_id(0) == 0)
		kds_dest->nitems = kresults->nitems;

	/* Do projection if thread is responsible */
	if (get_global_id(0) >= kresults->nitems)
		goto out;

	/* Ensure format of the kern_data_store */
	if ((kds->format != KDS_FORMAT_ROW &&
		 kds->format != KDS_FORMAT_ROW_FLAT) ||
		kds_dest->format != KDS_FORMAT_TUPSLOT)
	{
		STROM_SET_ERROR(&errcode, StromError_DataStoreCorruption);
		goto out;
	}

	/* Extract each tuple and projection */
	rbuffer = kresults->results + nrels * get_global_id(0);
	slot_values = KERN_DATA_STORE_VALUES(kds_dest, get_global_id(0));
	slot_isnull = KERN_DATA_STORE_ISNULL(kds_dest, get_global_id(0));

	for (depth=0; depth < nrels; depth++)
	{
		__global HeapTupleHeaderData *htup;
		__global kern_colmeta *p_colmeta;
		__global void  *datum;
		__global char  *baseaddr;
		hostptr_t		hostaddr;
		cl_uint			i, ncols;
		cl_uint			offset;
		cl_uint			nattrs;
		cl_bool			heap_hasnull;

		/*
		 * Pick up the tuple of this depth, together with the pair of
		 * device address (baseaddr) and host address (hostaddr) that
		 * point to the same location; it is used to translate the
		 * address of a datum into the one on the host-side.
		 */
		if (depth == 0)
		{
			/* outer relation; rbuffer[0] is (row-index + 1) */
			ncols = kds->ncols;
			p_colmeta = kds->colmeta;
			if (kds->format == KDS_FORMAT_ROW)
			{
				__global kern_blkitem *bitem;
				cl_uint		blkidx;

				htup = kern_get_tuple_rs(kds, rbuffer[0] - 1, &blkidx);
				baseaddr = (__global char *)
					KERN_DATA_STORE_ROWBLOCK(kds, blkidx);
				bitem = KERN_DATA_STORE_BLKITEM(kds, blkidx);
				hostaddr = bitem->page;
			}
			else
			{
				htup = kern_get_tuple_rsflat(kds, rbuffer[0] - 1);
				baseaddr = (__global char *)&kds->hostptr;
				hostaddr = kds->hostptr;
			}
		}
		else
		{
			/* inner relation; rbuffer[depth] is offset of the hash entry */
			__global kern_hashtable *khtable = KERN_HASHTABLE(kmhash, depth);
			__global kern_hashentry *kentry;

			kentry = (__global kern_hashentry *)
				((__global char *)khtable + rbuffer[depth]);
			htup = &kentry->htup;
			ncols = khtable->ncols;
			p_colmeta = khtable->colmeta;
			baseaddr = (__global char *)&kmhash->hostptr;
			hostaddr = kmhash->hostptr;
		}

		/* fill up the slot with null */
		if (!htup)
		{
			for (i=0; i < ncols; i++)
				gpuhashjoin_projection_datum(&errcode,
											 slot_values,
											 slot_isnull,
											 depth,
											 i,
											 0,
											 NULL);
			continue;
		}

		offset = htup->t_hoff;
		nattrs = (htup->t_infomask2 & HEAP_NATTS_MASK);
		heap_hasnull = ((htup->t_infomask & HEAP_HASNULL) != 0);

		for (i=0; i < ncols; i++)
		{
			if (i >= nattrs)
				datum = NULL;	/* tuple has less attributes */
			else if (heap_hasnull && att_isnull(i, htup->t_bits))
				datum = NULL;
			else
			{
				kern_colmeta	cmeta = p_colmeta[i];

				/*
				 * Fixed-length attributes are aligned to 'attalign', not
				 * to their length; these two differ on some types, and
				 * 'attlen' is not always a power of two. A varlena is
				 * aligned unless it begins with a non-pad byte (short
				 * varlena header).
				 */
				if (cmeta.attlen > 0)
					offset = TYPEALIGN(cmeta.attalign, offset);
				else if (!VARATT_NOT_PAD_BYTE((__global char *)htup + offset))
					offset = TYPEALIGN(cmeta.attalign, offset);

				datum = ((__global char *) htup + offset);
				offset += (cmeta.attlen > 0
						   ? cmeta.attlen
						   : VARSIZE_ANY(datum));
			}
			/* put datum */
			gpuhashjoin_projection_datum(&errcode,
										 slot_values,
										 slot_isnull,
										 depth,
										 i,
										 hostaddr + ((uintptr_t) datum -
													 (uintptr_t) baseaddr),
										 datum);
		}
	}
out:
	/* write-back execution status into host-side */
	kern_writeback_error_status(&kresults->errcode, errcode, LOCAL_WORKMEM);
}
/*
* Template of variable reference on the hash-entry
*
* STROMCL_SIMPLE_HASHREF_TEMPLATE(NAME,BASE) defines the function
* pg_<NAME>_hashref(), which fetches the column 'colidx' from the tuple
* held by the hash entry 'kentry', as a pg_<NAME>_t value. The datum is
* located with kern_get_datum_tuple() and read as a BASE value; if no
* datum is returned, the result is marked as null and its 'value' is left
* unset. 'p_errcode' is accepted but not referenced by this template.
*/
#define STROMCL_SIMPLE_HASHREF_TEMPLATE(NAME,BASE) \
static pg_##NAME##_t \
pg_##NAME##_hashref(__global kern_hashtable *khtable, \
__global kern_hashentry *kentry, \
__private int *p_errcode, \
cl_uint colidx) \
{ \
pg_##NAME##_t result; \
__global BASE *addr \
= kern_get_datum_tuple(khtable->colmeta, \
&kentry->htup, \
colidx); \
if (!addr) \
result.isnull = true; \
else \
{ \
result.isnull = false; \
result.value = *addr; \
} \
return result; \
}
/*
 * pg_varlena_hashref
 *
 * Fetches the variable-length column 'colidx' from the tuple held by the
 * hash entry 'kentry'. The result is null if the column has no datum.
 * Only a datum that satisfies VARATT_IS_4B_U() or VARATT_IS_1B() is
 * referenced as is; for any other form, the result is null and
 * StromError_CpuReCheck is set on '*p_errcode'.
 */
static pg_varlena_t
pg_varlena_hashref(__global kern_hashtable *khtable,
				   __global kern_hashentry *kentry,
				   __private int *p_errcode,
				   cl_uint colidx)
{
	pg_varlena_t	result;
	__global varlena *vl_ptr
		= kern_get_datum_tuple(khtable->colmeta,
							   &kentry->htup,
							   colidx);

	/* null datum */
	if (!vl_ptr)
	{
		result.isnull = true;
		return result;
	}
	/* neither of the acceptable forms; let the host recheck the row */
	if (!VARATT_IS_4B_U(vl_ptr) && !VARATT_IS_1B(vl_ptr))
	{
		result.isnull = true;
		STROM_SET_ERROR(p_errcode, StromError_CpuReCheck);
		return result;
	}
	result.value = vl_ptr;
	result.isnull = false;
	return result;
}
/*
 * STROMCL_VARLENA_HASHREF_TEMPLATE(NAME) defines pg_<NAME>_hashref() as
 * a thin wrapper of pg_varlena_hashref(); all the arguments are passed
 * through and its result is returned as a pg_<NAME>_t value.
 */
#define STROMCL_VARLENA_HASHREF_TEMPLATE(NAME) \
static pg_##NAME##_t \
pg_##NAME##_hashref(__global kern_hashtable *khtable, \
__global kern_hashentry *kentry, \
__private int *p_errcode, \
cl_uint colidx) \
{ \
return pg_varlena_hashref(khtable, kentry, \
p_errcode, colidx); \
}
/*
* Macros to calculate hash key-value.
* (logic was copied from pg_crc32.c)
*
* INIT_CRC32 sets the initial value (all bits on) to 'crc', and FIN_CRC32
* inverts all the bits of 'crc'.
*
* STROMCL_SIMPLE_HASHKEY_TEMPLATE(NAME,BASE) defines the function
* pg_<NAME>_hashkey(), which folds sizeof(BASE) bytes of 'datum.value'
* into 'hash' using 'crc32_table', and returns the updated hash value.
* A null datum leaves 'hash' unchanged.
*
* The bytes are taken through the 'as_long' member of the union, from the
* lowest byte, after only 'as_base' was written.
* NOTE(review): this presumably relies on a little-endian layout, so that
* the low sizeof(BASE) bytes of 'as_long' are the ones of 'as_base' -
* confirm for the target devices.
*/
#define INIT_CRC32(crc) ((crc) = 0xFFFFFFFF)
#define FIN_CRC32(crc) ((crc) ^= 0xFFFFFFFF)
#define STROMCL_SIMPLE_HASHKEY_TEMPLATE(NAME,BASE) \
static inline cl_uint \
pg_##NAME##_hashkey(__local cl_uint *crc32_table, \
cl_uint hash, pg_##NAME##_t datum) \
{ \
cl_uint __len = sizeof(BASE); \
cl_uint __index; \
union { \
BASE as_base; \
cl_ulong as_long; \
} __data; \
\
if (!datum.isnull) \
{ \
__data.as_base = datum.value; \
while (__len-- > 0) \
{ \
__index = ((hash >> 24) ^ (__data.as_long)) & 0xff; \
hash = crc32_table[__index] ^ (hash << 8); \
__data.as_long = (__data.as_long >> 8); \
} \
} \
return hash; \
}
/*
 * STROMCL_VARLENA_HASHKEY_TEMPLATE(NAME) defines the function
 * pg_<NAME>_hashkey() for variable-length values. It folds the payload
 * of 'datum.value' - VARSIZE_ANY_EXHDR() bytes from VARDATA_ANY(), so
 * the varlena header is not included - into 'hash' byte by byte using
 * 'crc32_table', and returns the updated hash value. A null datum leaves
 * 'hash' unchanged.
 */
#define STROMCL_VARLENA_HASHKEY_TEMPLATE(NAME) \
static inline cl_uint \
pg_##NAME##_hashkey(__local cl_uint *crc32_table, \
cl_uint hash, pg_##NAME##_t datum) \
{ \
if (!datum.isnull) \
{ \
__global const cl_char *__data = \
VARDATA_ANY(datum.value); \
cl_uint __len = VARSIZE_ANY_EXHDR(datum.value); \
cl_uint __index; \
while (__len-- > 0) \
{ \
__index = ((hash >> 24) ^ *__data++) & 0xff;\
hash = crc32_table[__index] ^ (hash << 8); \
} \
} \
return hash; \
}
#else /* OPENCL_DEVICE_CODE */
/*
 * pgstrom_multihash_tables - host-side object that wraps a
 * kern_multihash, with the state needed to share it and to load it onto
 * a device. The kern_multihash is embedded at the tail, because it ends
 * with a flexible array.
 */
typedef struct pgstrom_multihash_tables
{
StromObject sobj; /* = StromTab_HashJoinTable */
Size length; /* max available length of this mhash */
Size usage; /* current usage of this mhash */
double ntuples; /* number of tuples in this mhash */
bool is_divided; /* true, if not whole of the inner relation */
slock_t lock; /* protection of the fields below */
cl_int refcnt; /* reference counter of this hash table */
cl_int dindex; /* device to load the hash table */
cl_int n_kernel; /* number of active running kernel */
cl_mem m_hash; /* in-kernel buffer object. Once n_kernel
* gets back to zero, a valid m_hash needs
* to be released. */
cl_event ev_hash; /* event to load hash table to kernel */
kern_multihash kern;
} pgstrom_multihash_tables;
/*
 * pgstrom_gpuhashjoin - host-side request message of a hash join. The
 * kern_hashjoin is embedded at the tail, because it is the head of a
 * variable length buffer.
 */
typedef struct
{
pgstrom_message msg; /* = StromTag_GpuHashJoin */
Datum dprog_key; /* device key for gpuhashjoin */
pgstrom_multihash_tables *mhtables; /* inner hashjoin tables */
pgstrom_data_store *pds; /* data store of outer relation */
pgstrom_data_store *pds_dest; /* data store of result buffer */
kern_hashjoin khashjoin; /* kern_hashjoin of this request */
} pgstrom_gpuhashjoin;
#endif /* OPENCL_DEVICE_CODE */
#endif /* OPENCL_HASHJOIN_H */