Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions token/services/certifier/interactive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ const (
// in a CertificationRequest. Requests exceeding this limit are rejected to prevent
// memory exhaustion on the certifier node.
MaxRequestBytes = 1 << 20 // 1 MiB

// MaxWireMessageBytes is the maximum byte-length of the entire JSON-encoded
// certification request as received from the wire. This guard fires before
// JSON deserialisation so that an oversized message is dropped without ever
// allocating the decoded struct — preventing memory exhaustion from large
// payloads. It is set to 2 MiB to accommodate the base64 overhead of
// MaxRequestBytes plus the JSON-encoded IDs and header fields.
MaxWireMessageBytes = MaxRequestBytes * 2 // 2 MiB
)
20 changes: 13 additions & 7 deletions token/services/certifier/interactive/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ type ResponderRegistry interface {
type CertificationService struct {
ResponderRegistry ResponderRegistry

startOnce sync.Once
mu sync.RWMutex
wallets map[string]string
backend Backend
metrics *Metrics
startOnce sync.Once
mu sync.RWMutex
wallets map[string]string
backend Backend
metrics *Metrics
sessionFactory func(view.Context) session.JsonSession
}

func NewCertificationService(responderRegistry ResponderRegistry, mp metrics.Provider, backend Backend) *CertificationService {
Expand All @@ -45,6 +46,9 @@ func NewCertificationService(responderRegistry ResponderRegistry, mp metrics.Pro
metrics: NewMetrics(mp),
backend: backend,
ResponderRegistry: responderRegistry,
sessionFactory: func(ctx view.Context) session.JsonSession {
return session.JSONWithLimit(ctx, MaxWireMessageBytes)
},
}
}

Expand All @@ -67,9 +71,11 @@ func (c *CertificationService) SetWallet(tms *token2.ManagementService, wallet s
}

func (c *CertificationService) Call(context view.Context) (interface{}, error) {
// 1. receive request
// 1. receive request — the session returned by sessionFactory enforces
// MaxWireMessageBytes before JSON deserialisation, preventing memory
// exhaustion from oversized payloads. See session.SizeLimitedJsonSession.
logger.Debugf("receive certification request [%s]", context.ID())
s := session.JSON(context)
s := c.sessionFactory(context)

var cr *CertificationRequest
if err := s.Receive(&cr); err != nil {
Expand Down
125 changes: 125 additions & 0 deletions token/services/certifier/interactive/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,89 @@ SPDX-License-Identifier: Apache-2.0
package interactive

import (
"bytes"
"context"
"strconv"
"testing"
"time"

"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/utils/json/session"
"github.com/hyperledger-labs/fabric-token-sdk/token/token"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
)

// ---------------------------------------------------------------------------
// fakeJsonSession — minimal session.JsonSession for unit-testing Call().
// Only ReceiveRaw and Send are used in the paths exercised by these tests.
// ---------------------------------------------------------------------------

type fakeJsonSession struct {
raw []byte // bytes returned by ReceiveRaw
rawErr error // error returned by ReceiveRaw
}

func (f *fakeJsonSession) ReceiveRaw() ([]byte, error) { return f.raw, f.rawErr }
func (f *fakeJsonSession) ReceiveRawWithTimeout(_ time.Duration) ([]byte, error) {
return f.raw, f.rawErr
}
func (f *fakeJsonSession) Receive(_ interface{}) error { return nil }
func (f *fakeJsonSession) ReceiveWithTimeout(_ interface{}, _ time.Duration) error { return nil }
func (f *fakeJsonSession) Send(_ interface{}) error { return nil }
func (f *fakeJsonSession) SendRaw(_ context.Context, _ []byte) error { return nil }
func (f *fakeJsonSession) SendWithContext(_ context.Context, _ interface{}) error { return nil }
func (f *fakeJsonSession) SendError(_ string) error { return nil }
func (f *fakeJsonSession) SendErrorWithContext(_ context.Context, _ string) error { return nil }
func (f *fakeJsonSession) Info() view.SessionInfo { return view.SessionInfo{} }
func (f *fakeJsonSession) Session() view.Session { return nil }

var _ session.JsonSession = (*fakeJsonSession)(nil)

// fakeViewContext satisfies view.Context for tests that only need ID() and the
// session factory — all other methods panic if called unexpectedly.
type fakeViewContext struct{}

func (f *fakeViewContext) ID() string { return "test-context-id" }

// Unused methods — panic to surface accidental calls in tests.
func (f *fakeViewContext) StartSpanFrom(_ context.Context, _ string, _ ...trace.SpanStartOption) (context.Context, trace.Span) {
panic("StartSpanFrom called unexpectedly")
}
func (f *fakeViewContext) GetService(_ interface{}) (interface{}, error) {
panic("GetService called unexpectedly")
}
func (f *fakeViewContext) RunView(_ view.View, _ ...view.RunViewOption) (interface{}, error) {
panic("RunView called unexpectedly")
}
func (f *fakeViewContext) Me() view.Identity { return nil }
func (f *fakeViewContext) IsMe(_ view.Identity) bool { return false }
func (f *fakeViewContext) Initiator() view.View { return nil }
func (f *fakeViewContext) GetSession(_ view.View, _ view.Identity, _ ...view.View) (view.Session, error) {
return nil, nil
}
func (f *fakeViewContext) GetSessionByID(_ string, _ view.Identity) (view.Session, error) {
return nil, nil
}
func (f *fakeViewContext) Session() view.Session { return nil }
func (f *fakeViewContext) Context() context.Context { return context.Background() }
func (f *fakeViewContext) OnError(_ func()) {}

// newServiceWithSession builds a CertificationService whose Call() will use
// the provided fakeJsonSession wrapped in a SizeLimitedJsonSession so that
// the wire-size guard fires exactly as in production.
func newServiceWithSession(fs *fakeJsonSession) *CertificationService {
svc := NewCertificationService(&ResponderRegistryMock{}, &disabled.Provider{}, &BackendMock{})
svc.sessionFactory = func(_ view.Context) session.JsonSession {
return session.NewSizeLimitedSession(fs, MaxWireMessageBytes)
}

return svc
}

// Note: We use counterfeiter-generated mocks that are in the same package (not a subpackage).
// This avoids import cycles that would occur if mocks were in interactive/mock, since the
// Backend interface references *CertificationRequest from the interactive package.
Expand Down Expand Up @@ -102,6 +175,58 @@ func TestCertificationRequest_String(t *testing.T) {
assert.Contains(t, str, "test-namespace")
}

// ---------------------------------------------------------------------------
// Wire-size guard — Call() must reject oversized messages before JSON decode.
// The guard lives in session.SizeLimitedJsonSession; these tests verify it is
// wired up correctly through the service's sessionFactory.
// ---------------------------------------------------------------------------

// TestCall_WireMessageTooLarge verifies that Call() rejects a message whose raw
// byte length exceeds MaxWireMessageBytes without attempting JSON decoding.
func TestCall_WireMessageTooLarge(t *testing.T) {
oversized := bytes.Repeat([]byte("x"), MaxWireMessageBytes+1)
svc := newServiceWithSession(&fakeJsonSession{raw: oversized})

_, err := svc.Call(&fakeViewContext{})

require.Error(t, err)
assert.Contains(t, err.Error(), "message too large")
assert.Contains(t, err.Error(), strconv.Itoa(MaxWireMessageBytes))
}

// TestCall_WireMessageAtLimit verifies that a message exactly at MaxWireMessageBytes
// is not rejected by the size guard (it may fail for other reasons, e.g. JSON parse).
func TestCall_WireMessageAtLimit(t *testing.T) {
atLimit := bytes.Repeat([]byte("x"), MaxWireMessageBytes)
svc := newServiceWithSession(&fakeJsonSession{raw: atLimit})

_, err := svc.Call(&fakeViewContext{})

require.Error(t, err)
// Must NOT be a size-guard error — the guard must pass.
assert.NotContains(t, err.Error(), "message too large")
}

// TestCall_ReceiveRawError verifies that a transport error from ReceiveRaw is
// propagated with a descriptive message.
func TestCall_ReceiveRawError(t *testing.T) {
transportErr := errors.New("connection reset by peer")
svc := newServiceWithSession(&fakeJsonSession{rawErr: transportErr})

_, err := svc.Call(&fakeViewContext{})

require.Error(t, err)
require.ErrorIs(t, err, transportErr)
assert.Contains(t, err.Error(), "failed receiving certification request")
}

// TestMaxWireMessageBytes_IsDoubleMaxRequestBytes documents and enforces the
// expected relationship between the two size constants.
func TestMaxWireMessageBytes_IsDoubleMaxRequestBytes(t *testing.T) {
assert.Equal(t, MaxRequestBytes*2, MaxWireMessageBytes,
"MaxWireMessageBytes should be 2× MaxRequestBytes to accommodate base64 overhead and ID encoding")
}

// TestNewCertificationRequestView verifies construction of a CertificationRequestView.
func TestNewCertificationRequestView(t *testing.T) {
channel := "test-channel"
Expand Down
116 changes: 116 additions & 0 deletions token/services/utils/json/session/limited.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package session

import (
"context"
"encoding/json"
"time"

"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
)

// SizeLimitedJsonSession wraps a JsonSession and rejects any incoming message
// whose raw byte length exceeds maxBytes. The size check fires before JSON
// deserialization so that oversized payloads are dropped without ever
// allocating the decoded struct, preventing memory exhaustion attacks.
//
// This is the recommended way to enforce per-service message-size limits
// without modifying each view's Call implementation individually.
type SizeLimitedJsonSession struct {
inner JsonSession
maxBytes int
}

// NewSizeLimitedSession wraps inner with a size guard. Any received message
// whose raw wire length exceeds maxBytes is rejected before deserialization.
func NewSizeLimitedSession(inner JsonSession, maxBytes int) JsonSession {
return &SizeLimitedJsonSession{inner: inner, maxBytes: maxBytes}
}

// JSONWithLimit returns a JsonSession backed by the current FSC session that
// enforces an upper bound on received message sizes. Any message whose raw
// wire length exceeds maxBytes is rejected before JSON deserialization.
func JSONWithLimit(ctx view.Context, maxBytes int) JsonSession {
return NewSizeLimitedSession(JSON(ctx), maxBytes)
}

// ReceiveRaw reads the next raw message and returns an error if its length
// exceeds the configured limit.
func (s *SizeLimitedJsonSession) ReceiveRaw() ([]byte, error) {
raw, err := s.inner.ReceiveRaw()
if err != nil {
return nil, err
}
if err := s.checkSize(raw); err != nil {
return nil, err
}
return raw, nil
}

// ReceiveRawWithTimeout reads the next raw message with a deadline and returns
// an error if its length exceeds the configured limit.
func (s *SizeLimitedJsonSession) ReceiveRawWithTimeout(d time.Duration) ([]byte, error) {
raw, err := s.inner.ReceiveRawWithTimeout(d)
if err != nil {
return nil, err
}
if err := s.checkSize(raw); err != nil {
return nil, err
}
return raw, nil
}

// Receive reads the next message, enforces the size limit, then unmarshals it
// into state.
func (s *SizeLimitedJsonSession) Receive(state interface{}) error {
raw, err := s.ReceiveRaw()
if err != nil {
return err
}
return json.Unmarshal(raw, state)
}

// ReceiveWithTimeout reads the next message with a deadline, enforces the size
// limit, then unmarshals it into state.
func (s *SizeLimitedJsonSession) ReceiveWithTimeout(state interface{}, d time.Duration) error {
raw, err := s.ReceiveRawWithTimeout(d)
if err != nil {
return err
}
return json.Unmarshal(raw, state)
}

// The remaining methods delegate unchanged to the inner session.

func (s *SizeLimitedJsonSession) Info() view.SessionInfo { return s.inner.Info() }

func (s *SizeLimitedJsonSession) Send(payload any) error { return s.inner.Send(payload) }

func (s *SizeLimitedJsonSession) SendRaw(ctx context.Context, raw []byte) error {
return s.inner.SendRaw(ctx, raw)
}

func (s *SizeLimitedJsonSession) SendWithContext(ctx context.Context, payload any) error {
return s.inner.SendWithContext(ctx, payload)
}

func (s *SizeLimitedJsonSession) SendError(msg string) error { return s.inner.SendError(msg) }

func (s *SizeLimitedJsonSession) SendErrorWithContext(ctx context.Context, msg string) error {
return s.inner.SendErrorWithContext(ctx, msg)
}

func (s *SizeLimitedJsonSession) Session() Session { return s.inner.Session() }

func (s *SizeLimitedJsonSession) checkSize(raw []byte) error {
if len(raw) > s.maxBytes {
return errors.Errorf("message too large (%d > %d bytes)", len(raw), s.maxBytes)
}
return nil
}