Skip to content

Commit 6d9bdb2

Browse files
committed
Introduce task-parallel processing
Signed-off-by: Carl Pearson <cwpears@sandia.gov>
1 parent e2a8207 commit 6d9bdb2

5 files changed

Lines changed: 590 additions & 94 deletions

File tree

analysis/runner.go

Lines changed: 276 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,276 @@
1+
// Copyright 2025 National Technology and Engineering Solutions of Sandia
2+
// SPDX-License-Identifier: BSD-3-Clause
3+
package analysis
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"sync"
9+
10+
"github.com/sandialabs/bibcheck/lookup"
11+
)
12+
13+
// DefaultWorkers is the worker count Run uses when Config.Workers is less
// than one.
const (
	DefaultWorkers = 4
)
14+
15+
// Status is the state of one processing stage of an entry.
type Status string

// Values a stage Status can take.
const (
	// StatusWaiting marks a stage whose preceding stage has not completed yet.
	StatusWaiting = Status("waiting")
	// StatusPending marks a stage that is ready to be claimed by a worker.
	StatusPending = Status("pending")
	// StatusActive marks a stage that a worker has claimed and is running.
	StatusActive = Status("active")
	// StatusCompleted marks a stage that finished without an error.
	StatusCompleted = Status("completed")
	// StatusError marks a stage that finished with an error.
	StatusError = Status("error")
)
24+
25+
// Stage names one of the processing steps an entry goes through.
type Stage string

// Stages, listed in the order an entry passes through them.
const (
	// StageExtraction is the step that runs Config.Extract.
	StageExtraction = Stage("extraction")
	// StageLookup is the step that runs Config.Lookup.
	StageLookup = Stage("lookup")
	// StageSummary is the step that runs Config.Summarize.
	StageSummary = Stage("summary")
)
32+
33+
// Summary is the value produced by the summary stage (Config.Summarize) for
// one entry.
type Summary struct {
	// Mismatch presumably reports that the looked-up result disagrees with
	// the entry; confirm against the Summarize implementation.
	Mismatch bool

	// Comment presumably carries free-form text accompanying the verdict;
	// confirm against the Summarize implementation.
	Comment string
}
37+
38+
type Entry struct {
39+
ID int
40+
41+
ExtractionStatus Status
42+
Text string
43+
ExtractionError error
44+
45+
LookupStatus Status
46+
Result *lookup.Result
47+
LookupError error
48+
49+
SummaryStatus Status
50+
Summary Summary
51+
SummaryError error
52+
}
53+
54+
func (e Entry) Terminal() bool {
55+
return e.ExtractionStatus == StatusError || e.LookupStatus == StatusError ||
56+
e.SummaryStatus == StatusCompleted || e.SummaryStatus == StatusError
57+
}
58+
59+
type Snapshot struct {
60+
Entries []Entry
61+
Completed int
62+
Done bool
63+
}
64+
65+
type Config struct {
66+
EntryIDs []int
67+
Workers int
68+
Extract func(int) (string, error)
69+
Lookup func(string) (*lookup.Result, error)
70+
Summarize func(*lookup.Result) (Summary, error)
71+
Progress func(Snapshot)
72+
}
73+
74+
type job struct {
75+
index int
76+
stage Stage
77+
entry Entry
78+
}
79+
80+
type table struct {
81+
mu sync.Mutex
82+
cond *sync.Cond
83+
entries []Entry
84+
completed int
85+
stopped bool
86+
}
87+
88+
func newTable(ids []int) *table {
89+
t := &table{entries: make([]Entry, len(ids))}
90+
t.cond = sync.NewCond(&t.mu)
91+
for i, id := range ids {
92+
t.entries[i] = Entry{
93+
ID: id,
94+
ExtractionStatus: StatusPending,
95+
LookupStatus: StatusWaiting,
96+
SummaryStatus: StatusWaiting,
97+
}
98+
}
99+
return t
100+
}
101+
102+
// claim returns the first available operation in bibliography order.
103+
func (t *table) claim(ctx context.Context) (job, bool) {
104+
t.mu.Lock()
105+
defer t.mu.Unlock()
106+
for {
107+
if ctx.Err() != nil {
108+
t.stopped = true
109+
}
110+
if t.stopped || t.completed == len(t.entries) {
111+
return job{}, false
112+
}
113+
for i := range t.entries {
114+
e := &t.entries[i]
115+
var stage Stage
116+
switch {
117+
case e.ExtractionStatus == StatusPending:
118+
e.ExtractionStatus = StatusActive
119+
stage = StageExtraction
120+
case e.ExtractionStatus == StatusCompleted && e.LookupStatus == StatusPending:
121+
e.LookupStatus = StatusActive
122+
stage = StageLookup
123+
case e.LookupStatus == StatusCompleted && e.SummaryStatus == StatusPending:
124+
e.SummaryStatus = StatusActive
125+
stage = StageSummary
126+
default:
127+
continue
128+
}
129+
return job{index: i, stage: stage, entry: *e}, true
130+
}
131+
t.cond.Wait()
132+
}
133+
}
134+
135+
func (t *table) complete(j job, value any, err error) {
136+
t.mu.Lock()
137+
defer t.mu.Unlock()
138+
if t.stopped {
139+
return
140+
}
141+
e := &t.entries[j.index]
142+
switch j.stage {
143+
case StageExtraction:
144+
if err != nil {
145+
e.ExtractionStatus = StatusError
146+
e.ExtractionError = err
147+
t.completed++
148+
} else {
149+
e.ExtractionStatus = StatusCompleted
150+
e.Text = value.(string)
151+
e.LookupStatus = StatusPending
152+
}
153+
case StageLookup:
154+
if err != nil {
155+
e.LookupStatus = StatusError
156+
e.LookupError = err
157+
t.completed++
158+
} else {
159+
e.LookupStatus = StatusCompleted
160+
e.Result = value.(*lookup.Result)
161+
e.SummaryStatus = StatusPending
162+
}
163+
case StageSummary:
164+
if err != nil {
165+
e.SummaryStatus = StatusError
166+
e.SummaryError = err
167+
} else {
168+
e.SummaryStatus = StatusCompleted
169+
e.Summary = value.(Summary)
170+
}
171+
t.completed++
172+
}
173+
t.cond.Broadcast()
174+
}
175+
176+
func (t *table) stop() {
177+
t.mu.Lock()
178+
t.stopped = true
179+
t.cond.Broadcast()
180+
t.mu.Unlock()
181+
}
182+
183+
func (t *table) snapshot() Snapshot {
184+
t.mu.Lock()
185+
defer t.mu.Unlock()
186+
entries := make([]Entry, len(t.entries))
187+
copy(entries, t.entries)
188+
return Snapshot{
189+
Entries: entries,
190+
Completed: t.completed,
191+
Done: t.completed == len(t.entries),
192+
}
193+
}
194+
195+
func Run(ctx context.Context, cfg Config) (Snapshot, error) {
196+
if len(cfg.EntryIDs) == 0 {
197+
return Snapshot{}, fmt.Errorf("no bibliography entries")
198+
}
199+
if cfg.Extract == nil || cfg.Lookup == nil || cfg.Summarize == nil {
200+
return Snapshot{}, fmt.Errorf("incomplete analysis configuration")
201+
}
202+
workers := cfg.Workers
203+
if workers < 1 {
204+
workers = DefaultWorkers
205+
}
206+
if workers > len(cfg.EntryIDs) {
207+
workers = len(cfg.EntryIDs)
208+
}
209+
210+
t := newTable(cfg.EntryIDs)
211+
updates := make(chan struct{}, workers*2+1)
212+
var dispatch sync.WaitGroup
213+
dispatch.Add(1)
214+
go func() {
215+
defer dispatch.Done()
216+
for range updates {
217+
if cfg.Progress != nil {
218+
cfg.Progress(t.snapshot())
219+
}
220+
}
221+
}()
222+
notify := func() {
223+
select {
224+
case updates <- struct{}{}:
225+
default:
226+
// A queued notification will observe the latest table snapshot.
227+
}
228+
}
229+
notify()
230+
231+
stopWatching := make(chan struct{})
232+
go func() {
233+
select {
234+
case <-ctx.Done():
235+
t.stop()
236+
case <-stopWatching:
237+
}
238+
}()
239+
240+
var wg sync.WaitGroup
241+
wg.Add(workers)
242+
for range workers {
243+
go func() {
244+
defer wg.Done()
245+
for {
246+
j, ok := t.claim(ctx)
247+
if !ok {
248+
return
249+
}
250+
notify()
251+
var value any
252+
var err error
253+
switch j.stage {
254+
case StageExtraction:
255+
value, err = cfg.Extract(j.entry.ID)
256+
case StageLookup:
257+
value, err = cfg.Lookup(j.entry.Text)
258+
case StageSummary:
259+
value, err = cfg.Summarize(j.entry.Result)
260+
}
261+
t.complete(j, value, err)
262+
notify()
263+
}
264+
}()
265+
}
266+
wg.Wait()
267+
close(stopWatching)
268+
close(updates)
269+
dispatch.Wait()
270+
271+
result := t.snapshot()
272+
if err := ctx.Err(); err != nil {
273+
return result, err
274+
}
275+
return result, nil
276+
}

0 commit comments

Comments
 (0)