Skip to content

Commit d5ce90f

Browse files
committed
added migration tooling
1 parent d4e5211 commit d5ce90f

8 files changed

Lines changed: 269 additions & 73 deletions

File tree

.github/workflows/fuzz.go.yaml

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Fuzz - Go Codecs
1+
name: Fuzz - Go
22

33
on:
44
pull_request:
@@ -15,18 +15,11 @@ jobs:
1515
matrix:
1616
include:
1717
- module: x/go
18-
packages: >-
19-
./color/ ./label/ ./spatial/ ./telem/
18+
artifact: x-go
2019
- module: core
21-
packages: >-
22-
./pkg/distribution/channel/ ./pkg/distribution/group/
23-
./pkg/distribution/ontology/ ./pkg/service/arc/ ./pkg/service/device/
24-
./pkg/service/lineplot/ ./pkg/service/log/ ./pkg/service/rack/
25-
./pkg/service/ranger/ ./pkg/service/schematic/ ./pkg/service/table/
26-
./pkg/service/user/ ./pkg/service/workspace/
20+
artifact: core
2721
- module: arc/go
28-
packages: >-
29-
./compiler/ ./graph/ ./ir/ ./program/ ./text/ ./types/
22+
artifact: arc-go
3023
steps:
3124
- name: Checkout repository
3225
uses: actions/checkout@v6
@@ -41,10 +34,16 @@ jobs:
4134
4235
- name: Fuzz
4336
working-directory: ${{ matrix.module }}
37+
# go test -fuzz requires a single package (cannot use ./...), so we
38+
# discover packages containing fuzz targets via grep and iterate.
4439
run: |
45-
for pkg in ${{ matrix.packages }}; do
40+
packages=$(grep -rl '^func Fuzz' --include='*_test.go' . | sed 's|/[^/]*$||' | sort -u)
41+
if [ -z "$packages" ]; then
42+
echo "No fuzz targets found"
43+
exit 0
44+
fi
45+
for pkg in $packages; do
4646
echo "::group::Fuzzing $pkg"
47-
# List all fuzz targets in the package
4847
targets=$(go test "$pkg" -list '^Fuzz' -run '^$' 2>/dev/null | grep '^Fuzz' || true)
4948
for target in $targets; do
5049
echo "--- $target ---"
@@ -57,5 +56,5 @@ jobs:
5756
if: failure()
5857
uses: actions/upload-artifact@v4
5958
with:
60-
name: fuzz-crashes-${{ matrix.module }}
59+
name: fuzz-crashes-${{ matrix.artifact }}
6160
path: ${{ matrix.module }}/**/testdata/fuzz/

core/pkg/distribution/ontology/writer_dag.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
"maps"
1515

1616
"github.com/samber/lo"
17-
"github.com/synnaxlabs/x/errors"
1817
"github.com/synnaxlabs/x/gorp"
18+
"github.com/synnaxlabs/x/graph"
1919
)
2020

2121
// dagWriter is a key-value backed directed acyclic graph that implements the Writer
@@ -30,7 +30,7 @@ type dagWriter struct {
3030
var _ Writer = dagWriter{}
3131

3232
// ErrCycle is returned when a cycle is created in the graph.
33-
var ErrCycle = errors.New("[ontology] - cyclic dependency")
33+
var ErrCycle = graph.ErrCyclicDependency
3434

3535
// DefineResource implements the Writer interface.
3636
func (d dagWriter) DefineResource(ctx context.Context, tk ID) error {

core/pkg/distribution/ontology/writer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ var _ = Describe("Writer", func() {
106106
Expect(w.DefineRelationship(ctx, idOne, ontology.RelationshipTypeParentOf, idTwo)).To(Succeed())
107107
err := w.DefineRelationship(ctx, idTwo, ontology.RelationshipTypeParentOf, idOne)
108108
Expect(err).To(HaveOccurred())
109-
Expect(errors.Is(err, ontology.ErrCycle)).To(BeTrue())
109+
Expect(err).To(MatchError(ontology.ErrCycle))
110110
},
111111
)
112112
It("Should return an error is a relationships creates a cycle",
@@ -117,7 +117,7 @@ var _ = Describe("Writer", func() {
117117
Expect(w.DefineRelationship(ctx, idTwo, ontology.RelationshipTypeParentOf, idThree)).To(Succeed())
118118
err := w.DefineRelationship(ctx, idThree, ontology.RelationshipTypeParentOf, idOne)
119119
Expect(err).To(HaveOccurred())
120-
Expect(errors.Is(err, ontology.ErrCycle)).To(BeTrue())
120+
Expect(err).To(MatchError(ontology.ErrCycle))
121121
})
122122
})
123123
})
@@ -155,7 +155,7 @@ var _ = Describe("Writer", func() {
155155
[]ontology.ID{idOne},
156156
)
157157
Expect(err).To(HaveOccurred())
158-
Expect(errors.Is(err, ontology.ErrCycle)).To(BeTrue())
158+
Expect(err).To(MatchError(ontology.ErrCycle))
159159
})
160160
})
161161
Describe("Deleting a Relationship", func() {

x/go/encoding/orc/reader.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
package orc
1111

1212
import (
13-
"errors"
14-
"fmt"
1513
"io"
1614
"math"
15+
16+
"github.com/synnaxlabs/x/errors"
1717
)
1818

1919
// ErrRecursionDepth is returned when decoding exceeds the maximum recursion depth.
20-
var ErrRecursionDepth = errors.New("orc: recursion depth exceeded")
20+
var ErrRecursionDepth = errors.New("[orc] recursion depth exceeded")
2121

2222
// MaxStringLen is the maximum length of a string that can be decoded. In direct
2323
// mode (ResetBytes), the backing slice provides a natural bound. In io.Reader
@@ -185,7 +185,7 @@ func (r *Reader) String() (string, error) {
185185
return s, nil
186186
}
187187
if n > MaxStringLen {
188-
return "", fmt.Errorf(
188+
return "", errors.Newf(
189189
"orc: string length %d exceeds maximum %d", n, MaxStringLen,
190190
)
191191
}

x/go/gorp/migrate.go

Lines changed: 12 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/synnaxlabs/alamos"
1717
"github.com/synnaxlabs/x/binary"
1818
"github.com/synnaxlabs/x/errors"
19+
"github.com/synnaxlabs/x/graph"
1920
"github.com/synnaxlabs/x/kv"
2021
"go.uber.org/zap"
2122
)
@@ -262,16 +263,9 @@ func MigrationDepOpt[T any](ctx context.Context) (T, bool) {
262263
return v, ok
263264
}
264265

265-
// ErrCyclicDependency is returned when migrations form a dependency cycle.
266-
var ErrCyclicDependency = errors.New("cyclic dependency detected in migrations")
267-
268-
// ErrMissingDependency is returned when a migration depends on a name that
269-
// does not exist in the migration list and has not already been applied.
270-
var ErrMissingDependency = errors.New("missing migration dependency")
271-
272266
// topoSort filters out already-applied migrations, then produces a valid
273-
// execution order using Kahn's algorithm. Dependencies that are already applied
274-
// are considered satisfied and do not need to appear in the pending set.
267+
// execution order. Dependencies that are already applied are considered
268+
// satisfied and do not need to appear in the pending set.
275269
func topoSort(migrations []Migration, applied map[string]bool) ([]Migration, error) {
276270
byName := make(map[string]Migration, len(migrations))
277271
for _, m := range migrations {
@@ -299,59 +293,28 @@ func topoSort(migrations []Migration, applied map[string]bool) ([]Migration, err
299293
return pending, nil
300294
}
301295

302-
pendingSet := make(map[string]bool, len(pending))
303-
for _, m := range pending {
304-
pendingSet[m.Name()] = true
305-
}
306-
307-
inDegree := make(map[string]int, len(pending))
308-
dependents := make(map[string][]string, len(pending))
296+
adj := make(map[string][]string, len(pending))
309297
for _, m := range pending {
310298
name := m.Name()
311-
if _, exists := inDegree[name]; !exists {
312-
inDegree[name] = 0
313-
}
299+
adj[name] = nil
314300
if dd, ok := m.(DependencyDeclarer); ok {
315301
for _, dep := range dd.Dependencies() {
316302
if applied[dep] {
317303
continue
318304
}
319-
if !pendingSet[dep] {
320-
if _, known := byName[dep]; !known {
321-
return nil, fmt.Errorf(
322-
"%w: migration %q depends on %q which does not exist",
323-
ErrMissingDependency, name, dep,
324-
)
325-
}
326-
}
327-
inDegree[name]++
328-
dependents[dep] = append(dependents[dep], name)
305+
adj[name] = append(adj[name], dep)
329306
}
330307
}
331308
}
332309

333-
var queue []string
334-
for _, m := range pending {
335-
if inDegree[m.Name()] == 0 {
336-
queue = append(queue, m.Name())
337-
}
338-
}
339-
340-
var sorted []Migration
341-
for len(queue) > 0 {
342-
name := queue[0]
343-
queue = queue[1:]
344-
sorted = append(sorted, byName[name])
345-
for _, dep := range dependents[name] {
346-
inDegree[dep]--
347-
if inDegree[dep] == 0 {
348-
queue = append(queue, dep)
349-
}
350-
}
310+
order, err := graph.TopoSort(adj)
311+
if err != nil {
312+
return nil, err
351313
}
352314

353-
if len(sorted) != len(pending) {
354-
return nil, fmt.Errorf("%w: not all migrations could be ordered", ErrCyclicDependency)
315+
sorted := make([]Migration, len(order))
316+
for i, name := range order {
317+
sorted[i] = byName[name]
355318
}
356319
return sorted, nil
357320
}

x/go/gorp/migrate_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/synnaxlabs/x/binary"
2323
"github.com/synnaxlabs/x/errors"
2424
"github.com/synnaxlabs/x/gorp"
25+
"github.com/synnaxlabs/x/graph"
2526
"github.com/synnaxlabs/x/kv/memkv"
2627
"github.com/synnaxlabs/x/query"
2728
. "github.com/synnaxlabs/x/testutil"
@@ -817,7 +818,7 @@ var _ = Describe("Gorp", func() {
817818
Migrations: []gorp.Migration{m1},
818819
})
819820
Expect(err).To(HaveOccurred())
820-
Expect(err).To(MatchError(ContainSubstring("missing migration dependency")))
821+
Expect(err).To(MatchError(graph.ErrMissingDependency))
821822
Expect(err).To(MatchError(ContainSubstring("nonexistent")))
822823
})
823824

x/go/graph/graph.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"cmp"
1414
"slices"
1515

16+
"github.com/synnaxlabs/x/errors"
1617
"github.com/synnaxlabs/x/set"
1718
)
1819

@@ -83,3 +84,70 @@ func TarjanSCC[T cmp.Ordered](adj map[T][]T) [][]T {
8384
}
8485
return sccs
8586
}
87+
88+
// ErrCyclicDependency is returned by TopoSort when the graph contains a cycle.
89+
var ErrCyclicDependency = errors.New("cyclic dependency detected")
90+
91+
// ErrMissingDependency is returned by TopoSort when an edge references a node
92+
// that does not exist in the graph.
93+
var ErrMissingDependency = errors.New("missing dependency")
94+
95+
// TopoSort returns a topological ordering of the directed acyclic graph
96+
// represented by the adjacency list adj (where adj[a] = [b, c] means a depends
97+
// on b and c, i.e. b and c must come before a). Returns ErrCyclicDependency if
98+
// the graph contains a cycle, or ErrMissingDependency if an edge references a
99+
// node not present as a key in adj. The output is deterministic: nodes at the
100+
// same topological level are sorted by their natural ordering.
101+
func TopoSort[T cmp.Ordered](adj map[T][]T) ([]T, error) {
102+
for node, deps := range adj {
103+
for _, dep := range deps {
104+
if _, exists := adj[dep]; !exists {
105+
return nil, errors.Wrapf(
106+
ErrMissingDependency,
107+
"%v depends on %v which does not exist",
108+
node, dep,
109+
)
110+
}
111+
}
112+
}
113+
114+
inDegree := make(map[T]int, len(adj))
115+
dependents := make(map[T][]T, len(adj))
116+
for node := range adj {
117+
if _, exists := inDegree[node]; !exists {
118+
inDegree[node] = 0
119+
}
120+
for _, dep := range adj[node] {
121+
inDegree[node]++
122+
dependents[dep] = append(dependents[dep], node)
123+
}
124+
}
125+
126+
var queue []T
127+
for node, deg := range inDegree {
128+
if deg == 0 {
129+
queue = append(queue, node)
130+
}
131+
}
132+
slices.Sort(queue)
133+
134+
var sorted []T
135+
for len(queue) > 0 {
136+
node := queue[0]
137+
queue = queue[1:]
138+
sorted = append(sorted, node)
139+
next := dependents[node]
140+
slices.Sort(next)
141+
for _, dep := range next {
142+
inDegree[dep]--
143+
if inDegree[dep] == 0 {
144+
queue = append(queue, dep)
145+
}
146+
}
147+
}
148+
149+
if len(sorted) != len(adj) {
150+
return nil, errors.Wrap(ErrCyclicDependency, "not all nodes could be ordered")
151+
}
152+
return sorted, nil
153+
}

0 commit comments

Comments
 (0)