
SY-4331: Move Channel Functionality to the Service Layer #2499

Open
pjdotson wants to merge 132 commits into rc from sy-4331-move-most-channel-functionality-to-service-layer

Conversation

pjdotson commented Jun 16, 2026

Contributor

Issue Pull Request

Linear Issue

SY-4331

Description

Moves most channel functionality from the distribution layer up to the service layer, tightening the 4-layer boundary so the distribution layer holds only what telemetry routing and storage need.

Distribution layer now owns a minimal, hand-written Channel (name, leaseholder, data type, index, virtual, concurrency — no Internal/Operations/Expression) plus an allocator: channel.Service routes local-key assignment (counters) and storage ts.DB create/delete/rename to the leaseholder. It exposes a narrow channel.Retriever interface (a settable holder) that the framer consumes instead of the full channel service. Nothing in the distribution layer depends on the service layer.

Service layer now owns the rich Channel, the gorp metadata table, the Retrieve builder, ontology/search registration, name/overflow validation, and the create/delete/rename orchestration. It drives the distribution allocator for keys + storage, writes the rich record itself, implements distribution/channel.Retriever (binding it into the framer after open), and creates the per-node control channel.
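
The split between the rich service-layer record and the minimal distribution-layer record can be pictured as a projection. The following is a minimal sketch, not the code in this PR: the type names `MinimalChannel` and `RichChannel`, the `Project` helper, and every field type are assumptions chosen for illustration; only the list of minimal fields comes from the description above.

```go
package main

import "fmt"

// MinimalChannel stands in for the distribution-layer Channel. The field set
// follows the PR description; the names and types are assumptions.
type MinimalChannel struct {
	Key         uint32
	Name        string
	Leaseholder uint16
	DataType    string
	Index       uint32
	Virtual     bool
	Concurrency uint8
}

// RichChannel stands in for the service-layer Channel, which additionally
// carries the fields the distribution layer no longer sees.
type RichChannel struct {
	MinimalChannel
	Internal   bool
	Expression string
	Operations []string
}

// Project drops the service-only fields so that the framer only ever receives
// what telemetry routing and storage need.
func Project(rich []RichChannel) []MinimalChannel {
	out := make([]MinimalChannel, len(rich))
	for i, ch := range rich {
		out[i] = ch.MinimalChannel
	}
	return out
}

func main() {
	rich := []RichChannel{{
		MinimalChannel: MinimalChannel{Key: 1, Name: "pressure", DataType: "float32"},
		Expression:     "return a + b",
	}}
	fmt.Printf("%+v\n", Project(rich)[0])
}
```

Embedding the minimal struct is only one way to model the relationship; the actual service-layer type may well copy fields explicitly.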

The Oracle schema is updated to emit the rich Channel (and Operation/OperationType) into the service layer while Key/LocalKey/Name stay in distribution; the internal cluster-transport proto is now hand-maintained. The calculated-channel analyzer is decoupled from the rich type to avoid an import cycle.

Known follow-up

One spec is marked Pending: deleting a remote-leased channel's metadata from a non-leaseholder node does not yet propagate (aspen can't derive the lease for a key-only gorp delete on the originating node). This needs leaseholder routing for metadata deletes/renames, mirroring the pre-refactor behavior. Storage deletion is already routed and works.
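
One possible shape for the missing routing step is to batch the keys of a delete by leaseholder before dispatching each batch. This is a hedged sketch only: `batchByLeaseholder` and the `leaseOf` lookup are hypothetical names, and how the lease is actually resolved on the originating node is exactly the open question described above.

```go
package main

import "fmt"

// NodeKey and ChannelKey are placeholder types for this sketch.
type NodeKey uint16
type ChannelKey uint32

// batchByLeaseholder groups keys by the node that holds their lease. leaseOf
// is a hypothetical lookup; keys whose lease cannot be resolved are returned
// separately so the caller can decide how to handle them.
func batchByLeaseholder(
	keys []ChannelKey,
	leaseOf func(ChannelKey) (NodeKey, bool),
) (map[NodeKey][]ChannelKey, []ChannelKey) {
	batches := make(map[NodeKey][]ChannelKey)
	var unknown []ChannelKey
	for _, k := range keys {
		node, ok := leaseOf(k)
		if !ok {
			unknown = append(unknown, k)
			continue
		}
		batches[node] = append(batches[node], k)
	}
	return batches, unknown
}

func main() {
	leases := map[ChannelKey]NodeKey{10: 1, 11: 2, 12: 1}
	lookup := func(k ChannelKey) (NodeKey, bool) {
		n, ok := leases[k]
		return n, ok
	}
	batches, unknown := batchByLeaseholder([]ChannelKey{10, 11, 12, 99}, lookup)
	fmt.Println(len(batches[1]), len(batches[2]), unknown)
}
```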

Basic Readiness

  • I have performed a self-review of my code.
  • I have added relevant, automated tests to cover the changes.
  • I have updated documentation to reflect the changes.

Greptile Summary

This PR moves the rich channel record, metadata table, CRUD orchestration, ontology/search registration, and overflow enforcement from the distribution layer up to the service layer. The distribution layer now holds only a minimal Channel struct and an allocator (channel.Service) responsible for local-key assignment and storage creation/rename/deletion, exposed through the new Allocator interface. A late-bound RetrieverHolder bridges the layer boundary so the framer can resolve channel metadata without a direct import of the service layer.

  • Distribution layer is stripped to the allocator: Allocate, DeleteStorage, RenameStorage, routing primitives, and local counters. All metadata operations, ontology, search, and overflow tracking move up.
  • Service layer gains the full-featured channel.Service with the gorp table, Retrieve builder, ontology/search integration, create/delete/rename orchestration, and calculated-channel index auto-creation.
  • RetrieverHolder is a new concurrency-safe indirection populated by the service layer after open; framer, codec, iterator, relay, and writer all consume it through the Retriever interface.

Confidence Score: 3/5

The architectural restructuring is sound and the test coverage is extensive, but the nil-unguarded RetrieverHolder means any framer operation that reaches the validation path before the service layer binds the retriever will crash with an opaque panic rather than a recoverable error.

The RetrieverHolder delegates to a nil interface without any guard, producing an unhelpful panic rather than a clear error if a framer path is exercised before the service layer completes its bind. The test files work around this by calling ChannelService() as an explicit setup step with an explanatory comment, showing the team recognises the footgun. In production the startup ordering prevents the race, but the code provides no enforcement. The two other findings are documentation and consistency issues that do not affect runtime correctness under normal operation.

Files warranting the closest review: core/pkg/distribution/channel/retriever.go (nil-unguarded RetrieverHolder methods) and core/pkg/distribution/channel/service.go (DeleteStorage includes free channels that have no backing storage).

Important Files Changed

| Filename | Overview |
| --- | --- |
| core/pkg/distribution/channel/retriever.go | New file introducing the Retriever interface and RetrieverHolder, the late-bound indirection used by the framer. The holder has no nil guard before Bind, which causes a panic rather than a clear error when accessed prematurely. |
| core/pkg/distribution/channel/service.go | Drastically slimmed to a pure allocator: removed table, ontology, search, and writer; exposes Allocate, DeleteStorage, and RenameStorage. DeleteStorage passes batch.Free entries (which have no persistent storage) to DeleteChannels, inconsistent with RenameStorage, which explicitly skips free channels. |
| core/pkg/service/channel/crud.go | New file hosting all CRUD logic moved from the distribution layer: create, delete, rename, name validation, overflow enforcement, and ontology resource management. Logic is well-structured and correctly threads the allocator for key/storage operations. |
| core/pkg/service/channel/service.go | Service layer now owns the full-featured channel service: table, retrieval, ontology, search, and overflow enforcement previously held by the distribution service. Config correctly validates required fields and defaults ValidateNames to true. |
| core/pkg/service/channel/retriever.go | New file implementing dischannel.Retriever on the service-layer Service. The implementation comment says it returns query.ErrNotFound, but the interface doc says missing keys are omitted from the result, a contradictory contract. |
| core/pkg/distribution/layer.go | Correctly wires the new RetrieverHolder and exposes IntOverflowCheck and ValidateChannelNames for the service layer to consume; removes channel-group and ontology config from the distribution channel service. |
| core/pkg/service/layer.go | Correctly binds the channel retriever after opening the service-layer channel service and moves configureControlUpdates here, where it belongs now that the control channel is a rich service-layer channel. |
| core/pkg/service/channel/wrap.go | New test/lightweight helper that opens a service-layer channel service over a distribution layer and binds the retriever. Panics on failure, which is intentional for test contexts but should be documented as test-only. |
| core/pkg/distribution/mock/cluster.go | Adds a lazy ChannelService() helper so distribution-layer tests can bind the retriever on demand. The sync.Once prevents double-initialisation. Tests that use the framer without calling ChannelService() first would still panic at the nil retriever. |
| core/pkg/distribution/channel/lease_proxy.go | Reduced to a focused allocator: allocateHandler, deleteHandler, renameHandler, and the routing helpers allocateGateway, allocateFree, allocateRemote. Logic is clean; assignKeys correctly handles the zero-toAssign case. |
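
The DeleteStorage inconsistency noted for core/pkg/distribution/channel/service.go could be addressed by filtering free channels before calling into storage, the same way the rename path is described as doing. The sketch below is an assumption-laden illustration: `FreeNode`, `storageKeys`, and the `Channel` shape are hypothetical and do not reflect the actual batch types in the PR.

```go
package main

import "fmt"

// NodeKey identifies a leaseholder. FreeNode is a hypothetical sentinel for
// "free" channels, which have no backing storage on any node.
type NodeKey uint16

const FreeNode NodeKey = 0xFFFF

// Channel is a placeholder for the minimal distribution-layer channel.
type Channel struct {
	Key         uint32
	Leaseholder NodeKey
}

// storageKeys returns only the keys that have backing storage, so a delete
// never asks the storage engine to remove a free channel.
func storageKeys(chs []Channel) []uint32 {
	keys := make([]uint32, 0, len(chs))
	for _, ch := range chs {
		if ch.Leaseholder == FreeNode {
			continue // free channels are skipped, mirroring the rename path
		}
		keys = append(keys, ch.Key)
	}
	return keys
}

func main() {
	chs := []Channel{
		{Key: 1, Leaseholder: 1},
		{Key: 2, Leaseholder: FreeNode},
		{Key: 3, Leaseholder: 2},
	}
	fmt.Println(storageKeys(chs))
}
```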

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Client
    participant SvcChannel as service/channel.Service
    participant DistAlloc as distribution/channel.Service (Allocator)
    participant TSChannel as storage/ts.DB
    participant GorpTable as gorp table (service layer)
    participant RetrieverHolder as channel.RetrieverHolder
    participant Framer as distribution/framer.*

    Note over SvcChannel,DistAlloc: Startup sequence
    DistAlloc->>RetrieverHolder: NewRetrieverHolder()
    DistAlloc->>Framer: pass RetrieverHolder as Retriever
    SvcChannel->>SvcChannel: NewService (opens table, ontology, search)
    SvcChannel->>RetrieverHolder: Bind(svcChannel)

    Note over Client,TSChannel: Channel create path
    Client->>SvcChannel: Create(channels)
    SvcChannel->>SvcChannel: validateNames / preprocessCalc
    SvcChannel->>DistAlloc: Allocate(minimalChannels)
    DistAlloc->>DistAlloc: route by leaseholder
    DistAlloc->>TSChannel: CreateChannel (leaseholder node)
    DistAlloc-->>SvcChannel: allocated keys
    SvcChannel->>GorpTable: write rich Channel records
    SvcChannel->>SvcChannel: register ontology resources

    Note over Framer,TSChannel: Frame read path
    Framer->>RetrieverHolder: RetrieveByKeys / ContainsKeys
    RetrieverHolder->>SvcChannel: delegate to bound Service
    SvcChannel->>GorpTable: query + project to dischannel.Channel
    GorpTable-->>Framer: []dischannel.Channel
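
The channel create path in the diagram (validate, allocate, then write the rich record) can be sketched in Go as below. Everything here is a simplified stand-in: `Minimal`, `Rich`, `Allocator`, `counterAllocator`, and the in-memory `table` map are hypothetical names, and the real allocator also routes to the leaseholder and creates storage, which this sketch omits.

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal and Rich are placeholders for the two channel representations.
type Minimal struct {
	Key  uint32
	Name string
}

type Rich struct {
	Minimal
	Expression string
}

// Allocator is a stand-in for the distribution-layer allocator.
type Allocator interface {
	Allocate(chs []Minimal) ([]Minimal, error)
}

// counterAllocator assigns sequential keys from a local counter.
type counterAllocator struct{ next uint32 }

func (a *counterAllocator) Allocate(chs []Minimal) ([]Minimal, error) {
	out := make([]Minimal, len(chs))
	for i, ch := range chs {
		a.next++
		ch.Key = a.next
		out[i] = ch
	}
	return out, nil
}

// Service is a stand-in for the service-layer channel service; table plays
// the role of the metadata table.
type Service struct {
	alloc Allocator
	table map[uint32]Rich
}

// Create validates names, asks the allocator for keys, then writes the rich
// records itself.
func (s *Service) Create(chs []Rich) ([]Rich, error) {
	minimal := make([]Minimal, len(chs))
	for i, ch := range chs {
		// Validate first so that bad input never reaches the allocator.
		if ch.Name == "" {
			return nil, errors.New("channel name must not be empty")
		}
		minimal[i] = ch.Minimal
	}
	allocated, err := s.alloc.Allocate(minimal)
	if err != nil {
		return nil, err
	}
	out := make([]Rich, len(chs))
	for i, ch := range chs {
		ch.Minimal = allocated[i] // adopt the assigned key
		s.table[ch.Key] = ch
		out[i] = ch
	}
	return out, nil
}

func main() {
	svc := &Service{alloc: &counterAllocator{}, table: map[uint32]Rich{}}
	created, err := svc.Create([]Rich{{Minimal: Minimal{Name: "pressure"}}})
	fmt.Println(created[0].Key, err)
}
```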

Reviews (1): Last reviewed commit: "SY-4331: Update tests for the channel se..."

Greptile also left 4 inline comments on this PR.

pjdotson added 29 commits June 9, 2026 17:11
…l-service-so-it-doesnt-depend-on-arc-service

# Conflicts:
#	core/pkg/service/channel/calculation/calcstate/state_config_test.go
Move the rich Channel struct (and Operation/OperationType) Oracle output
from the distribution layer to the service layer, keeping Key/LocalKey/Name
in distribution. Regenerates the Go/C++/protobuf outputs and the retrieve,
codec, and ontology machinery accordingly. The internal cluster-transport
proto (distribution/channel/pb) is no longer Oracle-managed.
The distribution-layer channel.Service becomes an allocator: it routes
local-key assignment (counters) and storage ts.DB create/delete/rename to
the leaseholder, operating on a minimal, hand-written Channel (name,
leaseholder, data type, index, virtual, concurrency). The gorp table,
retrieval, and ontology lifecycle move out to the service layer.

The framer now depends on a narrow channel.Retriever interface (a settable
holder bound by the service layer after open) instead of the full channel
service, and the control channel is no longer created here.
…layer

The service-layer channel.Service now owns the gorp metadata table, the
Retrieve builder, ontology and search registration, name/overflow
validation, and the create/delete/rename orchestration. It drives the
distribution allocator for key assignment and storage, writes the rich
record itself, and implements distribution's channel.Retriever (bound into
the framer by the service layer, which also creates the control channel).

The calculated-channel analyzer is decoupled from the rich Channel type to
avoid an import cycle, and downstream consumers (signals, transport codecs,
cmd/start) are updated to the new ownership. mock.Node gains a lazy
ChannelService() helper for tests.
Point channel create/retrieve/lifecycle calls through the new service-layer
channel service (mock.Node.ChannelService()), update transport tests to the
channel.Retriever holder, bind retrievers on all nodes in multi-node and relay
scenarios, and use NewWriterNoAnalysis for calculated-channel tests that
intentionally persist invalid or circular expressions (analysis now runs on
create). Move the rich-channel behavior tests to exercise the service layer.

One spec is marked Pending: deleting a remote-leased channel's metadata from
a non-leaseholder node does not yet propagate (needs leaseholder routing for
metadata deletes); storage deletion is routed and works.
…vice-layer

# Conflicts:
#	core/pkg/service/arc/arc_suite_test.go
pjdotson added 13 commits June 23, 2026 16:48
- Add x/go/analyzers/leaklint: a Go analyzer enforcing that every Ginkgo
  suite registers ShouldNotLeakGoroutinesPerSpec() at package scope and that
  every BeforeSuite/BeforeAll calls ShouldNotLeakGoroutines() as its first
  statement.
- Make leak_test.go black-box (package testutil_test); add fasthttp
  updateServerDate to default leak filters (upstream won't-fix #2257).
- Fix Fake.AfterFunc goroutine leak: Stop releases the goroutine and After is
  buffered so Advance's send never blocks.
- Apply the per-spec and setup leak checks across x/go, alamos, aspen, and
  freighter/integration; suppress the un-teardownable OTel daemons in the
  alamos suite via //nolint:leaklint.
…vice-layer

# Conflicts:
#	aspen/internal/kv/config.go
#	aspen/internal/kv/version.go
#	core/cmd/start/start.go
#	core/pkg/api/arc/arc_suite_test.go
#	core/pkg/distribution/channel/counter.go
#	core/pkg/distribution/channel/create_test.go
#	core/pkg/distribution/channel/delete_test.go
#	core/pkg/distribution/channel/limit_test.go
#	core/pkg/distribution/channel/pb/services.pb.go
#	core/pkg/distribution/channel/pb/services.proto
#	core/pkg/distribution/channel/pb/services_grpc.pb.go
#	core/pkg/distribution/channel/rename_test.go
#	core/pkg/distribution/channel/service.go
#	core/pkg/distribution/channel/service_test.go
#	core/pkg/distribution/channel/transport.go
#	core/pkg/distribution/framer/codec/codec_test.go
#	core/pkg/distribution/framer/deleter/deleter_test.go
#	core/pkg/distribution/framer/deleter/lease_proxy.go
#	core/pkg/distribution/framer/iterator/iterator_test.go
#	core/pkg/distribution/framer/iterator/service.go
#	core/pkg/distribution/framer/relay/relay_test.go
#	core/pkg/distribution/framer/relay/transport.go
#	core/pkg/distribution/framer/writer/service.go
#	core/pkg/distribution/framer/writer/transport.go
#	core/pkg/distribution/framer/writer/writer_test.go
#	core/pkg/distribution/layer.go
#	core/pkg/distribution/mock/cluster.go
#	core/pkg/distribution/mock/cluster_test.go
#	core/pkg/distribution/mock/node.go
#	core/pkg/distribution/mock/node_test.go
#	core/pkg/distribution/mock/static_host_provider.go
#	core/pkg/distribution/mock/static_host_provider_test.go
#	core/pkg/distribution/proxy/proxy.go
#	core/pkg/distribution/proxy/proxy_test.go
#	core/pkg/distribution/transport/grpc/channel/channel_suite_test.go
#	core/pkg/distribution/transport/grpc/channel/transport.go
#	core/pkg/distribution/transport/grpc/channel/transport_test.go
#	core/pkg/distribution/transport/grpc/framer/framer_suite_test.go
#	core/pkg/distribution/transport/grpc/framer/transport.go
#	core/pkg/distribution/transport/grpc/framer/transport_test.go
#	core/pkg/distribution/transport/grpc/grpc_suite_test.go
#	core/pkg/service/actions/actions_suite_test.go
#	core/pkg/service/actions/signals_test.go
#	core/pkg/service/arc/arc_suite_test.go
#	core/pkg/service/arc/rename_test.go
#	core/pkg/service/arc/runtime/dependencies_test.go
#	core/pkg/service/arc/service_test.go
#	core/pkg/service/arc/task/task_test.go
#	core/pkg/service/channel/calculation/compiler/compile_test.go
#	core/pkg/service/channel/calculation/graph/graph_test.go
#	core/pkg/service/channel/channel_suite_test.go
#	core/pkg/service/channel/channel_test.go
#	core/pkg/service/channel/ontology_test.go
#	core/pkg/service/channel/resolver_test.go
#	core/pkg/service/channel/retrieve_test.go
#	core/pkg/service/channel/signals/signals_suite_test.go
#	core/pkg/service/channel/signals/signals_test.go
#	core/pkg/service/driver/driver_suite_test.go
#	core/pkg/service/framer/calculation/calculator/bench_test.go
#	core/pkg/service/framer/calculation/calculator/calculator_test.go
#	core/pkg/service/framer/calculation/graph/graph_test.go
#	core/pkg/service/framer/calculation/service_test.go
#	core/pkg/service/framer/iterator/bench_test.go
#	core/pkg/service/framer/iterator/iterator_test.go
#	core/pkg/service/framer/streamer/bench_test.go
#	core/pkg/service/framer/streamer/streamer_test.go
#	core/pkg/service/log/service_test.go
#	core/pkg/service/metrics/metrics_suite_test.go
#	core/pkg/service/node/ontology_test.go
#	core/pkg/service/ontology/signals/signals_suite_test.go
#	core/pkg/service/ontology/signals/signals_test.go
#	core/pkg/service/panel/panel_suite_test.go
#	core/pkg/service/panel/signals_test.go
#	core/pkg/service/ranger/alias/alias_test.go
#	core/pkg/service/signals/publisher_test.go
#	core/pkg/service/signals/signals_suite_test.go
#	core/pkg/transport/grpc/framer/framer_suite_test.go
#	core/pkg/transport/grpc/grpc_suite_test.go
#	core/pkg/transport/http/framer/framer_suite_test.go
#	core/pkg/transport/http/http_suite_test.go
#	core/pkg/transport/transport_suite_test.go
#	x/go/kv/counter_test.go
#	x/go/kv/tx_test.go
Drops the x/go/analyzers/leaklint analyzer and the per-suite leak-check
enforcement it required, reverting the affected suite/setup files, testutil/leak.go,
and time/clock.go to their rc state. Unrelated scope for this PR.
Resets x/go/testutil/leak_test.go, aspen/internal/kv/version.go, and
cesium/control_test.go to rc; these changes are out of scope for this PR.
Matches rc's pattern: BeforeSuite + StartServer instead of a manual
per-spec server/pool setup.