Skip to content

Commit 447033b

Browse files
committed
fix monotonicity of doc ids
the concurrent shatter had left open the possibility of having non-monotonically increasing document ids. Fix with a sync.Cond
1 parent 9979f3b commit 447033b

File tree

4 files changed

+35
-20
lines changed

4 files changed

+35
-20
lines changed

cmd/dupi/extract.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,6 @@ func (x *extractCmd) Run(args []string) error {
5353
defer x.index.Close()
5454
query := x.index.StartQuery(dupi.QueryMaxBlot)
5555
shape := []dupi.Blot{
56-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
57-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
58-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
59-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
60-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
61-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
62-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
63-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
64-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
65-
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
6656
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)},
6757
{Blot: 0, Docs: make([]dupi.Doc, 0, 32)}}
6858
for {

dmd/adder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func (t *Adder) Add(fid, start, end uint32) (uint32, error) {
4242
return 0, err
4343
}
4444
}
45+
n = uint32(len(t.buf))
4546
t.buf = append(t.buf, fields{fid, start, end})
4647
return n + t.flushed, nil
4748
}

post/t.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,19 @@
1616
// dupi blots with dupi internal document ids.
1717
package post
1818

19+
import "fmt"
20+
1921
// a post is a tuple of document id, blot
2022
type T uint64
2123

2224
func (p T) Docid() uint32 {
2325
return uint32(p >> 32)
2426
}
2527

28+
func (p T) String() string {
29+
return fmt.Sprintf("<%d,%x>", p.Docid(), p.Blot()&0xffff)
30+
}
31+
2632
func (p T) Blot() uint32 {
2733
return uint32(p)
2834
}

shatter.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ import (
2222
"github.com/go-air/dupi/token"
2323
)
2424

25-
type shardMsg struct {
26-
posts []post.T
27-
}
28-
2925
type shatterReq struct {
3026
docid uint32
3127
offset uint32
@@ -37,12 +33,13 @@ func startShatter(ns, n, s int,
3733
tf token.TokenizerFunc, blotcfg *blotter.Config,
3834
chns []chan []post.T) (chan *shatterReq, error) {
3935
rch := make(chan *shatterReq)
36+
mono := newMono()
4037
for i := 0; i < ns; i++ {
4138
bler, err := blotter.FromConfig(blotcfg)
4239
if err != nil {
4340
return nil, err
4441
}
45-
sh := newShatter(n, s, tf, bler)
42+
sh := newShatter(n, s, tf, bler, mono)
4643
copy(sh.shardChns, chns)
4744
go func(sh *shatter) {
4845
for {
@@ -60,22 +57,34 @@ func startShatter(ns, n, s int,
6057
return rch, nil
6158
}
6259

60+
type mono struct {
61+
docid uint32
62+
cond *sync.Cond
63+
}
64+
65+
func newMono() *mono {
66+
var mu sync.Mutex
67+
return &mono{cond: sync.NewCond(&mu)}
68+
}
69+
6370
type shatter struct {
6471
tokfn token.TokenizerFunc
6572
tokb []token.T
6673
bler blotter.T
6774
seqlen int
6875
d [][]post.T
6976
shardChns []chan []post.T
77+
mono *mono
7078
}
7179

72-
func newShatter(n, s int, tf token.TokenizerFunc, bler blotter.T) *shatter {
80+
func newShatter(n, s int, tf token.TokenizerFunc, bler blotter.T, mono *mono) *shatter {
7381
res := &shatter{
7482
tokfn: tf,
7583
bler: bler,
7684
seqlen: s,
7785
shardChns: make([]chan []post.T, n),
78-
d: make([][]post.T, n)}
86+
d: make([][]post.T, n),
87+
mono: mono}
7988
for i := range res.shardChns {
8089
res.shardChns[i] = make(chan []post.T)
8190
}
@@ -100,26 +109,35 @@ func (s *shatter) do(did, offset uint32, msg []byte) {
100109
default:
101110
}
102111
}
103-
s.send()
112+
s.send(did)
104113
}
105114

106-
func (s *shatter) send() {
115+
func (s *shatter) send(did uint32) {
116+
s.mono.cond.L.Lock()
117+
for s.mono.docid != did-1 {
118+
s.mono.cond.Wait()
119+
}
120+
107121
var wg sync.WaitGroup
108122
for i, ps := range s.d {
109123
wg.Add(1)
110124
go func(i int, ps []post.T) {
111125
defer wg.Done()
112126
s.shardChns[i] <- ps
113127
<-s.shardChns[i]
114-
s.d[i] = nil //ps[:0]
128+
s.d[i] = nil //ps[:0] (was racy)
115129

116130
}(i, ps)
117131
}
118132
wg.Wait()
133+
s.mono.docid = did
134+
s.mono.cond.Broadcast()
135+
s.mono.cond.L.Unlock()
119136
}
120137

121138
func (s *shatter) blot(docid, b uint32) {
122139
n := uint32(len(s.d))
140+
123141
i := b % n
124142
s.d[i] = append(s.d[i], post.Make(docid, b/n))
125143
}

0 commit comments

Comments
 (0)