2 changes: 1 addition & 1 deletion execgen/main.go
@@ -184,7 +184,7 @@ func main() {
if err != nil {
// Write out incorrect source for easier debugging.
b = buf.Bytes()
fmt.Printf("Code formatting failed with Go parse error: %s", out, err)
fmt.Printf("Code formatting failed with Go parse error: %s", err)
}

wr := os.Stdout
285 changes: 285 additions & 0 deletions hashjoin.go
@@ -0,0 +1,285 @@
package exectoy

// todo(changangela) hash table should be able to rehash or determine bucket size dynamically
const hashTableBucketSize = 1 << 16

type hashTableInt struct {
first []int

Owner: I know the other code in this repo isn't well commented, but we'll need to add comments to all of these fields once we productionize it - otherwise, it'll be impossible for people to understand what's going on.

Owner: You should describe the contract of hashTableInt as well. How is it used? What does it do exactly? At least a few sentences would be helpful.

Owner: Specifically, what's first and next? How do they work? What's the overall structure of the hash table, what guarantees does it provide?

Author: Thanks for the review! I've started migrating this code out to cockroachdb and I promise there is better documentation in there 😆


next []int

keys intColumn
values []intColumn

nOutCols int
count int
bucketSize int

// allocated represents the reserved number of keys that the hashTableInt
// can store in its current state
allocated int
}
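
For readers following the review thread above, here is one reading of the structure, phrased as the field comments the reviewer asked for. It is inferred from insert and check below, not author-supplied documentation:

	// Sketch of field comments inferred from the code (not author-confirmed):
	//
	// first[bucket] is the id of the key most recently inserted into that
	// bucket, or 0 if the bucket is empty.
	//
	// next[id] is the id of the key that was at the head of the same bucket
	// when key id was inserted, so each bucket forms a singly linked chain.
	// The id 0 is reserved as the end-of-chain sentinel, which is why ids
	// and allocated both start at 1.
	//
	// keys[id] holds the equality column value for key id, and values[i][id]
	// holds that row's i-th output column, stored column-by-column.
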

func makeHashTableInt(bucketSize int, nOutCols int) *hashTableInt {
values := make([]intColumn, nOutCols)
for i := 0; i < nOutCols; i++ {
values[i] = make(intColumn, 1)
}

hashTable := &hashTableInt{
first: make([]int, bucketSize),
next: make([]int, 1),
keys: make([]int, 1),
values: values,
nOutCols: nOutCols,
bucketSize: bucketSize,
allocated: 1,
}

return hashTable
}

func (hashTable *hashTableInt) hashInt(key int) int {
// todo(changangela) replace with a better hash function or use a library
return key % hashTable.bucketSize
}
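
On the todo above, one common replacement (an assumption on my part, not the author's plan) is a multiplicative Fibonacci hash. It spreads consecutive keys across buckets and, unlike key % bucketSize, cannot return a negative index for negative keys:

	// Sketch of a multiplicative hash, assuming bucketSize stays 1 << 16.
	// hashIntFib is a hypothetical name; the constant is the 64-bit
	// golden-ratio multiplier, and shifting by 48 keeps the top 16 bits.
	func (hashTable *hashTableInt) hashIntFib(key int) int {
		const fib64 = 0x9E3779B97F4A7C15
		return int((uint64(key) * fib64) >> 48)
	}
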

func (hashTable *hashTableInt) grow(amount int) {
hashTable.next = append(hashTable.next, make([]int, amount)...)
hashTable.keys = append(hashTable.keys, make(intColumn, amount)...)

Owner: This may cause multiple allocations. Depending on whether you want fine-grained control over how much the slice will grow, I think the right way to do this is to allocate a new slice if the old slice's capacity is too small to fit the new amount, and copy the old slice into the new slice...

I hear Go 1.11 is more optimal for this case (https://go-review.googlesource.com/c/go/+/109517) but unfortunately we're still on 1.10 for other reasons. If this is your bottleneck I'd consider changing it; otherwise I guess we can leave it for now.

Author: Yeah, that makes sense. The implementation's a little different with the ColVec stuff and different types, which we can discuss later.



for i := 0; i < hashTable.nOutCols; i++ {
hashTable.values[i] = append(hashTable.values[i], make(intColumn, amount)...)
}
hashTable.allocated += amount
}
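
A sketch of the reviewer's suggestion above, assuming amortized doubling is acceptable: check capacity once and copy, so each grow costs at most one allocation. growIntSlice is a hypothetical helper, not part of this PR:

	// Sketch: capacity-aware growth for a single []int, per the review comment.
	func growIntSlice(s []int, amount int) []int {
		need := len(s) + amount
		if need <= cap(s) {
			return s[:need]
		}
		newCap := 2 * cap(s)
		if newCap < need {
			newCap = need
		}
		grown := make([]int, need, newCap)
		copy(grown, s)
		return grown
	}

grow would then call hashTable.next = growIntSlice(hashTable.next, amount) and do the same for keys and each values column (modulo the intColumn type).
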

func (hashTable *hashTableInt) insert(hashkey int, key int) (id int) {
hashTable.count++

id = hashTable.count
hashTable.next[id] = hashTable.first[hashkey]
hashTable.first[hashkey] = id
hashTable.keys[id] = key

return id
}

type hashJoinBuilder struct {
hashTable *hashTableInt

batchSize int

bucket []int
groupId []int
}

func makeHashJoinBuilder(batchSize int, hashTable *hashTableInt) *hashJoinBuilder {
return &hashJoinBuilder{
hashTable: hashTable,
batchSize: batchSize,
bucket: make([]int, batchSize),
groupId: make([]int, batchSize),
}
}

func (builder *hashJoinBuilder) computeBucket(eqCol intColumn) {
for i := 0; i < builder.batchSize; i++ {
builder.bucket[i] = builder.hashTable.hashInt(eqCol[i])
}
}

func (builder *hashJoinBuilder) insertBatch(eqCol intColumn) {
builder.hashTable.grow(builder.batchSize)

for i := 0; i < builder.batchSize; i++ {
builder.groupId[i] = builder.hashTable.insert(builder.bucket[i], eqCol[i])
}
}

func (builder *hashJoinBuilder) spread(outCol intColumn, valIdx int) {
valCol := builder.hashTable.values[valIdx]

	for i := 0; i < builder.batchSize; i++ {
valCol[builder.groupId[i]] = outCol[i]
}
}

func (builder *hashJoinBuilder) start(flow dataFlow, eqColIdx int, outCols []int) {
eqCol := flow.b[eqColIdx].(intColumn)[:batchRowLen]

builder.computeBucket(eqCol)
builder.insertBatch(eqCol)

for idx, colIdx := range outCols {
outCol := flow.b[colIdx].(intColumn)[:batchRowLen]
builder.spread(outCol, idx)
}
}

type hashJoinProber struct {
hashTable *hashTableInt

batchSize int

bucket []int
groupId []int
toCheck []int
}

func makeHashJoinProber(batchSize int, hashTable *hashTableInt) *hashJoinProber {
return &hashJoinProber{
hashTable: hashTable,
batchSize: batchSize,

bucket: make([]int, batchSize),
groupId: make([]int, batchSize),
toCheck: make([]int, batchSize),
}
}

func (prober *hashJoinProber) computeBucket(eqCol intColumn) {
for i := 0; i < prober.batchSize; i++ {
prober.bucket[i] = prober.hashTable.hashInt(eqCol[i])
}
}

func (prober *hashJoinProber) lookupInitial() {
for i := 0; i < prober.batchSize; i++ {
prober.groupId[i] = prober.hashTable.first[prober.bucket[i]]
prober.toCheck[i] = i
}
}

// check compares each key still being probed against the build-table key
// its groupId currently points at. Keys that differ stay in toCheck so the
// caller can advance them down their bucket chain; keys that match, or that
// have reached the end of their chain, are dropped. It returns the number
// of keys still to be checked.
func (prober *hashJoinProber) check(eqCol intColumn) int {
	nToCheck := len(prober.toCheck)
	differs := make([]int, 0)

	for i := 0; i < nToCheck; i++ {
		// An id of 0 is reserved to represent the end of the next chain.
		if id := prober.groupId[prober.toCheck[i]]; id != 0 {
			if prober.hashTable.keys[id] != eqCol[prober.toCheck[i]] {
				differs = append(differs, prober.toCheck[i])
			}
		}
	}

	prober.toCheck = differs

	return len(prober.toCheck)
}

func (prober *hashJoinProber) findNext(nToCheck int) {
for i := 0; i < nToCheck; i++ {
prober.groupId[prober.toCheck[i]] = prober.hashTable.next[prober.groupId[prober.toCheck[i]]]
}
}

func (prober *hashJoinProber) start(flow dataFlow, eqColIdx int, outCols []int, outFlow *dataFlow) {
eqCol := flow.b[eqColIdx].(intColumn)[:batchRowLen]

prober.computeBucket(eqCol)
prober.lookupInitial()

nToCheck := len(prober.toCheck)

for nToCheck > 0 {
nToCheck = prober.check(eqCol)
prober.findNext(nToCheck)
}

groupId := prober.groupId
results := make([]int, 0)
resultsIdx := make([]int, 0)

for i := 0; i < flow.n; i++ {
if groupId[i] != 0 {
results = append(results, groupId[i])
resultsIdx = append(resultsIdx, i)
}
}

nBuildOutCols := prober.hashTable.nOutCols
nResults := len(results)

for j := 0; j < nBuildOutCols; j++ {
outCol := outFlow.b[j].(intColumn)[:batchRowLen]
valCol := prober.hashTable.values[j]

for i := 0; i < nResults; i++ {
outCol[i] = valCol[results[i]]
}
}

for j, colIdx := range outCols {
		outCol := outFlow.b[j+nBuildOutCols].(intColumn)[:batchRowLen]
valCol := flow.b[colIdx].(intColumn)[:batchRowLen]

for i := 0; i < nResults; i++ {
outCol[i] = valCol[resultsIdx[i]]
}
}

outFlow.n = nResults
}

type hashJoinIntInt struct {
hashTable *hashTableInt

left ExecOp
right ExecOp

leftEqColIdx int
rightEqColIdx int

leftCols []int
rightCols []int

outFlow dataFlow
}

func (hashJoiner *hashJoinIntInt) Init() {
nOutputCols := len(hashJoiner.leftCols) + len(hashJoiner.rightCols)
if nOutputCols == 0 {
panic("no output cols")
}

hashJoiner.outFlow.b = make(batch, nOutputCols)
for i := range hashJoiner.outFlow.b {
hashJoiner.outFlow.b[i] = make(intColumn, batchRowLen)
}

// todo(changangela): we currently assume build relation has unique join
// column and is on the left

	hashJoiner.hashTable = makeHashTableInt(hashTableBucketSize, len(hashJoiner.leftCols))

hashJoiner.build()

Owner: This doesn't belong in Init, which is designed to run before execution starts at all - more of a setup phase than a do-work phase. You should put this into Next behind a conditional that will only run once. In distsql we do this with a little state machine infrastructure.


}
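
A sketch of the reviewer's suggestion: move the build behind a flag checked in Next, keeping Init as a pure setup phase. The built field is hypothetical, the simplest stand-in for the state machine mentioned above:

	// Sketch: build lazily on the first call to Next.
	// Assumes a new `built bool` field on hashJoinIntInt.
	func (hashJoiner *hashJoinIntInt) Next() dataFlow {
		if !hashJoiner.built {
			hashJoiner.build()
			hashJoiner.built = true
		}
		flow := hashJoiner.right.Next()
		if flow.n == 0 {
			return flow
		}
		batchProber := makeHashJoinProber(flow.n, hashJoiner.hashTable)
		batchProber.start(flow, hashJoiner.rightEqColIdx, hashJoiner.rightCols, &hashJoiner.outFlow)
		return hashJoiner.outFlow
	}
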

func (hashJoiner *hashJoinIntInt) Next() dataFlow {
flow := hashJoiner.right.Next()
batchSize := flow.n

if batchSize == 0 {
return flow
}

batchProber := makeHashJoinProber(batchSize, hashJoiner.hashTable)
batchProber.start(flow, hashJoiner.rightEqColIdx, hashJoiner.rightCols, &hashJoiner.outFlow)

return hashJoiner.outFlow
}

func (hashJoiner *hashJoinIntInt) build() {
for {
flow := hashJoiner.left.Next()
batchSize := flow.n

if batchSize == 0 {
return
}

batchBuilder := makeHashJoinBuilder(batchSize, hashJoiner.hashTable)
batchBuilder.start(flow, hashJoiner.leftEqColIdx, hashJoiner.leftCols)
}
}
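
For completeness, a hypothetical wiring of the operator; buildSource and probeSource are illustrative stand-ins for any ExecOp producing int columns, with the build relation on the left per the todo in Init:

	// Sketch: driving the hash join. Column indices are illustrative.
	hj := &hashJoinIntInt{
		left:          buildSource, // build side; assumed unique join column
		right:         probeSource,
		leftEqColIdx:  0,
		rightEqColIdx: 0,
		leftCols:      []int{0},
		rightCols:     []int{0},
	}
	hj.Init()
	for {
		out := hj.Next()
		if out.n == 0 {
			break
		}
		// out.b holds the left output columns followed by the right ones.
	}
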
2 changes: 1 addition & 1 deletion mergejoin.go
@@ -183,7 +183,7 @@ func (m *mergeJoinIntIntOp) bufferMatchGroup(val int, flow dataFlow, colIdx int,
// It's hard because this whole process can span batch boundaries.
// The algorithm should be:
// for each col:
-//       add value to buffer until group's over or batch ends.
+//       add value to buffer until groupId's over or batch ends.
// if batch ended early, repeat.
for i, c := range cols {
batchBuf[i] = append(batchBuf[i].(intColumn), flow.b[c].(intColumn)[matchIdx])
63 changes: 63 additions & 0 deletions operator.go
@@ -71,7 +71,70 @@ func (s *repeatableBatchSource) Init() {
}
}

// finiteBatchSource wraps a repeatableBatchSource and returns its batches
// a bounded number of times, then returns empty dataFlows.
type finiteBatchSource struct {
	s           *repeatableBatchSource
	usableCount int
}

func (f *finiteBatchSource) Init() {
f.s.Init()
}

func (f *finiteBatchSource) Next() dataFlow {
	f.usableCount--
	if f.usableCount > 0 {
		return f.s.Next()
	}
	return dataFlow{}
}

// uniformDistinctBatchSource produces batches whose int columns walk through
// [0, hashTableBucketSize) in windows of batchRowLen, so keys are distinct
// within a batch and only repeat after wrapping around.
type uniformDistinctBatchSource struct {
	numOutputCols int
	internalCol   intColumn
	internalSel   intColumn
	usableCount   int
	curIdx        int
}

func (u *uniformDistinctBatchSource) Init() {
u.internalSel = make(intColumn, batchRowLen)

u.internalCol = make(intColumn, hashTableBucketSize)
for i := 0; i < hashTableBucketSize; i++ {
		u.internalCol[i] = i
}
}

func (u *uniformDistinctBatchSource) Next() dataFlow {
	u.usableCount--
	if u.usableCount <= 0 {
		return dataFlow{}
	}
batch := make(batch, u.numOutputCols)
for i := 0; i < u.numOutputCols; i++ {
		batch[i] = intColumn(u.internalCol[u.curIdx : u.curIdx+batchRowLen])
}

u.curIdx += batchRowLen
if u.curIdx >= hashTableBucketSize {
u.curIdx = 0
}

flow := dataFlow{
b: batch,
sel: u.internalSel,
useSel: false,
n: batchRowLen,
}

	return flow
}

var _ ExecOp = &repeatableBatchSource{}
var _ ExecOp = &finiteBatchSource{}
var _ ExecOp = &uniformDistinctBatchSource{}

/*
