From 4eea9b7a9efb9e958e56ddcc374c010a4c231b4d Mon Sep 17 00:00:00 2001 From: Chris Gianelloni Date: Sat, 17 Jan 2026 19:23:27 -0500 Subject: [PATCH] feat(push): persist FCM messages Signed-off-by: Chris Gianelloni --- output/push/fcm/message.go | 10 +-- output/push/fcm/message_test.go | 64 ++++++++++++++++ output/push/fcm_repository.go | 129 ++++++++++++++++++++++++++++++-- output/push/plugin.go | 9 ++- output/push/push.go | 62 +++++++-------- 5 files changed, 226 insertions(+), 48 deletions(-) create mode 100644 output/push/fcm/message_test.go diff --git a/output/push/fcm/message.go b/output/push/fcm/message.go index 7564ed7..99b9edc 100644 --- a/output/push/fcm/message.go +++ b/output/push/fcm/message.go @@ -22,9 +22,6 @@ import ( "fmt" "io" "net/http" - "os" - - "github.com/blinklabs-io/adder/internal/logging" ) type Message struct { @@ -59,10 +56,9 @@ func WithNotification(title string, body string) MessageOption { } } -func NewMessage(token string, opts ...MessageOption) *Message { +func NewMessage(token string, opts ...MessageOption) (*Message, error) { if token == "" { - logging.GetLogger().Error("Token is mandatory for FCM message") - os.Exit(1) + return nil, errors.New("token is mandatory for FCM message") } msg := &Message{ @@ -73,7 +69,7 @@ func NewMessage(token string, opts ...MessageOption) *Message { for _, opt := range opts { opt(&msg.MessageContent) } - return msg + return msg, nil } func Send(accessToken string, projectId string, msg *Message) error { diff --git a/output/push/fcm/message_test.go b/output/push/fcm/message_test.go new file mode 100644 index 0000000..efa684d --- /dev/null +++ b/output/push/fcm/message_test.go @@ -0,0 +1,64 @@ +// Copyright 2025 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fcm + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewMessage(t *testing.T) { + t.Run("empty token returns error", func(t *testing.T) { + msg, err := NewMessage("") + assert.Nil(t, msg) + require.Error(t, err) + assert.Contains(t, err.Error(), "token is mandatory") + }) + + t.Run("valid token returns message", func(t *testing.T) { + msg, err := NewMessage("valid-token-123") + require.NoError(t, err) + require.NotNil(t, msg) + assert.Equal(t, "valid-token-123", msg.Token) + }) + + t.Run("with notification option", func(t *testing.T) { + msg, err := NewMessage( + "valid-token-123", + WithNotification("Test Title", "Test Body"), + ) + require.NoError(t, err) + require.NotNil(t, msg) + require.NotNil(t, msg.Notification) + assert.Equal(t, "Test Title", msg.Notification.Title) + assert.Equal(t, "Test Body", msg.Notification.Body) + }) + + t.Run("with data option", func(t *testing.T) { + data := map[string]any{ + "key1": "value1", + "key2": "value2", + } + msg, err := NewMessage( + "valid-token-123", + WithData(data), + ) + require.NoError(t, err) + require.NotNil(t, msg) + assert.Equal(t, data, msg.Data) + }) +} diff --git a/output/push/fcm_repository.go b/output/push/fcm_repository.go index c545a44..116e0cd 100644 --- a/output/push/fcm_repository.go +++ b/output/push/fcm_repository.go @@ -15,14 +15,21 @@ package push import ( + "encoding/json" "net/http" + "os" + "sync" _ "github.com/blinklabs-io/adder/docs" + "github.com/blinklabs-io/adder/internal/logging" "github.com/gin-gonic/gin" ) type TokenStore struct { - FCMTokens map[string]string + FCMTokens map[string]string `json:"fcm_tokens"` + filePath string + mu sync.RWMutex + persistMutex sync.Mutex } // TokenRequest represents a request containing an FCM token. @@ -43,16 +50,36 @@ type ErrorResponse struct { Error string `json:"error"` } -// TODO add support for persistence (#335) var fcmStore *TokenStore func init() { - fcmStore = newTokenStore() + fcmStore = newTokenStore("") } -func newTokenStore() *TokenStore { - return &TokenStore{ +func newTokenStore(filePath string) *TokenStore { + store := &TokenStore{ FCMTokens: make(map[string]string), + filePath: filePath, + } + // Load existing tokens if persistence is enabled + if filePath != "" { + store.loadTokens() + } + return store +} + +// SetPersistenceFile configures the file path for token persistence +// If called with a non-empty path, tokens will be loaded from and saved to this file +func SetPersistenceFile(filePath string) { + if fcmStore == nil { + fcmStore = newTokenStore(filePath) + return + } + fcmStore.persistMutex.Lock() + fcmStore.filePath = filePath + fcmStore.persistMutex.Unlock() + if filePath != "" { + fcmStore.loadTokens() } } @@ -60,6 +87,77 @@ func getTokenStore() *TokenStore { return fcmStore } +// loadTokens loads tokens from the persistence file +func (s *TokenStore) loadTokens() { + s.persistMutex.Lock() + defer s.persistMutex.Unlock() + + if s.filePath == "" { + return + } + + logger := logging.GetLogger() + + data, err := os.ReadFile(s.filePath) + if err != nil { + if os.IsNotExist(err) { + // File doesn't exist yet, that's fine for first run + logger.Debug("FCM token persistence file does not exist yet", "path", s.filePath) + return + } + logger.Error("failed to read FCM tokens from file", "error", err, "path", s.filePath) + return + } + + var loadedStore struct { + FCMTokens map[string]string `json:"fcm_tokens"` + } + if err := json.Unmarshal(data, &loadedStore); err != nil { + logger.Error("failed to parse FCM tokens from file", "error", err, "path", s.filePath) + return + } + + s.mu.Lock() + if loadedStore.FCMTokens != nil { + s.FCMTokens = loadedStore.FCMTokens + } + s.mu.Unlock() + + logger.Info("loaded FCM tokens from persistence file", "count", len(loadedStore.FCMTokens), "path", s.filePath) +} + +// saveTokens saves tokens to the persistence file +func (s *TokenStore) saveTokens() { + s.persistMutex.Lock() + defer s.persistMutex.Unlock() + + if s.filePath == "" { + return + } + + logger := logging.GetLogger() + + s.mu.RLock() + data, err := json.MarshalIndent(struct { + FCMTokens map[string]string `json:"fcm_tokens"` + }{ + FCMTokens: s.FCMTokens, + }, "", " ") + s.mu.RUnlock() + + if err != nil { + logger.Error("failed to marshal FCM tokens", "error", err) + return + } + + if err := os.WriteFile(s.filePath, data, 0o600); err != nil { + logger.Error("failed to write FCM tokens to file", "error", err, "path", s.filePath) + return + } + + logger.Debug("saved FCM tokens to persistence file", "path", s.filePath) +} + // @Summary Store FCM Token // @Description Store a new FCM token // @Accept json @@ -84,7 +182,10 @@ func storeFCMToken(c *gin.Context) { ) return } + store.mu.Lock() store.FCMTokens[req.FCMToken] = req.FCMToken + store.mu.Unlock() + store.saveTokens() c.Status(http.StatusCreated) } @@ -106,7 +207,9 @@ func readFCMToken(c *gin.Context) { ) return } + store.mu.RLock() storedToken, exists := store.FCMTokens[token] + store.mu.RUnlock() if !exists { c.Status(http.StatusNotFound) return @@ -132,20 +235,32 @@ func deleteFCMToken(c *gin.Context) { ) return } + store.mu.Lock() _, exists := store.FCMTokens[token] if exists { delete(store.FCMTokens, token) + } + store.mu.Unlock() + if exists { + store.saveTokens() c.Status(http.StatusNoContent) } else { c.Status(http.StatusNotFound) } } -// GetFcmTokens returns the current in-memory FCM tokens +// GetFcmTokens returns a copy of the current in-memory FCM tokens func GetFcmTokens() map[string]string { store := getTokenStore() if store == nil { return make(map[string]string) } - return store.FCMTokens + store.mu.RLock() + defer store.mu.RUnlock() + // Return a copy to avoid race conditions + tokens := make(map[string]string, len(store.FCMTokens)) + for k, v := range store.FCMTokens { + tokens[k] = v + } + return tokens } diff --git a/output/push/plugin.go b/output/push/plugin.go index 1a8112c..1094792 100644 --- a/output/push/plugin.go +++ b/output/push/plugin.go @@ -52,12 +52,19 @@ func init() { } func NewFromCmdlineOptions() plugin.Plugin { - p := New( + p, err := New( WithLogger( logging.GetLogger().With("plugin", "output.push"), ), WithAccessTokenUrl(cmdlineOptions.accessTokenUrl), WithServiceAccountFilePath(cmdlineOptions.serviceAccountFilePath), ) + if err != nil { + logging.GetLogger().Error( + "failed to create push output plugin", + "error", err, + ) + return nil + } return p } diff --git a/output/push/push.go b/output/push/push.go index 6fd1a6c..9ef6dee 100644 --- a/output/push/push.go +++ b/output/push/push.go @@ -20,6 +20,7 @@ import ( "encoding/json" "errors" "fmt" + "log/slog" "math/big" "os" @@ -52,18 +53,16 @@ type PushPayload struct { Notifications []Notification `json:"notifications"` } -func New(options ...PushOptionFunc) *PushOutput { +func New(options ...PushOptionFunc) (*PushOutput, error) { p := &PushOutput{} for _, option := range options { option(p) } if err := p.GetProjectId(); err != nil { - logging.GetLogger(). - Error(fmt.Sprintf("Failed to get project ID: %v", err)) - os.Exit(1) + return nil, fmt.Errorf("failed to get project ID: %w", err) } - return p + return p, nil } func (p *PushOutput) Start() error { @@ -80,18 +79,21 @@ func (p *PushOutput) Start() error { } // Get access token per each event if err := p.GetAccessToken(); err != nil { - return + slog.Error("failed to get access token", "error", err) + continue } switch evt.Type { case "chainsync.block": payload := evt.Payload if payload == nil { - panic(fmt.Errorf("ERROR: %v", payload)) + slog.Error("block event has nil payload") + continue } context := evt.Context if context == nil { - panic(fmt.Errorf("ERROR: %v", context)) + slog.Error("block event has nil context") + continue } be := payload.(event.BlockEvent) @@ -119,7 +121,8 @@ func (p *PushOutput) Start() error { case "chainsync.rollback": payload := evt.Payload if payload == nil { - panic(fmt.Errorf("ERROR: %v", payload)) + slog.Error("rollback event has nil payload") + continue } re := payload.(event.RollbackEvent) @@ -133,11 +136,13 @@ func (p *PushOutput) Start() error { case "chainsync.transaction": payload := evt.Payload if payload == nil { - panic(fmt.Errorf("ERROR: %v", payload)) + slog.Error("transaction event has nil payload") + continue } context := evt.Context if context == nil { - panic(fmt.Errorf("ERROR: %v", context)) + slog.Error("transaction event has nil context") + continue } te := payload.(event.TransactionEvent) @@ -207,7 +212,7 @@ func (p *PushOutput) processFcmNotifications(title, body string) { // Fetch new FCM tokens and add to p.fcmTokens p.refreshFcmTokens() - // If no FCM tokens exist, log and exit + // If no FCM tokens exist, log and return if len(p.fcmTokens) == 0 { logging.GetLogger().Info("No FCM tokens found. Skipping notification.") return @@ -215,10 +220,15 @@ func (p *PushOutput) processFcmNotifications(title, body string) { // Send notification to each FCM token for _, fcmToken := range p.fcmTokens { - msg := fcm.NewMessage( + msg, err := fcm.NewMessage( fcmToken, fcm.WithNotification(title, body), ) + if err != nil { + logging.GetLogger(). + Error(fmt.Sprintf("Failed to create message for token %s: %v", fcmToken, err)) + continue + } if err := fcm.Send(p.accessToken, p.projectID, msg); err != nil { logging.GetLogger(). @@ -233,25 +243,17 @@ func (p *PushOutput) processFcmNotifications(title, body string) { func (p *PushOutput) GetAccessToken() error { data, err := os.ReadFile(p.serviceAccountFilePath) if err != nil { - logging.GetLogger(). - Error(fmt.Sprintf("Failed to read the credential file: %v", err)) - os.Exit(1) - return err + return fmt.Errorf("failed to read credential file: %w", err) } conf, err := google.JWTConfigFromJSON(data, p.accessTokenUrl) if err != nil { - logging.GetLogger(). - Error(fmt.Sprintf("Failed to parse the credential file: %v", err)) - os.Exit(1) - return err + return fmt.Errorf("failed to parse credential file: %w", err) } token, err := conf.TokenSource(context.Background()).Token() if err != nil { - logging.GetLogger().Error(fmt.Sprintf("Failed to get token: %v", err)) - os.Exit(1) - return err + return fmt.Errorf("failed to get token: %w", err) } fmt.Println(token.AccessToken) @@ -259,23 +261,17 @@ func (p *PushOutput) GetAccessToken() error { return nil } -// Get project ID from file +// GetProjectId gets project ID from file func (p *PushOutput) GetProjectId() error { data, err := os.ReadFile(p.serviceAccountFilePath) if err != nil { - logging.GetLogger(). - Error(fmt.Sprintf("Failed to read the credential file: %v", err)) - os.Exit(1) - return err + return fmt.Errorf("failed to read credential file: %w", err) } // Get project ID from file var v map[string]any if err := json.Unmarshal(data, &v); err != nil { - logging.GetLogger(). - Error(fmt.Sprintf("Failed to parse the credential file: %v", err)) - os.Exit(1) - return err + return fmt.Errorf("failed to parse credential file: %w", err) } p.projectID = v["project_id"].(string)