1 change: 1 addition & 0 deletions .golangci.yml
@@ -21,6 +21,7 @@ linters-settings:
values:
regexp:
COMPANY: .*
YEAR: '\d\d\d\d(-\d\d\d\d)?'
template: |-
Copyright © {{ YEAR }} {{ COMPANY }}

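The new `YEAR` value lets the `goheader` template accept either a single year or a year range: `\d\d\d\d(-\d\d\d\d)?` matches `2024` as well as `2021-2024`. A minimal sketch of a Go file header the updated check would accept (the company name is illustrative, since `COMPANY` matches `.*`):

```go
// Copyright © 2021-2024 Example Corp, Ltd.

package batch
```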
2 changes: 1 addition & 1 deletion doc-site/docs/reference/types/subscription.md
@@ -86,7 +86,7 @@ title: Subscription
| Field Name | Description | Type |
|------------|-------------|------|
| `firstEvent` | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | `SubOptsFirstEvent` |
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint16` |
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint` |
| `withData` | Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. | `bool` |
| `batch` | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch, allowing client-side optimizations when processing the events in a group. Available for both Webhooks and WebSockets. | `bool` |
| `batchTimeout` | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | `string` |
2 changes: 1 addition & 1 deletion doc-site/docs/reference/types/wsstart.md
@@ -77,7 +77,7 @@ title: WSStart
| Field Name | Description | Type |
|------------|-------------|------|
| `firstEvent` | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | `SubOptsFirstEvent` |
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint16` |
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint` |
| `withData` | Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. | `bool` |
| `batch` | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch, allowing client-side optimizations when processing the events in a group. Available for both Webhooks and WebSockets. | `bool` |
| `batchTimeout` | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | `string` |
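Both option tables above change `readAhead` from `uint16` to `uint`, so values above 65535 are no longer rejected by the generated schema. A hedged sketch of a subscription-options payload using the documented field names (the values, and the struct wrapping them, are illustrative rather than the actual FireFly types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subOptions mirrors the fields in the tables above, for illustration only.
type subOptions struct {
	FirstEvent   string `json:"firstEvent,omitempty"`
	ReadAhead    uint   `json:"readAhead,omitempty"` // previously uint16
	WithData     bool   `json:"withData,omitempty"`
	Batch        bool   `json:"batch,omitempty"`
	BatchTimeout string `json:"batchTimeout,omitempty"`
}

func main() {
	opts := subOptions{
		FirstEvent:   "newest",
		ReadAhead:    100000, // above the old 65535 cap
		Batch:        true,
		BatchTimeout: "500ms",
	}
	b, _ := json.MarshalIndent(&opts, "", "  ")
	fmt.Println(string(b))
}
```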
12 changes: 0 additions & 12 deletions doc-site/docs/swagger/swagger.yaml
@@ -29287,7 +29287,6 @@ paths:
used in FireFly, so if your application crashes/reconnects
this is the maximum number of events you would expect
to be redelivered after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -29572,7 +29571,6 @@ paths:
At least once delivery semantics are used in FireFly, so if
your application crashes/reconnects this is the maximum number
of events you would expect to be redelivered after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -29846,7 +29844,6 @@ paths:
in FireFly, so if your application crashes/reconnects this
is the maximum number of events you would expect to be redelivered
after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -30128,7 +30125,6 @@ paths:
At least once delivery semantics are used in FireFly, so if
your application crashes/reconnects this is the maximum number
of events you would expect to be redelivered after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -30402,7 +30398,6 @@ paths:
in FireFly, so if your application crashes/reconnects this
is the maximum number of events you would expect to be redelivered
after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -30750,7 +30745,6 @@ paths:
in FireFly, so if your application crashes/reconnects this
is the maximum number of events you would expect to be redelivered
after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -38574,7 +38568,6 @@ paths:
used in FireFly, so if your application crashes/reconnects
this is the maximum number of events you would expect
to be redelivered after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -38852,7 +38845,6 @@ paths:
At least once delivery semantics are used in FireFly, so if
your application crashes/reconnects this is the maximum number
of events you would expect to be redelivered after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -39126,7 +39118,6 @@ paths:
in FireFly, so if your application crashes/reconnects this
is the maximum number of events you would expect to be redelivered
after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -39401,7 +39392,6 @@ paths:
At least once delivery semantics are used in FireFly, so if
your application crashes/reconnects this is the maximum number
of events you would expect to be redelivered after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -39675,7 +39665,6 @@ paths:
in FireFly, so if your application crashes/reconnects this
is the maximum number of events you would expect to be redelivered
after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
@@ -40009,7 +39998,6 @@ paths:
in FireFly, so if your application crashes/reconnects this
is the maximum number of events you would expect to be redelivered
after it restarts
maximum: 65535
minimum: 0
type: integer
reply:
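Every removed `maximum: 65535` corresponds to the same `readAhead` field switching from `uint16` to `uint` in the Go types, so the generated OpenAPI schema keeps only `minimum: 0`. The practical effect on JSON handling can be sketched like this (hypothetical structs, not the actual FireFly types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	payload := []byte(`{"readAhead": 100000}`)

	var narrow struct {
		ReadAhead uint16 `json:"readAhead"`
	}
	// Overflows uint16, so Unmarshal returns an error: the behaviour the old
	// "maximum: 65535" constraint documented.
	fmt.Println(json.Unmarshal(payload, &narrow))

	var wide struct {
		ReadAhead uint `json:"readAhead"`
	}
	// Accepted now that the field is an unbounded uint.
	fmt.Println(json.Unmarshal(payload, &wide), wide.ReadAhead)
}
```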
4 changes: 2 additions & 2 deletions internal/apiserver/ffi2swagger.go
@@ -81,7 +81,7 @@ func (swg *ffiSwaggerGen) Build(ctx context.Context, api *core.ContractAPI, ffi

func addFFIMethod(ctx context.Context, routes []*ffapi.Route, method *fftypes.FFIMethod, hasLocation bool) []*ffapi.Route {
description := method.Description
if method.Details != nil && len(method.Details) > 0 {
if len(method.Details) > 0 {
additionalDetailsHeader := i18n.Expand(ctx, coremsgs.APISmartContractDetails)
description = fmt.Sprintf("%s\n\n%s:\n\n%s", description, additionalDetailsHeader, buildDetailsTable(ctx, method.Details))
}
@@ -117,7 +117,7 @@ func addFFIMethod(ctx context.Context, routes []*ffapi.Route, method *fftypes.FF

func addFFIEvent(ctx context.Context, routes []*ffapi.Route, event *fftypes.FFIEvent, hasLocation bool) []*ffapi.Route {
description := event.Description
if event.Details != nil && len(event.Details) > 0 {
if len(event.Details) > 0 {
additionalDetailsHeader := i18n.Expand(ctx, coremsgs.APISmartContractDetails)
description = fmt.Sprintf("%s\n\n%s:\n\n%s", description, additionalDetailsHeader, buildDetailsTable(ctx, event.Details))
}
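The dropped `!= nil` guards are safe to remove because `len` of a nil map (or slice) is defined to be 0 in Go, so `len(x) > 0` already excludes the nil case on its own. A tiny illustration:

```go
package main

import "fmt"

func main() {
	var details map[string]interface{} // a nil map
	fmt.Println(details == nil, len(details)) // prints: true 0

	if len(details) > 0 {
		fmt.Println("not reached for a nil or empty map")
	}
}
```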
17 changes: 11 additions & 6 deletions internal/batch/batch_manager.go
@@ -44,7 +44,11 @@ func NewBatchManager(ctx context.Context, ns string, di database.Plugin, dm data
return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "BatchManager")
}
pCtx, cancelCtx := context.WithCancel(log.WithLogField(ctx, "role", "batchmgr"))
readPageSize := config.GetUint(coreconfig.BatchManagerReadPageSize)
readPageSize := uint16(1)
confReadPageSize := config.GetUint64(coreconfig.BatchManagerReadPageSize)
if confReadPageSize > 0 && confReadPageSize <= 65535 {
readPageSize = uint16(confReadPageSize)
}
bm := &batchManager{
ctx: pCtx,
cancelCtx: cancelCtx,
@@ -54,7 +58,7 @@ func NewBatchManager(ctx context.Context, ns string, di database.Plugin, dm data
data: dm,
txHelper: txHelper,
readOffset: -1, // On restart we trawl for all ready messages
readPageSize: uint64(readPageSize),
readPageSize: readPageSize,
minimumPollDelay: config.GetDuration(coreconfig.BatchManagerMinimumPollDelay),
messagePollTimeout: config.GetDuration(coreconfig.BatchManagerReadPollTimeout),
startupOffsetRetryAttempts: config.GetInt(coreconfig.OrchestratorStartupAttempts),
@@ -116,7 +120,7 @@ type batchManager struct {
inflightSequences map[int64]*batchProcessor
inflightFlushed []int64
shoulderTap chan bool
readPageSize uint64
readPageSize uint16
minimumPollDelay time.Duration
messagePollTimeout time.Duration
startupOffsetRetryAttempts int
@@ -126,7 +130,7 @@ type DispatchHandler func(context.Context, *DispatchPayload) error

type DispatcherOptions struct {
BatchType core.BatchType
BatchMaxSize uint
BatchMaxSize int
BatchMaxBytes int64
BatchTimeout time.Duration
DisposeTimeout time.Duration
@@ -279,11 +283,11 @@ func (bm *batchManager) readPage(lastPageFull bool) ([]*core.IDAndSequence, bool
// Read a page from the DB
var ids []*core.IDAndSequence
err := bm.retry.Do(bm.ctx, "retrieve messages", func(attempt int) (retry bool, err error) {
fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, bm.readPageSize)
fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, uint64(bm.readPageSize))
ids, err = bm.database.GetMessageIDs(bm.ctx, bm.namespace, fb.And(
fb.Gt("sequence", bm.readOffset),
fb.Eq("state", core.MessageStateReady),
).Sort("sequence").Limit(bm.readPageSize))
).Sort("sequence").Limit(uint64(bm.readPageSize)))
return true, err
})

@@ -549,6 +553,7 @@ func (bm *batchManager) maskContext(ctx context.Context, state *dispatchState, m

// Now we have the nonce, add that at the end of the hash to make it unique to this message
nonceBytes := make([]byte, 8)
//nolint:gosec
binary.BigEndian.PutUint64(nonceBytes, uint64(nonce))
hashBuilder.Write(nonceBytes)

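The bounds check introduced in `NewBatchManager` avoids an unchecked `uint64` to `uint16` conversion (the kind of integer truncation gosec's G115 rule flags), falling back to a read page size of 1 when the configured value is 0 or greater than 65535. A standalone sketch of the same clamping idiom, with a hypothetical helper name:

```go
package main

import "fmt"

// clampToUint16 falls back to def when v would not fit in a uint16,
// instead of silently truncating the high bits.
func clampToUint16(v uint64, def uint16) uint16 {
	if v > 0 && v <= 65535 {
		return uint16(v)
	}
	return def
}

func main() {
	fmt.Println(clampToUint16(100, 1))   // 100
	fmt.Println(clampToUint16(0, 1))     // 1 (unset or zero config)
	fmt.Println(clampToUint16(1<<20, 1)) // 1 (would otherwise truncate)
}
```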
1 change: 1 addition & 0 deletions internal/batch/batch_manager_test.go
@@ -53,6 +53,7 @@ func newTestBatchManager(t *testing.T) (*batchManager, func()) {
cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(ctx, 100, 5*time.Minute), nil)
txHelper, _ := txcommon.NewTransactionHelper(ctx, "ns1", mdi, mdm, cmi)
config.Set(coreconfig.BatchManagerReadPageSize, 0) // will get min value of 1
bm, err := NewBatchManager(context.Background(), "ns1", mdi, mdm, mim, txHelper)
assert.NoError(t, err)
return bm.(*batchManager), bm.(*batchManager).cancelCtx
2 changes: 1 addition & 1 deletion internal/batch/batch_processor.go
@@ -228,7 +228,7 @@ func (bp *batchProcessor) addWork(newWork *batchWork) (full, overflow bool) {
bp.assemblyQueueBytes += newWork.estimateSize()
bp.assemblyQueue = newQueue

full = len(bp.assemblyQueue) >= int(bp.conf.BatchMaxSize) || bp.assemblyQueueBytes >= bp.conf.BatchMaxBytes
full = len(bp.assemblyQueue) >= bp.conf.BatchMaxSize || bp.assemblyQueueBytes >= bp.conf.BatchMaxBytes
overflow = len(bp.assemblyQueue) > 1 && (batchOfOne || bp.assemblyQueueBytes > bp.conf.BatchMaxBytes)
}

2 changes: 1 addition & 1 deletion internal/blockchain/ethereum/ethereum.go
@@ -196,7 +196,7 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf
e.streamID = make(map[string]string)
e.closed = make(map[string]chan struct{})
e.wsconn = make(map[string]wsclient.WSClient)
e.streams = newStreamManager(e.client, e.cache, e.ethconnectConf.GetUint(EthconnectConfigBatchSize), uint(e.ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds()))
e.streams = newStreamManager(e.client, e.cache, e.ethconnectConf.GetUint(EthconnectConfigBatchSize), e.ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds())

return nil
}
10 changes: 5 additions & 5 deletions internal/blockchain/ethereum/eventstream.go
@@ -38,15 +38,15 @@ type streamManager struct {
client *resty.Client
cache cache.CInterface
batchSize uint
batchTimeout uint
batchTimeout int64
}

type eventStream struct {
ID string `json:"id"`
Name string `json:"name"`
ErrorHandling string `json:"errorHandling"`
BatchSize uint `json:"batchSize"`
BatchTimeoutMS uint `json:"batchTimeoutMS"`
BatchTimeoutMS int64 `json:"batchTimeoutMS"`
Type string `json:"type"`
WebSocket eventStreamWebsocket `json:"websocket"`
Timestamps bool `json:"timestamps"`
@@ -73,7 +73,7 @@ type subscriptionCheckpoint struct {
Catchup bool `json:"catchup,omitempty"`
}

func newStreamManager(client *resty.Client, cache cache.CInterface, batchSize, batchTimeout uint) *streamManager {
func newStreamManager(client *resty.Client, cache cache.CInterface, batchSize uint, batchTimeout int64) *streamManager {
return &streamManager{
client: client,
cache: cache,
@@ -93,7 +93,7 @@ func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventSt
return streams, nil
}

func buildEventStream(topic string, batchSize, batchTimeout uint) *eventStream {
func buildEventStream(topic string, batchSize uint, batchTimeout int64) *eventStream {
return &eventStream{
Name: topic,
ErrorHandling: "block",
@@ -120,7 +120,7 @@ func (s *streamManager) createEventStream(ctx context.Context, topic string) (*e
return stream, nil
}

func (s *streamManager) updateEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint, eventStreamID string) (*eventStream, error) {
func (s *streamManager) updateEventStream(ctx context.Context, topic string, batchSize uint, batchTimeout int64, eventStreamID string) (*eventStream, error) {
stream := buildEventStream(topic, batchSize, batchTimeout)
res, err := s.client.R().
SetContext(ctx).
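Switching `batchTimeout` (and `BatchTimeoutMS`) from `uint` to `int64` lines the stream manager up with `time.Duration.Milliseconds()`, which returns an `int64`, so the callers in `ethereum.go` and `fabric.go` no longer need the narrowing `uint(...)` cast. Roughly:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	batchTimeout := 500 * time.Millisecond

	// Milliseconds() has the signature func (d time.Duration) Milliseconds() int64,
	// so it can be passed straight into an int64 parameter.
	var timeoutMS int64 = batchTimeout.Milliseconds()
	fmt.Println(timeoutMS) // 500

	// The previous uint signature forced a conversion such as
	// uint(batchTimeout.Milliseconds()) at each call site.
}
```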
8 changes: 4 additions & 4 deletions internal/blockchain/fabric/eventstream.go
@@ -36,15 +36,15 @@ type streamManager struct {
signer string
cache cache.CInterface
batchSize uint
batchTimeoutMS uint
batchTimeoutMS int64
}

type eventStream struct {
ID string `json:"id"`
Name string `json:"name"`
ErrorHandling string `json:"errorHandling"`
BatchSize uint `json:"batchSize"`
BatchTimeoutMS uint `json:"batchTimeoutMS"`
BatchTimeoutMS int64 `json:"batchTimeoutMS"`
Type string `json:"type"`
WebSocket eventStreamWebsocket `json:"websocket"`
Timestamps bool `json:"timestamps"`
@@ -65,7 +65,7 @@ type eventFilter struct {
EventFilter string `json:"eventFilter"`
}

func newStreamManager(client *resty.Client, signer string, cache cache.CInterface, batchSize, batchTimeout uint) *streamManager {
func newStreamManager(client *resty.Client, signer string, cache cache.CInterface, batchSize uint, batchTimeout int64) *streamManager {
return &streamManager{
client: client,
signer: signer,
@@ -86,7 +86,7 @@ func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventSt
return streams, nil
}

func buildEventStream(topic string, batchSize, batchTimeout uint) *eventStream {
func buildEventStream(topic string, batchSize uint, batchTimeout int64) *eventStream {
return &eventStream{
Name: topic,
ErrorHandling: "block",
2 changes: 1 addition & 1 deletion internal/blockchain/fabric/fabric.go
@@ -242,7 +242,7 @@ func (f *Fabric) Init(ctx context.Context, cancelCtx context.CancelFunc, conf co
f.streamID = make(map[string]string)
f.closed = make(map[string]chan struct{})
f.wsconn = make(map[string]wsclient.WSClient)
f.streams = newStreamManager(f.client, f.signer, f.cache, f.fabconnectConf.GetUint(FabconnectConfigBatchSize), uint(f.fabconnectConf.GetDuration(FabconnectConfigBatchTimeout).Milliseconds()))
f.streams = newStreamManager(f.client, f.signer, f.cache, f.fabconnectConf.GetUint(FabconnectConfigBatchSize), f.fabconnectConf.GetDuration(FabconnectConfigBatchTimeout).Milliseconds())

return nil
}