Skip to content
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e458d38
write RFC about calculated channel extraction
pjdotson Jun 9, 2026
404ef64
Merge branch 'rc' into sy-4348-refactor-channel-service-so-it-doesnt-…
pjdotson Jun 10, 2026
1a13679
Merge branch 'rc' into sy-4348-refactor-channel-service-so-it-doesnt-…
pjdotson Jun 11, 2026
6132eaa
wip: checkpoint before merging rc
pjdotson Jun 12, 2026
c56945e
Merge remote-tracking branch 'origin/rc' into sy-4348-refactor-channe…
pjdotson Jun 12, 2026
a01c267
More changes
pjdotson Jun 12, 2026
82b40db
Some service renamings
pjdotson Jun 12, 2026
7f7191f
change renaming stuff
pjdotson Jun 12, 2026
9ba6bb5
More renamings to distchannel
pjdotson Jun 12, 2026
aad2a7c
change state config dependency
pjdotson Jun 12, 2026
94ec055
Merge branch 'rc' into sy-4348-refactor-channel-service-so-it-doesnt-…
pjdotson Jun 12, 2026
7baf698
More renamings
pjdotson Jun 12, 2026
1ef8eed
Migrate line plot migration as well
pjdotson Jun 12, 2026
12d4bff
more refactors
pjdotson Jun 12, 2026
60b1291
rename
pjdotson Jun 12, 2026
23abfaa
Merge branch 'rc' into sy-4348-refactor-channel-service-so-it-doesnt-…
pjdotson Jun 12, 2026
ff3b5d5
refactor some things
pjdotson Jun 12, 2026
93c9287
More consolidations
pjdotson Jun 12, 2026
9d3c1ed
Change a few last things
pjdotson Jun 12, 2026
4fe7ee4
Remove dead code
pjdotson Jun 12, 2026
80c639f
More fixes
pjdotson Jun 12, 2026
3aa3a2b
move
pjdotson Jun 13, 2026
859c7cc
init implementation
pjdotson Jun 15, 2026
1589a98
Merge branch 'rc' into sy-4348-refactor-channel-service-so-it-doesnt-…
pjdotson Jun 15, 2026
e526d2a
Merge branch 'sy-4348-refactor-channel-service-so-it-doesnt-depend-on…
pjdotson Jun 15, 2026
a6e7bc5
Merge branch 'rc' into sy-4333-move-signal-propagation-out-of-distrib…
pjdotson Jun 15, 2026
a31f0d5
Fix new signals stuff
pjdotson Jun 15, 2026
6105387
More changes
pjdotson Jun 15, 2026
4f67f3c
Fix build issues
pjdotson Jun 15, 2026
73e1bd5
Run gofmt
pjdotson Jun 15, 2026
c1c3dff
cdc renamings
pjdotson Jun 15, 2026
4d4e741
Fix some test code
pjdotson Jun 15, 2026
7eca659
Change test styles
pjdotson Jun 15, 2026
9ea5424
Update channel and group signals
pjdotson Jun 15, 2026
ff25f6d
Simplify ontology signals code
pjdotson Jun 15, 2026
0476e10
Fix ontology signals suite
pjdotson Jun 15, 2026
9a8830a
A few more fixes
pjdotson Jun 15, 2026
1b94d2c
add continues and remove duplicate code
pjdotson Jun 15, 2026
5af3a1b
Don't notify on empty changes
pjdotson Jun 15, 2026
bc4ee14
change dependency order
pjdotson Jun 15, 2026
194a5d9
Merge branch 'rc' into sy-4333-move-signal-propagation-out-of-distrib…
pjdotson Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/pkg/distribution/channel/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ var _ = Describe("Create", Ordered, func() {
})
It("Should create the channel without error", func(ctx SpecContext) {
Expect(ch.Key().Leaseholder()).To(Equal(aspen.NodeKeyFree))
Expect(ch.Key().LocalKey()).To(Equal(channel.LocalKey(5)))
Expect(ch.Key().LocalKey()).To(Equal(channel.LocalKey(1)))
Expect(mockCluster.Nodes[1].Storage.TS.RetrieveChannels(ctx, ch.Key().StorageKey())).
Error().To(MatchError(query.ErrNotFound))
})
Expand Down
33 changes: 0 additions & 33 deletions core/pkg/distribution/group/signals/signals.go

This file was deleted.

50 changes: 0 additions & 50 deletions core/pkg/distribution/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,17 @@ package distribution
import (
"context"
"fmt"
"io"

"github.com/samber/lo"
"github.com/synnaxlabs/alamos"
"github.com/synnaxlabs/aspen"
"github.com/synnaxlabs/synnax/pkg/distribution/channel"
channelsignals "github.com/synnaxlabs/synnax/pkg/distribution/channel/signals"
"github.com/synnaxlabs/synnax/pkg/distribution/channel/verification"
"github.com/synnaxlabs/synnax/pkg/distribution/framer"
"github.com/synnaxlabs/synnax/pkg/distribution/group"
groupsignals "github.com/synnaxlabs/synnax/pkg/distribution/group/signals"
"github.com/synnaxlabs/synnax/pkg/distribution/node"
"github.com/synnaxlabs/synnax/pkg/distribution/ontology"
ontologysignals "github.com/synnaxlabs/synnax/pkg/distribution/ontology/signals"
"github.com/synnaxlabs/synnax/pkg/distribution/search"
"github.com/synnaxlabs/synnax/pkg/distribution/signals"
"github.com/synnaxlabs/synnax/pkg/storage"
"github.com/synnaxlabs/x/address"
"github.com/synnaxlabs/x/config"
Expand Down Expand Up @@ -75,11 +70,6 @@ type LayerConfig struct {
//
// [REQUIRED]
Storage *storage.Layer
// EnableServiceSignals sets whether to enable CDC signal propagation for changes
// to distribution layer data structures (channels, groups, etc.)
//
// [OPTIONAL] - Defaults to true.
EnableServiceSignals *bool
// ValidateChannelNames disables channel name validation when true.
// This allows channels with special characters, spaces, etc.
//
Expand Down Expand Up @@ -116,7 +106,6 @@ var (
// required fields specific in Config.
DefaultLayerConfig = LayerConfig{
GorpCodec: orc.NewCodec(msgpack.Codec),
EnableServiceSignals: new(true),
ValidateChannelNames: new(true),
}
)
Expand All @@ -134,7 +123,6 @@ func (c LayerConfig) Override(other LayerConfig) LayerConfig {
c.Verifier = override.String(c.Verifier, other.Verifier)
c.TestingIntOverflowCheck = override.Nil(c.TestingIntOverflowCheck, other.TestingIntOverflowCheck)
c.GorpCodec = override.Nil(c.GorpCodec, other.GorpCodec)
c.EnableServiceSignals = override.Nil(c.EnableServiceSignals, other.EnableServiceSignals)
c.ValidateChannelNames = override.Nil(c.ValidateChannelNames, other.ValidateChannelNames)
return c
}
Expand All @@ -149,7 +137,6 @@ func (c LayerConfig) Validate() error {
validate.NotNil(v, "frame_transport", c.FrameTransport)
validate.NotNil(v, "aspen_transport", c.AspenTransport)
validate.NotNil(v, "codec", c.GorpCodec)
validate.NotNil(v, "enable_channel_signals", c.EnableServiceSignals)
validate.NotNil(v, "disable_channel_name_validation", c.ValidateChannelNames)
return v.Error()
}
Expand Down Expand Up @@ -179,9 +166,6 @@ type Layer struct {
// Search is the full-text search index for ontology resources.
// [REQUIRED]
Search *search.Index
// Signals are for propagating changes to data structures through channels in
// Synnax.
Signals *signals.Provider
// Group is for grouping related resources in the cluster.
Group *group.Service
// Verification verifies that the universe remains as it is.
Expand Down Expand Up @@ -302,40 +286,6 @@ func OpenLayer(ctx context.Context, cfgs ...LayerConfig) (l *Layer, err error) {
return nil, err
}

if l.Signals, err = signals.New(signals.Config{
Channel: l.Channel,
Framer: l.Framer,
Instrumentation: cfg.Child("signals"),
}); !ok(err, nil) {
return nil, err
}

if *cfg.EnableServiceSignals {
var channelSignalsCloser io.Closer
if channelSignalsCloser, err = channelsignals.Publish(
ctx,
l.Signals,
l.Channel.Observe(),
); !ok(err, channelSignalsCloser) {
return nil, err
}
var groupSignalsCloser io.Closer
if groupSignalsCloser, err = groupsignals.Publish(ctx, l.Signals, l.Group.Observe()); !ok(err, groupSignalsCloser) {
return nil, err
}
}

if l.Cluster.HostKey() == node.KeyBootstrapper {
var ontologyCDCCloser io.Closer
if ontologyCDCCloser, err = ontologysignals.Publish(
ctx,
l.Signals,
l.Ontology,
); !ok(err, ontologyCDCCloser) {
return nil, err
}
}

return l, err
}

Expand Down
1 change: 0 additions & 1 deletion core/pkg/distribution/mock/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (c *Cluster) Provision(
AspenOptions: []aspen.Option{
aspen.WithPropagationConfig(aspen.FastPropagationConfig),
},
EnableServiceSignals: new(false),
}, c.cfg}, cfgs...)...))
)
node := Node{Layer: distributionLayer, Storage: storageLayer}
Expand Down
6 changes: 3 additions & 3 deletions core/pkg/distribution/ontology/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type Service interface {
// RetrieveResource returns the resource with the give key (Name.Name). If the resource
// does not exist, returns a query.ErrNotFound error.
RetrieveResource(ctx context.Context, key string, tx gorp.Tx) (Resource, error)
// Observable is used by the ontology to subscribe to changes in the entities.
// This is used to propagate changes via the ResourceObserver for CDC signals.
// If the service's entities are static, use observe.Noop.
// Observable is used by the ontology to subscribe to changes in the entities. This
// is used to propagate changes via the ResourceObserver for signals. If the
// service's entities are static, use observe.Noop.
observe.Observable[iter.Seq[Change]]
}

Expand Down
126 changes: 0 additions & 126 deletions core/pkg/distribution/ontology/signals/signals.go

This file was deleted.

22 changes: 0 additions & 22 deletions core/pkg/distribution/ontology/signals/signals_suite_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion core/pkg/service/access/rbac/policy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/synnaxlabs/alamos"
"github.com/synnaxlabs/synnax/pkg/distribution/ontology"
"github.com/synnaxlabs/synnax/pkg/distribution/search"
"github.com/synnaxlabs/synnax/pkg/distribution/signals"
"github.com/synnaxlabs/synnax/pkg/service/access/rbac/policy/migrations/v0"
"github.com/synnaxlabs/synnax/pkg/service/signals"
"github.com/synnaxlabs/x/config"
"github.com/synnaxlabs/x/gorp"
xio "github.com/synnaxlabs/x/io"
Expand Down
2 changes: 1 addition & 1 deletion core/pkg/service/access/rbac/role/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/synnaxlabs/synnax/pkg/distribution/group"
"github.com/synnaxlabs/synnax/pkg/distribution/ontology"
"github.com/synnaxlabs/synnax/pkg/distribution/search"
"github.com/synnaxlabs/synnax/pkg/distribution/signals"
"github.com/synnaxlabs/synnax/pkg/service/signals"
"github.com/synnaxlabs/x/config"
"github.com/synnaxlabs/x/gorp"
xio "github.com/synnaxlabs/x/io"
Expand Down
2 changes: 1 addition & 1 deletion core/pkg/service/access/rbac/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
"github.com/synnaxlabs/synnax/pkg/distribution/group"
"github.com/synnaxlabs/synnax/pkg/distribution/ontology"
"github.com/synnaxlabs/synnax/pkg/distribution/search"
"github.com/synnaxlabs/synnax/pkg/distribution/signals"
"github.com/synnaxlabs/synnax/pkg/service/access"
"github.com/synnaxlabs/synnax/pkg/service/access/rbac/builtin"
v0 "github.com/synnaxlabs/synnax/pkg/service/access/rbac/migrations/v0"
"github.com/synnaxlabs/synnax/pkg/service/access/rbac/policy"
"github.com/synnaxlabs/synnax/pkg/service/access/rbac/role"
"github.com/synnaxlabs/synnax/pkg/service/signals"
"github.com/synnaxlabs/synnax/pkg/service/user"
"github.com/synnaxlabs/x/config"
"github.com/synnaxlabs/x/errors"
Expand Down
10 changes: 9 additions & 1 deletion core/pkg/service/actions/actions_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/synnaxlabs/synnax/pkg/distribution/mock"
"github.com/synnaxlabs/synnax/pkg/service/signals"
. "github.com/synnaxlabs/x/testutil"
)

Expand All @@ -25,11 +26,18 @@ func TestActions(t *testing.T) {

var _ = ShouldNotLeakGoroutinesPerSpec()

var dist mock.Node
var (
dist mock.Node
sigs *signals.Provider
)

var _ = BeforeSuite(func(ctx SpecContext) {
cluster := DeferClose(mock.NewCluster())
dist = DeferClose(cluster.Provision(ctx))
sigs = MustSucceed(signals.New(signals.Config{
Channel: dist.Channel,
Framer: dist.Framer,
}))
})

// testAction is a small concrete action type used to instantiate the generic
Expand Down
2 changes: 1 addition & 1 deletion core/pkg/service/actions/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"fmt"
"io"

"github.com/synnaxlabs/synnax/pkg/distribution/signals"
"github.com/synnaxlabs/synnax/pkg/service/channel"
"github.com/synnaxlabs/synnax/pkg/service/signals"
xchange "github.com/synnaxlabs/x/change"
"github.com/synnaxlabs/x/errors"
"github.com/synnaxlabs/x/observe"
Expand Down
Loading
Loading