From 65caba39ca6cab0cb3a5a095eb775683355a06da Mon Sep 17 00:00:00 2001 From: Chris Gianelloni Date: Wed, 18 Feb 2026 16:59:06 -0500 Subject: [PATCH 1/2] feat(tray): adder config Signed-off-by: Chris Gianelloni --- input/chainsync/chainsync.go | 13 +- input/mempool/mempool.go | 1 + output/push/fcm/message.go | 15 ++- output/webhook/webhook.go | 1 + tray/adder_config.go | 180 +++++++++++++++++++++++++ tray/adder_config_test.go | 245 +++++++++++++++++++++++++++++++++++ 6 files changed, 445 insertions(+), 10 deletions(-) create mode 100644 tray/adder_config.go create mode 100644 tray/adder_config_test.go diff --git a/input/chainsync/chainsync.go b/input/chainsync/chainsync.go index e2255cff..37627a3e 100644 --- a/input/chainsync/chainsync.go +++ b/input/chainsync/chainsync.go @@ -730,10 +730,16 @@ func getKupoClient(c *ChainSync) (*kugo.Client, error) { } // Validate URL first - _, err := url.ParseRequestURI(c.kupoUrl) + kupoURL, err := url.ParseRequestURI(c.kupoUrl) if err != nil { return nil, fmt.Errorf("invalid kupo URL: %w", err) } + if kupoURL.Scheme != "http" && kupoURL.Scheme != "https" { + return nil, fmt.Errorf("invalid kupo URL scheme: %s", kupoURL.Scheme) + } + if kupoURL.Host == "" { + return nil, errors.New("invalid kupo URL host") + } KugoCustomLogger := logging.NewKugoCustomLogger(logging.LevelInfo) @@ -748,17 +754,18 @@ func getKupoClient(c *ChainSync) (*kugo.Client, error) { Timeout: 2 * time.Second, } - healthUrl := strings.TrimRight(c.kupoUrl, "/") + "/health" + healthURL := kupoURL.JoinPath("health") // Create context with timeout ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthUrl, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL.String(), nil) if err != nil { return nil, fmt.Errorf("failed to create health check request: %w", err) } + // #nosec G704 -- Kupo endpoint is user-configured and validated before use. resp, err := httpClient.Do(req) if err != nil { // Handle different error types diff --git a/input/mempool/mempool.go b/input/mempool/mempool.go index 2b7426ac..c884d3f2 100644 --- a/input/mempool/mempool.go +++ b/input/mempool/mempool.go @@ -396,6 +396,7 @@ func (m *Mempool) getKupoClient() (*kugo.Client, error) { return nil, fmt.Errorf("failed to create health check request: %w", err) } httpClient := &http.Client{Timeout: kupoHealthTimeout} + // #nosec G704 -- Kupo endpoint is user-configured and validated before use. resp, err := httpClient.Do(req) if err != nil { switch { diff --git a/output/push/fcm/message.go b/output/push/fcm/message.go index 99b9edca..af2ac4a4 100644 --- a/output/push/fcm/message.go +++ b/output/push/fcm/message.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net/http" + "net/url" ) type Message struct { @@ -73,10 +74,11 @@ func NewMessage(token string, opts ...MessageOption) (*Message, error) { } func Send(accessToken string, projectId string, msg *Message) error { - fcmEndpoint := fmt.Sprintf( - "https://fcm.googleapis.com/v1/projects/%s/messages:send", - projectId, - ) + fcmURL := &url.URL{ + Scheme: "https", + Host: "fcm.googleapis.com", + Path: fmt.Sprintf("/v1/projects/%s/messages:send", projectId), + } // Convert the message to JSON payload, err := json.Marshal(msg) @@ -84,14 +86,12 @@ func Send(accessToken string, projectId string, msg *Message) error { return err } - fmt.Println(string(payload)) - // Create a new HTTP request ctx := context.Background() req, err := http.NewRequestWithContext( ctx, http.MethodPost, - fcmEndpoint, + fcmURL.String(), bytes.NewBuffer(payload), ) if err != nil { @@ -104,6 +104,7 @@ func Send(accessToken string, projectId string, msg *Message) error { // Execute the request client := &http.Client{} + // #nosec G704 -- Request targets the fixed FCM host with a validated path. resp, err := client.Do(req) if err != nil { return err diff --git a/output/webhook/webhook.go b/output/webhook/webhook.go index ae329b40..e49390be 100644 --- a/output/webhook/webhook.go +++ b/output/webhook/webhook.go @@ -332,6 +332,7 @@ func (w *WebhookOutput) SendWebhook(e *event.Event) error { } client := &http.Client{Transport: customTransport} // Send payload + // #nosec G704 -- Webhook URL is user-configured and intentionally allowed. resp, err := client.Do(req) if err != nil { return fmt.Errorf("%w", err) diff --git a/tray/adder_config.go b/tray/adder_config.go new file mode 100644 index 00000000..d1d12cfc --- /dev/null +++ b/tray/adder_config.go @@ -0,0 +1,180 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tray + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "gopkg.in/yaml.v3" +) + +// MonitorTemplate represents a pre-configured monitoring template. +type MonitorTemplate int + +const ( + WatchWallet MonitorTemplate = iota + TrackDRep + MonitorPool +) + +// String returns the display name of the template. +func (t MonitorTemplate) String() string { + switch t { + case WatchWallet: + return "Watch Wallet" + case TrackDRep: + return "Track DRep" + case MonitorPool: + return "Monitor Pool" + default: + return "Unknown" + } +} + +// AdderConfigParams holds the parameters for generating an adder +// configuration. +type AdderConfigParams struct { + Network string + Template MonitorTemplate + Param string // The address/drep/pool value depending on Template + Output string // Output plugin name (default: "log") + Format string // Output format (default: "json") +} + +// adderConfig is the internal structure matching adder's config.yaml +// format. We use map types for the plugins section to match adder's +// flexible config. +type adderConfig struct { + Input string `yaml:"input"` + Output string `yaml:"output"` + API adderAPIConfig `yaml:"api"` + Logging adderLoggingConfig `yaml:"logging"` + Plugins map[string]map[string]interface{} `yaml:"plugins"` +} + +type adderAPIConfig struct { + Address string `yaml:"address"` + Port uint `yaml:"port"` +} + +type adderLoggingConfig struct { + Level string `yaml:"level"` +} + +// GenerateAdderConfig builds an adder configuration from the given +// parameters. The API server is always enabled because adder-tray +// connects to its /events endpoint for desktop notifications and +// /healthcheck for status monitoring. +func GenerateAdderConfig(params AdderConfigParams) ([]byte, error) { + if params.Network == "" { + return nil, errors.New("network is required") + } + if params.Param == "" { + return nil, errors.New("filter parameter is required") + } + + output := params.Output + if output == "" { + output = "log" + } + format := params.Format + if format == "" { + format = "json" + } + + cfg := adderConfig{ + Input: "chainsync", + Output: output, + API: adderAPIConfig{ + Address: "127.0.0.1", + Port: 8080, + }, + Logging: adderLoggingConfig{ + Level: "info", + }, + Plugins: map[string]map[string]interface{}{ + "input": { + "chainsync": map[string]interface{}{ + "network": params.Network, + }, + }, + "output": { + output: map[string]interface{}{ + "format": format, + }, + }, + }, + } + + // Add filter config based on template + filterKey := templateFilterKey(params.Template) + if filterKey == "" { + return nil, fmt.Errorf("unsupported monitor template: %d", params.Template) + } + cfg.Plugins["filter"] = map[string]interface{}{ + "chainsync": map[string]interface{}{ + filterKey: params.Param, + }, + } + + return yaml.Marshal(&cfg) +} + +// templateFilterKey returns the filter configuration key for a +// template. Returns an empty string for unrecognized templates. +func templateFilterKey(t MonitorTemplate) string { + switch t { + case WatchWallet: + return "address" + case TrackDRep: + return "drep" + case MonitorPool: + return "pool" + default: + return "" + } +} + +// AdderConfigPath returns the path to the adder config file in the +// config directory. +func AdderConfigPath() string { + return filepath.Join(ConfigDir(), "config.yaml") +} + +// WriteAdderConfig writes the adder configuration to the config +// directory. +func WriteAdderConfig(params AdderConfigParams) error { + data, err := GenerateAdderConfig(params) + if err != nil { + return fmt.Errorf("generating config: %w", err) + } + + dir := ConfigDir() + if err := os.MkdirAll(dir, 0o700); err != nil { + return fmt.Errorf("creating config directory: %w", err) + } + if err := os.Chmod(dir, 0o700); err != nil { + return fmt.Errorf("setting config directory permissions: %w", err) + } + + if err := os.WriteFile(AdderConfigPath(), data, 0o600); err != nil { + return fmt.Errorf("writing config file: %w", err) + } + + return nil +} diff --git a/tray/adder_config_test.go b/tray/adder_config_test.go new file mode 100644 index 00000000..45afc9b0 --- /dev/null +++ b/tray/adder_config_test.go @@ -0,0 +1,245 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tray + +import ( + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestMonitorTemplateString(t *testing.T) { + tests := []struct { + tmpl MonitorTemplate + want string + }{ + {WatchWallet, "Watch Wallet"}, + {TrackDRep, "Track DRep"}, + {MonitorPool, "Monitor Pool"}, + } + for _, tt := range tests { + assert.Equal(t, tt.want, tt.tmpl.String()) + } +} + +func TestGenerateAdderConfig_WatchWallet(t *testing.T) { + params := AdderConfigParams{ + Network: "mainnet", + Template: WatchWallet, + Param: "addr1qtest123", + } + data, err := GenerateAdderConfig(params) + require.NoError(t, err) + + // Unmarshal back and verify structure + var cfg map[string]interface{} + require.NoError(t, yaml.Unmarshal(data, &cfg)) + + assert.Equal(t, "chainsync", cfg["input"]) + assert.Equal(t, "log", cfg["output"]) + + // Verify API config + api, ok := cfg["api"].(map[string]interface{}) + require.True(t, ok, "api should be a map") + assert.Equal(t, "127.0.0.1", api["address"]) + assert.Equal(t, 8080, api["port"]) + + // Verify plugins structure + plugins, ok := cfg["plugins"].(map[string]interface{}) + require.True(t, ok, "plugins should be a map") + input, ok := plugins["input"].(map[string]interface{}) + require.True(t, ok, "plugins.input should be a map") + chainsync, ok := input["chainsync"].(map[string]interface{}) + require.True(t, ok, "plugins.input.chainsync should be a map") + assert.Equal(t, "mainnet", chainsync["network"]) + + // Verify filter + filter, ok := plugins["filter"].(map[string]interface{}) + require.True(t, ok, "plugins.filter should be a map") + filterChainsync, ok := filter["chainsync"].(map[string]interface{}) + require.True(t, ok, "plugins.filter.chainsync should be a map") + assert.Equal(t, "addr1qtest123", filterChainsync["address"]) +} + +func TestGenerateAdderConfig_TrackDRep(t *testing.T) { + params := AdderConfigParams{ + Network: "preview", + Template: TrackDRep, + Param: "drep1test456", + } + data, err := GenerateAdderConfig(params) + require.NoError(t, err) + + var cfg map[string]interface{} + require.NoError(t, yaml.Unmarshal(data, &cfg)) + + plugins, ok := cfg["plugins"].(map[string]interface{}) + require.True(t, ok, "plugins should be a map") + filter, ok := plugins["filter"].(map[string]interface{}) + require.True(t, ok, "plugins.filter should be a map") + filterChainsync, ok := filter["chainsync"].(map[string]interface{}) + require.True(t, ok, "plugins.filter.chainsync should be a map") + assert.Equal(t, "drep1test456", filterChainsync["drep"]) +} + +func TestGenerateAdderConfig_MonitorPool(t *testing.T) { + params := AdderConfigParams{ + Network: "mainnet", + Template: MonitorPool, + Param: "pool1test789", + } + data, err := GenerateAdderConfig(params) + require.NoError(t, err) + + var cfg map[string]interface{} + require.NoError(t, yaml.Unmarshal(data, &cfg)) + + plugins, ok := cfg["plugins"].(map[string]interface{}) + require.True(t, ok, "plugins should be a map") + filter, ok := plugins["filter"].(map[string]interface{}) + require.True(t, ok, "plugins.filter should be a map") + filterChainsync, ok := filter["chainsync"].(map[string]interface{}) + require.True(t, ok, "plugins.filter.chainsync should be a map") + assert.Equal(t, "pool1test789", filterChainsync["pool"]) +} + +func TestGenerateAdderConfig_CustomOutput(t *testing.T) { + params := AdderConfigParams{ + Network: "mainnet", + Template: WatchWallet, + Param: "addr1qtest", + Output: "webhook", + Format: "jsonl", + } + data, err := GenerateAdderConfig(params) + require.NoError(t, err) + + var cfg map[string]interface{} + require.NoError(t, yaml.Unmarshal(data, &cfg)) + + assert.Equal(t, "webhook", cfg["output"]) + plugins, ok := cfg["plugins"].(map[string]interface{}) + require.True(t, ok, "plugins should be a map") + output, ok := plugins["output"].(map[string]interface{}) + require.True(t, ok, "plugins.output should be a map") + webhook, ok := output["webhook"].(map[string]interface{}) + require.True(t, ok, "plugins.output.webhook should be a map") + assert.Equal(t, "jsonl", webhook["format"]) +} + +func TestGenerateAdderConfig_NetworkRequired(t *testing.T) { + params := AdderConfigParams{ + Template: WatchWallet, + Param: "addr1qtest", + } + _, err := GenerateAdderConfig(params) + assert.Error(t, err) + assert.Contains(t, err.Error(), "network") +} + +func TestGenerateAdderConfig_ParamRequired(t *testing.T) { + params := AdderConfigParams{ + Network: "mainnet", + Template: WatchWallet, + } + _, err := GenerateAdderConfig(params) + assert.Error(t, err) + assert.Contains(t, err.Error(), "parameter") +} + +func TestGenerateAdderConfig_RoundTrip(t *testing.T) { + params := AdderConfigParams{ + Network: "mainnet", + Template: WatchWallet, + Param: "addr1qtest", + } + data, err := GenerateAdderConfig(params) + require.NoError(t, err) + + // Should be valid YAML that can be unmarshaled and re-marshaled + var intermediate interface{} + require.NoError(t, yaml.Unmarshal(data, &intermediate)) + data2, err := yaml.Marshal(intermediate) + require.NoError(t, err) + assert.NotEmpty(t, data2) +} + +func TestAdderConfigPath(t *testing.T) { + path := AdderConfigPath() + assert.True(t, filepath.IsAbs(path)) + assert.Equal(t, "config.yaml", filepath.Base(path)) +} + +func TestWriteAdderConfig(t *testing.T) { + tmpDir := t.TempDir() + switch runtime.GOOS { + case "linux": + t.Setenv("XDG_CONFIG_HOME", tmpDir) + case "darwin": + t.Setenv("HOME", tmpDir) + case "windows": + t.Setenv("APPDATA", tmpDir) + default: + t.Skipf("unsupported platform: %s", runtime.GOOS) + } + + params := AdderConfigParams{ + Network: "mainnet", + Template: WatchWallet, + Param: "addr1qtest", + } + + err := WriteAdderConfig(params) + require.NoError(t, err) + + // Verify file exists + path := AdderConfigPath() + info, err := os.Stat(path) + require.NoError(t, err) + if runtime.GOOS != "windows" { + assert.Equal(t, os.FileMode(0o600), info.Mode().Perm()) + } + + // Verify content is valid YAML + data, err := os.ReadFile(path) + require.NoError(t, err) + + var cfg map[string]interface{} + require.NoError(t, yaml.Unmarshal(data, &cfg)) + assert.Equal(t, "chainsync", cfg["input"]) +} + +func TestTemplateFilterKey(t *testing.T) { + assert.Equal(t, "address", templateFilterKey(WatchWallet)) + assert.Equal(t, "drep", templateFilterKey(TrackDRep)) + assert.Equal(t, "pool", templateFilterKey(MonitorPool)) + assert.Equal(t, "", templateFilterKey(MonitorTemplate(99))) +} + +func TestGenerateAdderConfig_UnsupportedTemplate(t *testing.T) { + params := AdderConfigParams{ + Network: "mainnet", + Template: MonitorTemplate(99), + Param: "test", + } + _, err := GenerateAdderConfig(params) + assert.Error(t, err) + assert.Contains(t, err.Error(), "unsupported monitor template") +} From c648701107ad1261830754db23c754d20860742d Mon Sep 17 00:00:00 2001 From: Chris Gianelloni Date: Wed, 18 Feb 2026 17:16:31 -0500 Subject: [PATCH 2/2] feat(tray): enhanced menu Signed-off-by: Chris Gianelloni --- tray/app.go | 73 +++++++++ tray/events.go | 104 +++++++++++++ tray/health.go | 147 ++++++++++++++++++ tray/process.go | 209 ++++++++++++++++++++++++-- tray/process_test.go | 323 ++++++++++++++++++++++++++++++++++++++++ tray/service.go | 83 +++++++++++ tray/service_darwin.go | 137 +++++++++++++++++ tray/service_linux.go | 128 ++++++++++++++++ tray/service_test.go | 158 ++++++++++++++++++++ tray/service_windows.go | 80 ++++++++++ tray/status.go | 95 ++++++++++++ 11 files changed, 1524 insertions(+), 13 deletions(-) create mode 100644 tray/events.go create mode 100644 tray/health.go create mode 100644 tray/process_test.go create mode 100644 tray/service.go create mode 100644 tray/service_darwin.go create mode 100644 tray/service_linux.go create mode 100644 tray/service_test.go create mode 100644 tray/service_windows.go create mode 100644 tray/status.go diff --git a/tray/app.go b/tray/app.go index 41c8cd01..af753771 100644 --- a/tray/app.go +++ b/tray/app.go @@ -15,16 +15,25 @@ package tray import ( + "fmt" "log/slog" + "os/exec" + "runtime" "fyne.io/systray" ) +const ( + defaultAPIAddress = "127.0.0.1" + defaultAPIPort = 8080 +) + // App holds references to all major components of the tray // application. type App struct { config TrayConfig process *ProcessManager + status *StatusTracker } // NewApp creates and initialises the tray application. @@ -38,11 +47,17 @@ func NewApp() (*App, error) { cfg = DefaultConfig() } + status := NewStatusTracker() + a := &App{ config: cfg, + status: status, process: NewProcessManager( WithBinary(cfg.AdderBinary), WithConfigFile(cfg.AdderConfig), + WithStatusTracker(status), + WithAPIEndpoint(defaultAPIAddress, defaultAPIPort), + WithAutoRestart(true), ), } @@ -60,14 +75,44 @@ func (a *App) onReady() { systray.SetTitle("Adder") systray.SetTooltip("Adder - Cardano Event Streamer") + // Status display (disabled — updated via observer) + mStatus := systray.AddMenuItem( + "Status: Stopped", "Current adder status", + ) + mStatus.Disable() + + systray.AddSeparator() + mStart := systray.AddMenuItem("Start", "Start adder") mStop := systray.AddMenuItem("Stop", "Stop adder") mRestart := systray.AddMenuItem( "Restart", "Restart adder", ) + + systray.AddSeparator() + + mShowConfig := systray.AddMenuItem( + "Show Config Folder", "Open configuration directory", + ) + mShowLogs := systray.AddMenuItem( + "Show Logs", "Open log directory", + ) + + systray.AddSeparator() + + mAbout := systray.AddMenuItem( + "About Adder", "Show version information", + ) + systray.AddSeparator() + mQuit := systray.AddMenuItem("Quit", "Quit adder-tray") + // Update status menu item when status changes + a.status.OnChange(func(s Status) { + mStatus.SetTitle(fmt.Sprintf("Status: %s", s)) + }) + go func() { for { select { @@ -92,6 +137,12 @@ func (a *App) onReady() { "error", err, ) } + case <-mShowConfig.ClickedCh: + openFolder(ConfigDir()) + case <-mShowLogs.ClickedCh: + openFolder(LogDir()) + case <-mAbout.ClickedCh: + slog.Info("Adder - Cardano Event Streamer") case <-mQuit.ClickedCh: systray.Quit() return @@ -130,3 +181,25 @@ func (a *App) onExit() { func (a *App) Shutdown() { systray.Quit() } + +// openFolder opens the given directory in the platform's file +// manager. +func openFolder(dir string) { + var cmd string + switch runtime.GOOS { + case "darwin": + cmd = "open" + case "windows": + cmd = "explorer" + default: + cmd = "xdg-open" + } + + if err := exec.Command(cmd, dir).Start(); err != nil { //nolint:gosec // command selected by platform, dir from internal paths + slog.Error( + "failed to open folder", + "dir", dir, + "error", err, + ) + } +} diff --git a/tray/events.go b/tray/events.go new file mode 100644 index 00000000..5cd6d657 --- /dev/null +++ b/tray/events.go @@ -0,0 +1,104 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tray + +import ( + "bufio" + "encoding/json" + "io" + "log/slog" + + "github.com/blinklabs-io/adder/event" +) + +const eventChannelBuffer = 64 + +// EventParser reads newline-delimited JSON events from a reader and +// sends parsed events to a channel. +type EventParser struct { + scanner *bufio.Scanner + events chan event.Event + done chan struct{} +} + +// NewEventParser creates a new EventParser that reads from r using +// the given buffer size for the scanner. +func NewEventParser(r io.Reader, bufSize int) *EventParser { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, bufSize), bufSize) + return &EventParser{ + scanner: scanner, + events: make(chan event.Event, eventChannelBuffer), + done: make(chan struct{}), + } +} + +// Start begins parsing events in a background goroutine. The events +// channel is closed when the reader returns EOF or an error, or when +// Stop is called. +func (ep *EventParser) Start() { + go ep.run() +} + +// Stop signals the event parser to stop. Note that the parser will +// also stop naturally when the underlying reader is closed. +func (ep *EventParser) Stop() { + select { + case <-ep.done: + default: + close(ep.done) + } +} + +// Events returns a read-only channel of parsed events. +func (ep *EventParser) Events() <-chan event.Event { + return ep.events +} + +func (ep *EventParser) run() { + defer close(ep.events) + + for ep.scanner.Scan() { + select { + case <-ep.done: + return + default: + } + + line := ep.scanner.Bytes() + if len(line) == 0 { + continue + } + + var evt event.Event + if err := json.Unmarshal(line, &evt); err != nil { + slog.Warn( + "skipping malformed event line", + "error", err, + ) + continue + } + + select { + case ep.events <- evt: + case <-ep.done: + return + } + } + + if err := ep.scanner.Err(); err != nil { + slog.Debug("event parser scanner error", "error", err) + } +} diff --git a/tray/health.go b/tray/health.go new file mode 100644 index 00000000..ac15bc3f --- /dev/null +++ b/tray/health.go @@ -0,0 +1,147 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tray + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "time" +) + +const ( + healthPollInterval = 10 * time.Second + healthHTTPTimeout = 5 * time.Second + healthConsecutiveFailCap = 3 +) + +// HealthResponse represents the JSON response from the adder +// healthcheck endpoint. +type HealthResponse struct { + Failed bool `json:"failed"` + Reason string `json:"reason,omitempty"` +} + +// HealthPoller periodically checks the adder HTTP healthcheck +// endpoint and updates a StatusTracker accordingly. +type HealthPoller struct { + address string + port uint + tracker *StatusTracker + client *http.Client + stopCh chan struct{} + consecutiveFailures int +} + +// NewHealthPoller creates a new HealthPoller that will poll the given +// address and port, updating the provided StatusTracker. +func NewHealthPoller( + address string, + port uint, + tracker *StatusTracker, +) *HealthPoller { + return &HealthPoller{ + address: address, + port: port, + tracker: tracker, + client: &http.Client{ + Timeout: healthHTTPTimeout, + }, + stopCh: make(chan struct{}), + } +} + +// Start begins periodic health polling in a background goroutine. +func (hp *HealthPoller) Start() { + go hp.run() +} + +// Stop signals the health poller to stop. It is safe to call +// multiple times. +func (hp *HealthPoller) Stop() { + select { + case <-hp.stopCh: + default: + close(hp.stopCh) + } +} + +func (hp *HealthPoller) run() { + ticker := time.NewTicker(healthPollInterval) + defer ticker.Stop() + + for { + select { + case <-hp.stopCh: + return + case <-ticker.C: + hp.poll() + } + } +} + +func (hp *HealthPoller) poll() { + resp, err := hp.client.Get(hp.healthURL()) //nolint:noctx // short-lived poller with client timeout + if err != nil || resp == nil { + hp.consecutiveFailures++ + slog.Debug( + "health poll failed", + "error", err, + "consecutive_failures", hp.consecutiveFailures, + ) + if hp.consecutiveFailures >= healthConsecutiveFailCap { + hp.tracker.Set(StatusError) + } + return + } + defer resp.Body.Close() + + var hr HealthResponse + if err := json.NewDecoder(resp.Body).Decode(&hr); err != nil { + hp.consecutiveFailures++ + slog.Debug( + "health poll decode failed", + "error", err, + "consecutive_failures", hp.consecutiveFailures, + ) + if hp.consecutiveFailures >= healthConsecutiveFailCap { + hp.tracker.Set(StatusError) + } + return + } + + if hr.Failed { + hp.consecutiveFailures++ + slog.Debug( + "health poll reported failure", + "reason", hr.Reason, + "consecutive_failures", hp.consecutiveFailures, + ) + if hp.consecutiveFailures >= healthConsecutiveFailCap { + hp.tracker.Set(StatusError) + } + return + } + + // Healthy response: reset failures and mark connected + hp.consecutiveFailures = 0 + hp.tracker.Set(StatusConnected) +} + +// healthURL returns the full URL of the healthcheck endpoint. +func (hp *HealthPoller) healthURL() string { + return fmt.Sprintf("http://%s:%d/healthcheck", hp.address, hp.port) +} diff --git a/tray/process.go b/tray/process.go index 447e65db..de1317d9 100644 --- a/tray/process.go +++ b/tray/process.go @@ -23,9 +23,19 @@ import ( "runtime" "sync" "time" + + "github.com/blinklabs-io/adder/event" ) -const stopTimeout = 10 * time.Second +const ( + stopTimeout = 10 * time.Second + + // Backoff constants for automatic restart. + initialBackoff = 1 * time.Second + maxBackoff = 60 * time.Second + healthyRunTime = 30 * time.Second + stdoutBufSize = 1024 * 1024 // 1 MB scanner buffer +) // ProcessManager manages the lifecycle of the adder subprocess. type ProcessManager struct { @@ -34,6 +44,23 @@ type ProcessManager struct { done chan struct{} binary string cfgFile string + + // Status tracking + status *StatusTracker + + // Health polling + healthPoller *HealthPoller + apiAddress string + apiPort uint + + // Event parsing + eventParser *EventParser + events chan event.Event + + // Auto-restart with backoff + autoRestart bool + restartCount int + lastStart time.Time } // ProcessManagerOption is a functional option for ProcessManager. @@ -53,12 +80,41 @@ func WithConfigFile(path string) ProcessManagerOption { } } +// WithStatusTracker sets a StatusTracker for the process manager +// to report status changes. +func WithStatusTracker(t *StatusTracker) ProcessManagerOption { + return func(pm *ProcessManager) { + pm.status = t + } +} + +// WithAPIEndpoint configures the adder API address and port for +// health polling. +func WithAPIEndpoint( + address string, + port uint, +) ProcessManagerOption { + return func(pm *ProcessManager) { + pm.apiAddress = address + pm.apiPort = port + } +} + +// WithAutoRestart enables or disables automatic restart with +// exponential backoff when the adder process crashes. +func WithAutoRestart(enabled bool) ProcessManagerOption { + return func(pm *ProcessManager) { + pm.autoRestart = enabled + } +} + // NewProcessManager creates a new ProcessManager with the given options. func NewProcessManager( opts ...ProcessManagerOption, ) *ProcessManager { pm := &ProcessManager{ binary: "adder", + events: make(chan event.Event, eventChannelBuffer), } for _, opt := range opts { opt(pm) @@ -66,6 +122,12 @@ func NewProcessManager( return pm } +// Events returns a read-only channel of events parsed from the +// adder process stdout. +func (pm *ProcessManager) Events() <-chan event.Event { + return pm.events +} + // Start launches the adder process. Returns an error if it is already // running. func (pm *ProcessManager) Start() error { @@ -76,17 +138,33 @@ func (pm *ProcessManager) Start() error { return errors.New("adder process is already running") } + if pm.status != nil { + pm.status.Set(StatusStarting) + } + args := []string{} if pm.cfgFile != "" { args = append(args, "--config", pm.cfgFile) } pm.cmd = exec.Command(pm.binary, args...) //nolint:gosec // binary path from user config - pm.cmd.Stdout = os.Stdout pm.cmd.Stderr = os.Stderr + // Capture stdout for event parsing + stdout, err := pm.cmd.StdoutPipe() + if err != nil { + pm.cmd = nil + if pm.status != nil { + pm.status.Set(StatusError) + } + return fmt.Errorf("creating stdout pipe: %w", err) + } + if err := pm.cmd.Start(); err != nil { pm.cmd = nil + if pm.status != nil { + pm.status.Set(StatusError) + } return fmt.Errorf("starting adder: %w", err) } @@ -95,24 +173,126 @@ func (pm *ProcessManager) Start() error { "pid", pm.cmd.Process.Pid, ) - // Wait for process in background so we can detect exits - pm.done = make(chan struct{}) + pm.lastStart = time.Now() + + // Start event parser on stdout + pm.eventParser = NewEventParser(stdout, stdoutBufSize) + pm.eventParser.Start() + + // Forward parsed events to the ProcessManager events channel go func() { - err := pm.cmd.Wait() - pm.mu.Lock() - defer pm.mu.Unlock() - pm.cmd = nil - close(pm.done) - if err != nil { - slog.Warn("adder process exited with error", "error", err) - } else { - slog.Info("adder process exited") + for evt := range pm.eventParser.Events() { + select { + case pm.events <- evt: + default: + slog.Debug("event channel full, dropping event") + } } }() + // Start health poller if API endpoint is configured + if pm.apiAddress != "" { + tracker := pm.status + if tracker == nil { + tracker = NewStatusTracker() + } + pm.healthPoller = NewHealthPoller( + pm.apiAddress, + pm.apiPort, + tracker, + ) + pm.healthPoller.Start() + } + + // Wait for process in background so we can detect exits + pm.done = make(chan struct{}) + go pm.waitForExit() + return nil } +func (pm *ProcessManager) waitForExit() { + waitErr := pm.cmd.Wait() + + pm.mu.Lock() + + // Stop event parser + if pm.eventParser != nil { + pm.eventParser.Stop() + pm.eventParser = nil + } + + // Stop health poller + if pm.healthPoller != nil { + pm.healthPoller.Stop() + pm.healthPoller = nil + } + + lastStart := pm.lastStart + shouldRestart := pm.autoRestart && waitErr != nil + restartCount := pm.restartCount + + pm.cmd = nil + close(pm.done) + + if waitErr != nil { + if pm.status != nil { + pm.status.Set(StatusError) + } + slog.Warn("adder process exited with error", "error", waitErr) + } else { + if pm.status != nil { + pm.status.Set(StatusStopped) + } + slog.Info("adder process exited") + } + + pm.mu.Unlock() + + // Auto-restart with exponential backoff on crash + if shouldRestart { + // Reset restart count if the process ran long enough + if time.Since(lastStart) >= healthyRunTime { + restartCount = 0 + } + delay := backoffDelay(restartCount) + + pm.mu.Lock() + pm.restartCount = restartCount + 1 + pm.mu.Unlock() + + slog.Info( + "scheduling automatic restart", + "delay", delay, + "restart_count", restartCount+1, + ) + + go func() { + time.Sleep(delay) + if err := pm.Start(); err != nil { + slog.Error( + "automatic restart failed", + "error", err, + ) + } + }() + } +} + +// backoffDelay calculates the exponential backoff delay for the +// given restart count. The delay starts at 1s, doubles each time, +// and is capped at 60s. +func backoffDelay(restartCount int) time.Duration { + delay := initialBackoff + for range restartCount { + delay *= 2 + if delay >= maxBackoff { + return maxBackoff + } + } + return delay +} + // Stop terminates the running adder process gracefully. func (pm *ProcessManager) Stop() error { pm.mu.Lock() @@ -122,6 +302,9 @@ func (pm *ProcessManager) Stop() error { return nil } + // Disable auto-restart when explicitly stopped + pm.autoRestart = false + slog.Info("stopping adder process", "pid", pm.cmd.Process.Pid) done := pm.done diff --git a/tray/process_test.go b/tray/process_test.go new file mode 100644 index 00000000..b023d0ed --- /dev/null +++ b/tray/process_test.go @@ -0,0 +1,323 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tray + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --------------------------------------------------------------------------- +// StatusTracker tests +// --------------------------------------------------------------------------- + +func TestStatusTracker_InitialState(t *testing.T) { + st := NewStatusTracker() + assert.Equal(t, StatusStopped, st.Get()) +} + +func TestStatusTracker_SetGet(t *testing.T) { + st := NewStatusTracker() + + st.Set(StatusStarting) + assert.Equal(t, StatusStarting, st.Get()) + + st.Set(StatusConnected) + assert.Equal(t, StatusConnected, st.Get()) + + st.Set(StatusError) + assert.Equal(t, StatusError, st.Get()) +} + +func TestStatusTracker_OnChange(t *testing.T) { + st := NewStatusTracker() + + var mu sync.Mutex + var captured []Status + + st.OnChange(func(s Status) { + mu.Lock() + defer mu.Unlock() + captured = append(captured, s) + }) + + st.Set(StatusStarting) + st.Set(StatusConnected) + // Setting to the same value should not trigger callback + st.Set(StatusConnected) + st.Set(StatusError) + + mu.Lock() + defer mu.Unlock() + assert.Equal(t, []Status{StatusStarting, StatusConnected, StatusError}, captured) +} + +func TestStatusTracker_String(t *testing.T) { + statuses := []Status{ + StatusStopped, + StatusStarting, + StatusConnected, + StatusReconnecting, + StatusError, + } + + for _, s := range statuses { + assert.NotEmpty(t, s.String(), "Status %q should have a non-empty string", s) + } +} + +// --------------------------------------------------------------------------- +// EventParser tests +// --------------------------------------------------------------------------- + +func TestEventParser_ValidJSON(t *testing.T) { + r, w := io.Pipe() + ep := NewEventParser(r, 1024*1024) + ep.Start() + t.Cleanup(func() { + ep.Stop() + }) + + evt := map[string]any{ + "type": "chainsync.block", + "timestamp": "2026-01-15T10:30:00Z", + "payload": map[string]any{"slot": 12345}, + } + data, err := json.Marshal(evt) + require.NoError(t, err) + + _, err = fmt.Fprintf(w, "%s\n", data) + require.NoError(t, err) + + select { + case parsed, ok := <-ep.Events(): + require.True(t, ok, "events channel should be open") + assert.Equal(t, "chainsync.block", parsed.Type) + assert.False(t, parsed.Timestamp.IsZero()) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event") + } + + require.NoError(t, w.Close()) +} + +func TestEventParser_MalformedLines(t *testing.T) { + r, w := io.Pipe() + ep := NewEventParser(r, 1024*1024) + ep.Start() + t.Cleanup(func() { + ep.Stop() + }) + + // Write malformed line followed by a valid line + _, err := fmt.Fprintln(w, "this is not json") + require.NoError(t, err) + + evt := map[string]any{ + "type": "chainsync.rollback", + "timestamp": "2026-01-15T10:31:00Z", + "payload": map[string]any{}, + } + data, err := json.Marshal(evt) + require.NoError(t, err) + _, err = fmt.Fprintf(w, "%s\n", data) + require.NoError(t, err) + + // Should skip malformed and deliver valid event + select { + case parsed, ok := <-ep.Events(): + require.True(t, ok) + assert.Equal(t, "chainsync.rollback", parsed.Type) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for event after malformed line") + } + + require.NoError(t, w.Close()) +} + +func TestEventParser_EOF(t *testing.T) { + r := strings.NewReader("") + ep := NewEventParser(r, 1024*1024) + ep.Start() + + // Channel should close on EOF + select { + case _, ok := <-ep.Events(): + assert.False(t, ok, "events channel should be closed on EOF") + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for events channel to close") + } +} + +// --------------------------------------------------------------------------- +// HealthPoller tests +// --------------------------------------------------------------------------- + +func TestHealthPoller_URLConstruction(t *testing.T) { + tracker := NewStatusTracker() + hp := NewHealthPoller("127.0.0.1", 8080, tracker) + assert.Equal(t, "http://127.0.0.1:8080/healthcheck", hp.healthURL()) +} + +func TestHealthPoller_HealthyResponse(t *testing.T) { + tracker := NewStatusTracker() + tracker.Set(StatusStarting) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"failed":false}`)) + })) + defer srv.Close() + + // Parse the test server address + hp := newHealthPollerFromURL(t, srv.URL, tracker) + hp.poll() + + assert.Equal(t, StatusConnected, tracker.Get()) +} + +func TestHealthPoller_UnhealthyResponse(t *testing.T) { + tracker := NewStatusTracker() + tracker.Set(StatusStarting) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"failed":true,"reason":"pipeline is not running"}`)) + })) + defer srv.Close() + + hp := newHealthPollerFromURL(t, srv.URL, tracker) + + // First two polls should not yet set error + hp.poll() + assert.NotEqual(t, StatusError, tracker.Get()) + hp.poll() + assert.NotEqual(t, StatusError, tracker.Get()) + + // Third consecutive failure should set error + hp.poll() + assert.Equal(t, StatusError, tracker.Get()) +} + +func TestHealthPoller_RecoveryAfterError(t *testing.T) { + tracker := NewStatusTracker() + tracker.Set(StatusStarting) + + var mu sync.Mutex + healthy := false + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + mu.Lock() + isHealthy := healthy + mu.Unlock() + + w.Header().Set("Content-Type", "application/json") + if isHealthy { + _, _ = w.Write([]byte(`{"failed":false}`)) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"failed":true,"reason":"pipeline is not running"}`)) + } + })) + defer srv.Close() + + hp := newHealthPollerFromURL(t, srv.URL, tracker) + + // Drive to error state + hp.poll() + hp.poll() + hp.poll() + assert.Equal(t, StatusError, tracker.Get()) + + // Switch to healthy + mu.Lock() + healthy = true + mu.Unlock() + + hp.poll() + assert.Equal(t, StatusConnected, tracker.Get()) +} + +// newHealthPollerFromURL creates a HealthPoller targeting a test server. +func newHealthPollerFromURL( + t *testing.T, + serverURL string, + tracker *StatusTracker, +) *HealthPoller { + t.Helper() + + u, err := url.Parse(serverURL) + require.NoError(t, err, "failed to parse test server URL") + + host := u.Hostname() + portStr := u.Port() + portVal, err := strconv.ParseUint(portStr, 10, 32) + require.NoError(t, err, "failed to parse port from test server URL") + + hp := &HealthPoller{ + address: host, + port: uint(portVal), + tracker: tracker, + client: &http.Client{ + Timeout: healthHTTPTimeout, + }, + stopCh: make(chan struct{}), + } + return hp +} + +// --------------------------------------------------------------------------- +// Backoff tests +// --------------------------------------------------------------------------- + +func TestBackoffDelay(t *testing.T) { + tests := []struct { + restartCount int + expected time.Duration + }{ + {0, 1 * time.Second}, + {1, 2 * time.Second}, + {2, 4 * time.Second}, + {3, 8 * time.Second}, + {4, 16 * time.Second}, + {5, 32 * time.Second}, + {6, 60 * time.Second}, // capped + {7, 60 * time.Second}, // still capped + {10, 60 * time.Second}, // still capped + } + + for _, tt := range tests { + t.Run( + fmt.Sprintf("count_%d", tt.restartCount), + func(t *testing.T) { + got := backoffDelay(tt.restartCount) + assert.Equal(t, tt.expected, got) + }, + ) + } +} diff --git a/tray/service.go b/tray/service.go new file mode 100644 index 00000000..da11c099 --- /dev/null +++ b/tray/service.go @@ -0,0 +1,83 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tray + +import "errors" + +// ServiceStatus represents the state of the system service. +type ServiceStatus int + +const ( + // ServiceNotRegistered indicates no service is installed. + ServiceNotRegistered ServiceStatus = iota + // ServiceRegistered indicates the service is installed but not running. + ServiceRegistered + // ServiceRunning indicates the service is installed and currently running. + ServiceRunning +) + +// String returns a human-readable representation of the ServiceStatus. +func (s ServiceStatus) String() string { + switch s { + case ServiceNotRegistered: + return "not registered" + case ServiceRegistered: + return "registered" + case ServiceRunning: + return "running" + default: + return "unknown" + } +} + +// ServiceConfig holds the configuration needed for service registration. +type ServiceConfig struct { + // BinaryPath is the absolute path to the adder binary. + BinaryPath string + // ConfigPath is the optional path to the adder configuration file. + ConfigPath string + // LogDir is the directory for log output (used by platforms that + // do not support journal-style logging). + LogDir string +} + +// Validate checks that the ServiceConfig contains the minimum required fields. +func (c ServiceConfig) Validate() error { + if c.BinaryPath == "" { + return errors.New("binary path must not be empty") + } + return nil +} + +// RegisterService installs adder as a system service using the +// platform-specific mechanism (systemd user unit, launchd plist, +// or Windows Task Scheduler). +func RegisterService(cfg ServiceConfig) error { + if err := cfg.Validate(); err != nil { + return err + } + return registerService(cfg) +} + +// UnregisterService removes the adder system service. +func UnregisterService() error { + return unregisterService() +} + +// ServiceStatusCheck returns the current status of the adder system +// service. +func ServiceStatusCheck() (ServiceStatus, error) { + return serviceStatusCheck() +} diff --git a/tray/service_darwin.go b/tray/service_darwin.go new file mode 100644 index 00000000..bbd1a95f --- /dev/null +++ b/tray/service_darwin.go @@ -0,0 +1,137 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build darwin + +package tray + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "text/template" +) + +const launchAgentLabel = "io.blinklabs.adder" +const launchAgentFile = "io.blinklabs.adder.plist" + +const plistTemplate = ` + + + + Label + io.blinklabs.adder + ProgramArguments + + {{.BinaryPath}} + {{- if .ConfigPath}} + --config + {{.ConfigPath}} + {{- end}} + + RunAtLoad + + KeepAlive + + StandardOutPath + {{.LogDir}}/adder.stdout.log + StandardErrorPath + {{.LogDir}}/adder.stderr.log + + +` + +// servicePlistDir returns the LaunchAgents directory. +func servicePlistDir() string { + return filepath.Join(homeOrTmp(), "Library", "LaunchAgents") +} + +// servicePlistPath returns the full path to the adder LaunchAgent plist. +func servicePlistPath() string { + return filepath.Join(servicePlistDir(), launchAgentFile) +} + +// renderPlist renders the LaunchAgent plist template with the given config. +func renderPlist(cfg ServiceConfig) ([]byte, error) { + tmpl, err := template.New("plist").Parse(plistTemplate) + if err != nil { + return nil, fmt.Errorf("parsing plist template: %w", err) + } + var buf bytes.Buffer + if err := tmpl.Execute(&buf, cfg); err != nil { + return nil, fmt.Errorf("rendering plist template: %w", err) + } + return buf.Bytes(), nil +} + +func registerService(cfg ServiceConfig) error { + if cfg.LogDir == "" { + cfg.LogDir = LogDir() + } + + plistDir := servicePlistDir() + if err := os.MkdirAll(plistDir, 0o755); err != nil { + return fmt.Errorf("creating LaunchAgents dir: %w", err) + } + + if err := os.MkdirAll(cfg.LogDir, 0o755); err != nil { + return fmt.Errorf("creating log dir: %w", err) + } + + data, err := renderPlist(cfg) + if err != nil { + return err + } + + if err := os.WriteFile(servicePlistPath(), data, 0o644); err != nil { + return fmt.Errorf("writing plist file: %w", err) + } + + if out, err := exec.Command( + "launchctl", "load", "-w", servicePlistPath(), + ).CombinedOutput(); err != nil { + return fmt.Errorf("loading launch agent: %s: %w", strings.TrimSpace(string(out)), err) + } + + return nil +} + +func unregisterService() error { + if out, err := exec.Command( + "launchctl", "unload", servicePlistPath(), + ).CombinedOutput(); err != nil { + return fmt.Errorf("unloading launch agent: %s: %w", strings.TrimSpace(string(out)), err) + } + + if err := os.Remove(servicePlistPath()); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("removing plist file: %w", err) + } + + return nil +} + +func serviceStatusCheck() (ServiceStatus, error) { + if _, err := os.Stat(servicePlistPath()); os.IsNotExist(err) { + return ServiceNotRegistered, nil + } + + if err := exec.Command("launchctl", "list", launchAgentLabel).Run(); err == nil { + return ServiceRunning, nil + } + + return ServiceRegistered, nil +} diff --git a/tray/service_linux.go b/tray/service_linux.go new file mode 100644 index 00000000..50b462e1 --- /dev/null +++ b/tray/service_linux.go @@ -0,0 +1,128 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux + +package tray + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "text/template" +) + +const serviceName = "adder.service" + +const serviceUnitTemplate = `[Unit] +Description=Adder - Cardano Event Streamer +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +ExecStart={{.BinaryPath}}{{if .ConfigPath}} --config {{.ConfigPath}}{{end}} +Restart=on-failure +RestartSec=5 +StandardOutput=journal +StandardError=journal +Environment=HOME=%h + +[Install] +WantedBy=default.target +` + +// serviceUnitDir returns the systemd user unit directory. +func serviceUnitDir() string { + if dir := os.Getenv("XDG_CONFIG_HOME"); dir != "" { + return filepath.Join(dir, "systemd", "user") + } + return filepath.Join(homeOrTmp(), ".config", "systemd", "user") +} + +// serviceUnitPath returns the full path to the adder systemd unit file. +func serviceUnitPath() string { + return filepath.Join(serviceUnitDir(), serviceName) +} + +// renderUnit renders the systemd unit template with the given config. +func renderUnit(cfg ServiceConfig) ([]byte, error) { + tmpl, err := template.New("unit").Parse(serviceUnitTemplate) + if err != nil { + return nil, fmt.Errorf("parsing unit template: %w", err) + } + var buf bytes.Buffer + if err := tmpl.Execute(&buf, cfg); err != nil { + return nil, fmt.Errorf("rendering unit template: %w", err) + } + return buf.Bytes(), nil +} + +func registerService(cfg ServiceConfig) error { + unitDir := serviceUnitDir() + if err := os.MkdirAll(unitDir, 0o755); err != nil { + return fmt.Errorf("creating systemd user dir: %w", err) + } + + data, err := renderUnit(cfg) + if err != nil { + return err + } + + if err := os.WriteFile(serviceUnitPath(), data, 0o644); err != nil { //nolint:gosec // systemd unit files need 0644 permissions + return fmt.Errorf("writing unit file: %w", err) + } + + if out, err := exec.Command("systemctl", "--user", "daemon-reload").CombinedOutput(); err != nil { + return fmt.Errorf("daemon-reload: %s: %w", strings.TrimSpace(string(out)), err) + } + + if out, err := exec.Command("systemctl", "--user", "enable", serviceName).CombinedOutput(); err != nil { + return fmt.Errorf("enabling service: %s: %w", strings.TrimSpace(string(out)), err) + } + + return nil +} + +func unregisterService() error { + if out, err := exec.Command("systemctl", "--user", "disable", serviceName).CombinedOutput(); err != nil { + return fmt.Errorf("disabling service: %s: %w", strings.TrimSpace(string(out)), err) + } + + if err := os.Remove(serviceUnitPath()); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("removing unit file: %w", err) + } + + if out, err := exec.Command("systemctl", "--user", "daemon-reload").CombinedOutput(); err != nil { + return fmt.Errorf("daemon-reload: %s: %w", strings.TrimSpace(string(out)), err) + } + + return nil +} + +func serviceStatusCheck() (ServiceStatus, error) { + if _, err := os.Stat(serviceUnitPath()); os.IsNotExist(err) { + return ServiceNotRegistered, nil + } + + out, err := exec.Command("systemctl", "--user", "is-active", serviceName).Output() + if err == nil && strings.TrimSpace(string(out)) == "active" { + return ServiceRunning, nil + } + + return ServiceRegistered, nil +} diff --git a/tray/service_test.go b/tray/service_test.go new file mode 100644 index 00000000..2f98bbcb --- /dev/null +++ b/tray/service_test.go @@ -0,0 +1,158 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tray + +import ( + "os" + "path/filepath" + "runtime" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestServiceStatusString(t *testing.T) { + tests := []struct { + status ServiceStatus + want string + }{ + {ServiceNotRegistered, "not registered"}, + {ServiceRegistered, "registered"}, + {ServiceRunning, "running"}, + {ServiceStatus(99), "unknown"}, + } + for _, tt := range tests { + assert.Equal(t, tt.want, tt.status.String()) + } +} + +func TestServiceConfigValidate(t *testing.T) { + t.Run("empty binary path", func(t *testing.T) { + cfg := ServiceConfig{} + err := cfg.Validate() + require.Error(t, err) + assert.Contains(t, err.Error(), "binary path") + }) + + t.Run("valid config", func(t *testing.T) { + cfg := ServiceConfig{BinaryPath: "/usr/bin/adder"} + err := cfg.Validate() + require.NoError(t, err) + }) +} + +func TestRegisterServiceValidation(t *testing.T) { + err := RegisterService(ServiceConfig{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "binary path") +} + +func TestServiceUnitDir(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("systemd unit dir only applies on Linux") + } + + t.Run("uses XDG_CONFIG_HOME", func(t *testing.T) { + tmpDir := t.TempDir() + t.Setenv("XDG_CONFIG_HOME", tmpDir) + + dir := serviceUnitDir() + assert.True( + t, + strings.HasPrefix(dir, tmpDir), + "serviceUnitDir should start with XDG_CONFIG_HOME", + ) + assert.True( + t, + strings.HasSuffix(dir, filepath.Join("systemd", "user")), + "serviceUnitDir should end with systemd/user", + ) + }) + + t.Run("fallback without XDG_CONFIG_HOME", func(t *testing.T) { + t.Setenv("XDG_CONFIG_HOME", "") + + dir := serviceUnitDir() + assert.Contains(t, dir, ".config") + assert.Contains(t, dir, filepath.Join("systemd", "user")) + }) +} + +func TestServiceUnitPath(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("systemd unit path only applies on Linux") + } + + path := serviceUnitPath() + assert.True( + t, + strings.HasSuffix(path, "adder.service"), + "serviceUnitPath should end with adder.service", + ) +} + +func TestServiceUnitTemplate(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("systemd unit template only applies on Linux") + } + + cfg := ServiceConfig{ + BinaryPath: "/usr/bin/adder", + ConfigPath: "/etc/adder.yaml", + } + + data, err := renderUnit(cfg) + require.NoError(t, err) + + content := string(data) + assert.Contains(t, content, "[Unit]") + assert.Contains(t, content, "[Service]") + assert.Contains(t, content, "[Install]") + assert.Contains( + t, content, + "ExecStart=/usr/bin/adder --config /etc/adder.yaml", + ) + assert.Contains(t, content, "Restart=on-failure") + assert.Contains(t, content, "WantedBy=default.target") + + // Verify it can be written and read back + tmpDir := t.TempDir() + unitPath := filepath.Join(tmpDir, "adder.service") + err = os.WriteFile(unitPath, data, 0o644) + require.NoError(t, err) + + readBack, err := os.ReadFile(unitPath) + require.NoError(t, err) + assert.Equal(t, content, string(readBack)) +} + +func TestServiceUnitTemplateNoConfig(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("systemd unit template only applies on Linux") + } + + cfg := ServiceConfig{ + BinaryPath: "/usr/bin/adder", + } + + data, err := renderUnit(cfg) + require.NoError(t, err) + + content := string(data) + assert.Contains(t, content, "ExecStart=/usr/bin/adder") + assert.NotContains(t, content, "--config") +} diff --git a/tray/service_windows.go b/tray/service_windows.go new file mode 100644 index 00000000..686e4c98 --- /dev/null +++ b/tray/service_windows.go @@ -0,0 +1,80 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build windows + +package tray + +import ( + "fmt" + "os/exec" + "strings" +) + +const taskName = "Adder" + +func registerService(cfg ServiceConfig) error { + command := fmt.Sprintf(`"%s"`, cfg.BinaryPath) + if cfg.ConfigPath != "" { + command = fmt.Sprintf(`"%s" --config "%s"`, cfg.BinaryPath, cfg.ConfigPath) + } + + out, err := exec.Command( //nolint:gosec // command args constructed from validated config + "schtasks.exe", + "/Create", + "/TN", taskName, + "/SC", "ONLOGON", + "/TR", command, + "/RL", "LIMITED", + "/F", + ).CombinedOutput() + if err != nil { + return fmt.Errorf("creating scheduled task: %s: %w", strings.TrimSpace(string(out)), err) + } + + return nil +} + +func unregisterService() error { + out, err := exec.Command( + "schtasks.exe", + "/Delete", + "/TN", taskName, + "/F", + ).CombinedOutput() + if err != nil { + return fmt.Errorf("deleting scheduled task: %s: %w", strings.TrimSpace(string(out)), err) + } + + return nil +} + +func serviceStatusCheck() (ServiceStatus, error) { + out, err := exec.Command( + "schtasks.exe", + "/Query", + "/TN", taskName, + "/FO", "CSV", + "/NH", + ).CombinedOutput() + if err != nil { + return ServiceNotRegistered, nil + } + + if strings.Contains(string(out), "Running") { + return ServiceRunning, nil + } + + return ServiceRegistered, nil +} diff --git a/tray/status.go b/tray/status.go new file mode 100644 index 00000000..e027bdd7 --- /dev/null +++ b/tray/status.go @@ -0,0 +1,95 @@ +// Copyright 2026 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tray + +import "sync" + +// Status represents the current state of the adder process. +type Status string + +const ( + StatusStopped Status = "stopped" + StatusStarting Status = "starting" + StatusConnected Status = "connected" + StatusReconnecting Status = "reconnecting" + StatusError Status = "error" +) + +// String returns a human-readable label for the status. +func (s Status) String() string { + switch s { + case StatusStopped: + return "Stopped" + case StatusStarting: + return "Starting" + case StatusConnected: + return "Connected" + case StatusReconnecting: + return "Reconnecting" + case StatusError: + return "Error" + default: + return "Unknown" + } +} + +// StatusTracker provides thread-safe tracking of the adder process +// status with an optional change observer callback. +type StatusTracker struct { + mu sync.Mutex + current Status + onChange func(Status) +} + +// NewStatusTracker creates a new StatusTracker with an initial status +// of StatusStopped. +func NewStatusTracker() *StatusTracker { + return &StatusTracker{ + current: StatusStopped, + } +} + +// Get returns the current status. +func (st *StatusTracker) Get() Status { + st.mu.Lock() + defer st.mu.Unlock() + return st.current +} + +// Set updates the current status. If an onChange callback is +// registered and the status has changed, the callback is invoked +// after the lock is released with the new status. +func (st *StatusTracker) Set(s Status) { + st.mu.Lock() + if st.current == s { + st.mu.Unlock() + return + } + st.current = s + fn := st.onChange + st.mu.Unlock() + if fn != nil { + fn(s) + } +} + +// OnChange registers a callback that is called whenever the status +// changes. Only one callback can be registered at a time; subsequent +// calls replace the previous callback. +func (st *StatusTracker) OnChange(fn func(Status)) { + st.mu.Lock() + defer st.mu.Unlock() + st.onChange = fn +}