Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions common/eventhub/apikey_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package eventhub

import (
"fmt"
"strings"
)

const apiKeyEntityIDSeparator = "_"

// BuildAPIKeyEntityID returns the composite entity ID used for API key events.
func BuildAPIKeyEntityID(apiID, keyID string) string {
return apiID + apiKeyEntityIDSeparator + keyID
}

// ParseAPIKeyEntityID splits the composite entity ID used for API key events.
func ParseAPIKeyEntityID(entityID string) (string, string, error) {
separatorIndex := strings.LastIndex(entityID, apiKeyEntityIDSeparator)
if separatorIndex <= 0 || separatorIndex == len(entityID)-1 {
return "", "", fmt.Errorf("invalid API key entity ID: %q", entityID)
}

return entityID[:separatorIndex], entityID[separatorIndex+1:], nil
Comment thread
VirajSalaka marked this conversation as resolved.
Outdated
}
40 changes: 40 additions & 0 deletions common/eventhub/apikey_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package eventhub

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAPIKeyEntityIDRoundTrip(t *testing.T) {
entityID := BuildAPIKeyEntityID("api_id", "keyID")

apiID, keyID, err := ParseAPIKeyEntityID(entityID)
require.NoError(t, err)
assert.Equal(t, "api_id", apiID)
assert.Equal(t, "keyID", keyID)
}

func TestParseAPIKeyEntityIDInvalid(t *testing.T) {
_, _, err := ParseAPIKeyEntityID("invalid")
require.Error(t, err)
}
63 changes: 63 additions & 0 deletions common/eventhub/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package eventhub

import "time"

const defaultGatewayStatePageSize = 200

// EventhubImpl defines the backend interface for pluggable event hub implementations
type EventhubImpl interface {
// Initialize sets up the backend
Initialize() error
// RegisterGateway registers a new gateway for event tracking.
RegisterGateway(gatewayID string) error
// Publish publishes an event for a gateway.
Publish(gatewayID string, event Event) error
// Subscribe subscribes to events for a gateway, returning a channel.
Subscribe(gatewayID string) (<-chan Event, error)
// Unsubscribe removes a specific subscription for a gateway.
Unsubscribe(gatewayID string, subscriber <-chan Event) error
// UnsubscribeAll removes all subscriptions for a gateway.
UnsubscribeAll(gatewayID string) error
// Cleanup removes events older than the retention period
Cleanup(retentionPeriod time.Duration) error
// CleanupRange removes events in a time range for a gateway.
CleanupRange(gatewayID string, before time.Time) error
// Close gracefully shuts down the backend
Close() error
}

// SQLBackendConfig holds configuration for the SQL backend
type SQLBackendConfig struct {
PollInterval time.Duration
CleanupInterval time.Duration
RetentionPeriod time.Duration
GatewayStatePageSize int
}

// DefaultSQLBackendConfig returns a SQLBackendConfig with sensible defaults
func DefaultSQLBackendConfig() SQLBackendConfig {
return SQLBackendConfig{
PollInterval: 2 * time.Second,
CleanupInterval: 5 * time.Minute,
RetentionPeriod: 1 * time.Hour,
GatewayStatePageSize: defaultGatewayStatePageSize,
}
}
85 changes: 85 additions & 0 deletions common/eventhub/eventhub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package eventhub

import (
"database/sql"
"log/slog"
)

// eventHub is the main EventHub implementation that delegates to a backend
type eventHub struct {
backend EventhubImpl
logger *slog.Logger
config Config
}

// New creates a new EventHub backed by SQLite
func New(db *sql.DB, logger *slog.Logger, config Config) EventHub {
backendConfig := DefaultSQLBackendConfig()
backendConfig.PollInterval = config.PollInterval
backendConfig.CleanupInterval = config.CleanupInterval
backendConfig.RetentionPeriod = config.RetentionPeriod
backend := NewSQLBackend(db, logger, backendConfig)
return &eventHub{
backend: backend,
logger: logger,
config: config,
}
}

// NewWithBackend creates a new EventHub with a custom backend
func NewWithBackend(backend EventhubImpl, logger *slog.Logger) EventHub {
return &eventHub{
backend: backend,
logger: logger,
}
}

func (h *eventHub) Initialize() error {
return h.backend.Initialize()
}

func (h *eventHub) RegisterGateway(gatewayID string) error {
return h.backend.RegisterGateway(gatewayID)
}

func (h *eventHub) PublishEvent(gatewayID string, event Event) error {
return h.backend.Publish(gatewayID, event)
}

func (h *eventHub) Subscribe(gatewayID string) (<-chan Event, error) {
return h.backend.Subscribe(gatewayID)
}

func (h *eventHub) Unsubscribe(gatewayID string, subscriber <-chan Event) error {
return h.backend.Unsubscribe(gatewayID, subscriber)
}

func (h *eventHub) UnsubscribeAll(gatewayID string) error {
return h.backend.UnsubscribeAll(gatewayID)
}

func (h *eventHub) CleanUpEvents() error {
return h.backend.Cleanup(h.config.RetentionPeriod)
}

func (h *eventHub) Close() error {
return h.backend.Close()
}
Loading