Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@ linters:
- nilerr
- bodyclose
- exhaustive
- gosec
settings:
exhaustive:
default-signifies-exhaustive: true
gocritic:
enabled-tags:
- diagnostic
- performance
gosec:
# Narrowly scoped to G402 (TLS InsecureSkipVerify) so the existing
# //nolint:gosec annotations in pkg/oidc and pkg/searchengine are
# actually suppressing a real rule, and any new unannotated
# InsecureSkipVerify is rejected at lint time.
includes:
- G402
exclusions:
presets:
- std-error-handling
Expand Down
3 changes: 2 additions & 1 deletion auth-service/deploy/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ AUTH_SIGNING_KEY=SAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# Uses Docker service name (auth-service runs inside Docker network).
# For host-based dev, change to http://localhost:9090/realms/chatapp
OIDC_ISSUER_URL=http://keycloak:8080/realms/chatapp
OIDC_AUDIENCE=nats-chat
# Comma-separated list of acceptable `aud` values.
OIDC_AUDIENCES=nats-chat

# Skip TLS cert verification for OIDC issuer (dev only — self-signed certs)
TLS_SKIP_VERIFY=false
2 changes: 1 addition & 1 deletion auth-service/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
- DEV_MODE=${DEV_MODE:-true}
- NATS_JWT_EXPIRY=2h
- OIDC_ISSUER_URL=http://keycloak:8080/realms/chatapp
- OIDC_AUDIENCE=nats-chat
- OIDC_AUDIENCES=nats-chat
- TLS_SKIP_VERIFY=false
networks:
- chat-local
Expand Down
12 changes: 6 additions & 6 deletions auth-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ type config struct {
NATSJWTExpiry time.Duration `env:"NATS_JWT_EXPIRY" envDefault:"2h"`

// OIDC settings — required when DEV_MODE is false.
OIDCIssuerURL string `env:"OIDC_ISSUER_URL"`
OIDCAudience string `env:"OIDC_AUDIENCE"`
TLSSkipVerify bool `env:"TLS_SKIP_VERIFY" envDefault:"false"`
OIDCIssuerURL string `env:"OIDC_ISSUER_URL"`
OIDCAudiences []string `env:"OIDC_AUDIENCES" envSeparator:","`
TLSSkipVerify bool `env:"TLS_SKIP_VERIFY" envDefault:"false"`
}

func main() {
Expand Down Expand Up @@ -56,14 +56,14 @@ func run() error {
slog.Info("dev mode enabled — OIDC validation disabled")
handler = NewAuthHandler(nil, signingKP, cfg.NATSJWTExpiry, true)
} else {
if cfg.OIDCIssuerURL == "" || cfg.OIDCAudience == "" {
return fmt.Errorf("OIDC_ISSUER_URL and OIDC_AUDIENCE are required when DEV_MODE is false")
if cfg.OIDCIssuerURL == "" || len(cfg.OIDCAudiences) == 0 {
return fmt.Errorf("OIDC_ISSUER_URL and OIDC_AUDIENCES are required when DEV_MODE is false")
}

// Initialize OIDC validator — connects to issuer and fetches JWKS keys.
oidcValidator, err := pkgoidc.NewValidator(ctx, pkgoidc.Config{
IssuerURL: cfg.OIDCIssuerURL,
Audience: cfg.OIDCAudience,
Audiences: cfg.OIDCAudiences,
TLSSkipVerify: cfg.TLSSkipVerify,
})
if err != nil {
Expand Down
69 changes: 69 additions & 0 deletions inbox-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ import (
"testing"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
natsmod "github.com/testcontainers/testcontainers-go/modules/nats"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"

"github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/natsutil"
"github.com/hmchangw/chat/pkg/stream"
"github.com/hmchangw/chat/pkg/subject"
"github.com/hmchangw/chat/pkg/testutil"
"github.com/hmchangw/chat/pkg/testutil/testimages"
)

func setupMongo(t *testing.T) *mongo.Database {
Expand Down Expand Up @@ -408,3 +414,66 @@ func TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub(t *testing.T) {
assert.Nil(t, bobSub.Roles, "DMs have no roles")
assert.False(t, bobSub.IsSubscribed, "DM does not set IsSubscribed=true")
}

// setupNATS starts a NATS container with JetStream enabled and returns a
// JetStream client tied to the test's lifetime.
func setupNATS(t *testing.T) (context.Context, jetstream.JetStream) {
t.Helper()
ctx := context.Background()

c, err := natsmod.Run(ctx, testimages.NATS)
require.NoError(t, err)
t.Cleanup(func() { _ = c.Terminate(ctx) })

url, err := c.ConnectionString(ctx)
require.NoError(t, err)

nc, err := nats.Connect(url)
require.NoError(t, err)
t.Cleanup(func() { nc.Close() })

js, err := jetstream.New(nc)
require.NoError(t, err)

return ctx, js
}

// TestInboxWorker_FilterScoping_Integration verifies the consumer filters
// out the local lane: a local-lane publish stays unreachable to inbox-worker.
func TestInboxWorker_FilterScoping_Integration(t *testing.T) {
const siteID = "site-filter"

ctx, js := setupNATS(t)

inboxCfg := stream.Inbox(siteID)
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: inboxCfg.Name,
Subjects: inboxCfg.Subjects,
})
require.NoError(t, err)

cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, jetstream.ConsumerConfig{
Durable: "inbox-worker",
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubjects: []string{subject.InboxAggregateAll(siteID)},
})
require.NoError(t, err)

_, err = js.Publish(ctx, subject.InboxMemberAdded(siteID), []byte(`{"type":"member_added"}`))
require.NoError(t, err)
_, err = js.Publish(ctx, subject.InboxMemberAddedAggregate(siteID), []byte(`{"type":"member_added"}`))
require.NoError(t, err)

require.Eventually(t, func() bool {
info, err := js.Stream(ctx, inboxCfg.Name)
if err != nil {
return false
}
return info.CachedInfo().State.Msgs >= 2
}, 2*time.Second, 50*time.Millisecond, "stream must accept both publishes")

info, err := cons.Info(ctx)
require.NoError(t, err)
assert.EqualValues(t, 1, info.NumPending,
"FilterSubjects must scope inbox-worker to the aggregate.> lane only")
}
7 changes: 5 additions & 2 deletions inbox-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hmchangw/chat/pkg/otelutil"
"github.com/hmchangw/chat/pkg/shutdown"
"github.com/hmchangw/chat/pkg/stream"
"github.com/hmchangw/chat/pkg/subject"
)

type config struct {
Expand Down Expand Up @@ -210,9 +211,11 @@ func main() {

inboxCfg := stream.Inbox(cfg.SiteID)

// Local lane is reserved for search-sync-worker; scope to aggregate.> only.
cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, jetstream.ConsumerConfig{
Durable: "inbox-worker",
AckPolicy: jetstream.AckExplicitPolicy,
Durable: "inbox-worker",
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubjects: []string{subject.InboxAggregateAll(cfg.SiteID)},
})
if err != nil {
slog.Error("create consumer failed", "error", err)
Expand Down
45 changes: 30 additions & 15 deletions pkg/oidc/oidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"crypto/tls"
"errors"
"fmt"
"log/slog"
"net/http"
"slices"
"time"

"github.com/coreos/go-oidc/v3/oidc"
Expand All @@ -25,28 +27,35 @@ type Claims struct {
Extra map[string]interface{}
}

// ErrTokenExpired is returned when the SSO token has passed its expiry time.
var ErrTokenExpired = fmt.Errorf("oidc: token has expired")
var (
ErrTokenExpired = errors.New("oidc: token has expired")
ErrNoAudiences = errors.New("oidc: at least one allowed audience is required")
ErrAudienceNotAllowed = errors.New("oidc: token audience not in allowed list")
)

// Config controls how the OIDC validator behaves.
type Config struct {
IssuerURL string
Audience string
IssuerURL string
// A token is accepted when any of its `aud` claim entries appears here.
Audiences []string
TLSSkipVerify bool
}

// Validator verifies OIDC tokens against an issuer's JWKS endpoint.
type Validator struct {
verifier *oidc.IDTokenVerifier
httpClient *http.Client
audience string
audiences []string
}

const issuerDiscoveryTimeout = 10 * time.Second

// NewValidator connects to the OIDC issuer and fetches its JWKS keys.
// Fails fast if the issuer is unreachable.
func NewValidator(ctx context.Context, cfg Config) (*Validator, error) {
if len(cfg.Audiences) == 0 {
return nil, ErrNoAudiences
}

var httpClient *http.Client

if cfg.TLSSkipVerify {
Expand All @@ -62,7 +71,6 @@ func NewValidator(ctx context.Context, cfg Config) (*Validator, error) {
ctx = oidc.ClientContext(ctx, httpClient)
}

// Ensure issuer discovery cannot hang indefinitely.
if _, ok := ctx.Deadline(); !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, issuerDiscoveryTimeout)
Expand All @@ -74,22 +82,19 @@ func NewValidator(ctx context.Context, cfg Config) (*Validator, error) {
return nil, fmt.Errorf("connect to oidc issuer %q: %w", cfg.IssuerURL, err)
}

oidcConfig := &oidc.Config{
ClientID: cfg.Audience,
}
// SkipClientIDCheck: we enforce a multi-audience allow-list ourselves below.
oidcConfig := &oidc.Config{SkipClientIDCheck: true}

return &Validator{
verifier: provider.Verifier(oidcConfig),
httpClient: httpClient,
audience: cfg.Audience,
audiences: cfg.Audiences,
}, nil
}

// Validate verifies the raw OIDC token string and extracts user claims.
// Returns ErrTokenExpired if the token's exp claim is in the past — expiry
// is enforced by go-oidc's Verifier, we just translate its sentinel error.
// Returns ErrTokenExpired when go-oidc reports an expired exp claim.
func (v *Validator) Validate(ctx context.Context, rawToken string) (Claims, error) {
// Re-attach the custom HTTP client so JWKS fetches also use TLSSkipVerify.
if v.httpClient != nil {
ctx = oidc.ClientContext(ctx, v.httpClient)
}
Expand All @@ -103,6 +108,11 @@ func (v *Validator) Validate(ctx context.Context, rawToken string) (Claims, erro
return Claims{}, fmt.Errorf("oidc token verification failed: %w", err)
}

if !containsAudience(idToken.Audience, v.audiences) {
slog.Warn("oidc audience mismatch", "token_aud", idToken.Audience, "allowed", v.audiences)
return Claims{}, ErrAudienceNotAllowed
}

var tokenClaims struct {
Email string `json:"email"`
Name string `json:"name"`
Expand All @@ -118,7 +128,6 @@ func (v *Validator) Validate(ctx context.Context, rawToken string) (Claims, erro
return Claims{}, fmt.Errorf("parse oidc token claims: %w", err)
}

// Parse all claims into Extra for custom fields (roles, groups, etc.)
var allClaims map[string]interface{}
if err := idToken.Claims(&allClaims); err != nil {
return Claims{}, fmt.Errorf("parse oidc extra claims: %w", err)
Expand All @@ -145,3 +154,9 @@ func (v *Validator) Validate(ctx context.Context, rawToken string) (Claims, erro
Extra: allClaims,
}, nil
}

func containsAudience(tokenAud, allowed []string) bool {
return slices.ContainsFunc(tokenAud, func(t string) bool {
return slices.Contains(allowed, t)
})
}
37 changes: 37 additions & 0 deletions pkg/oidc/oidc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package oidc

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestContainsAudience(t *testing.T) {
cases := []struct {
name string
tokenAud []string
allowed []string
wantMatch bool
}{
{"single token aud matches single allowed", []string{"a"}, []string{"a"}, true},
{"token aud matches one of many allowed", []string{"b"}, []string{"a", "b", "c"}, true},
{"one of many token auds matches allowed", []string{"x", "b"}, []string{"a", "b"}, true},
{"no match", []string{"x"}, []string{"a", "b"}, false},
{"empty token aud", nil, []string{"a"}, false},
{"empty allowed", []string{"a"}, nil, false},
{"both empty", nil, nil, false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.wantMatch, containsAudience(tc.tokenAud, tc.allowed))
})
}
}

func TestNewValidator_RejectsEmptyAudiences(t *testing.T) {
_, err := NewValidator(t.Context(), Config{
IssuerURL: "http://example.invalid",
Audiences: nil,
})
assert.ErrorIs(t, err, ErrNoAudiences)
}
23 changes: 20 additions & 3 deletions pkg/searchengine/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,36 @@ package searchengine

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"time"

"github.com/elastic/go-elasticsearch/v8"
)

// New creates a SearchEngine for the given backend ("elasticsearch" or "opensearch").
// It verifies connectivity via Ping before returning.
func New(ctx context.Context, backend, url string) (SearchEngine, error) {
// It verifies connectivity via Ping before returning. When tlsSkipVerify is
// true, server certificate verification is disabled — intended for
// self-signed/internal ES clusters; MUST stay false in production.
func New(ctx context.Context, backend, url string, tlsSkipVerify bool) (SearchEngine, error) {
var transport Transporter
switch backend {
case "elasticsearch":
client, err := elasticsearch.NewClient(elasticsearch.Config{Addresses: []string{url}})
esCfg := elasticsearch.Config{Addresses: []string{url}}
if tlsSkipVerify {
dt, ok := http.DefaultTransport.(*http.Transport)
if !ok {
return nil, fmt.Errorf("create elasticsearch client: http.DefaultTransport is not *http.Transport")
}
httpTransport := dt.Clone()
httpTransport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true, //nolint:gosec // intentional: opt-in via config for self-signed ES certs
MinVersion: tls.VersionTLS12,
}
esCfg.Transport = httpTransport
}
client, err := elasticsearch.NewClient(esCfg)
if err != nil {
return nil, fmt.Errorf("create elasticsearch client: %w", err)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/subject/subject.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ func InboxMemberRemovedAggregate(siteID string) string {
return fmt.Sprintf("chat.inbox.%s.aggregate.member_removed", siteID)
}

// InboxAggregateAll returns the wildcard pattern matching every federated
// (aggregate-lane) event on a site's INBOX stream:
// `chat.inbox.{siteID}.aggregate.>`. Use with
// jetstream.ConsumerConfig.FilterSubjects to scope a consumer to the
// federated lane only — excluding local-lane publishes that are reserved
// for search-sync-worker.
func InboxAggregateAll(siteID string) string {
return fmt.Sprintf("chat.inbox.%s.aggregate.>", siteID)
}

// InboxMemberEventSubjects returns the subject filters a consumer should use
// to receive both local and federated member_added/member_removed events for
// the given site. Use with jetstream.ConsumerConfig.FilterSubjects (NATS 2.10+).
Expand Down
Loading
Loading