Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ __debug*
!deploy/charts/firefly
containerlogs
.vscode/*.log
.idea
doc-site/site
.idea/
doc-site/site
*.iml
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ GOGC=30

all: build test go-mod-tidy
test: deps lint
$(VGO) test ./internal/... ./pkg/... ./cmd/... ./doc-site ./ffconfig/... -cover -coverprofile=coverage.txt -covermode=atomic -timeout=30s ${TEST_ARGS}
$(VGO) test ./internal/... ./pkg/... ./cmd/... ./doc-site ./ffconfig/... -cover -coverprofile=coverage.txt -covermode=atomic -timeout=45s ${TEST_ARGS}
coverage.html:
$(VGO) tool cover -html=coverage.txt
coverage: test coverage.html
Expand Down
208 changes: 202 additions & 6 deletions go.work.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ func TestGetSubscriptionEventsFilteredNoSequenceIDsProvided(t *testing.T) {

r.ServeHTTP(res, req)
assert.Equal(t, 200, res.Result().StatusCode)
}
}
3 changes: 3 additions & 0 deletions internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,7 @@ var (
MsgDuplicateContractListenerFilterLocation = ffe("FF10477", "Duplicate filter provided for contract listener for location", 400)
MsgInvalidNamespaceForOperationUpdate = ffe("FF10478", "Received different namespace for operation update '%s' than expected for manager '%s'")
MsgEmptyPluginForOperationUpdate = ffe("FF10479", "Received empty plugin for operation update '%s'")
MsgInvalidIdentityPatch = ffe("FF10480", "A profile must be provided when updating an identity", 400)
MsgNodeNotProvidedForCheck = ffe("FF10481", "Node not provided for check", 500)
MsgNodeMissingProfile = ffe("FF10482", "Node provided for check does not have a profile", 500)
)
5 changes: 2 additions & 3 deletions internal/database/sqlcommon/event_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestGetEventsInSequenceRangeE2EWithDB(t *testing.T) {
Type: core.EventTypeMessageConfirmed,
Reference: fftypes.NewUUID(),
Correlator: fftypes.NewUUID(),
Topic: fmt.Sprintf("topic%d", i % 2),
Topic: fmt.Sprintf("topic%d", i%2),
Created: fftypes.Now(),
}
err := s.InsertEvent(ctx, event)
Expand Down Expand Up @@ -322,10 +322,9 @@ func TestGetEventsInSequenceRangeBuildQueryFail(t *testing.T) {

func TestGetEventsInSequenceRangeShouldCallGetEventsWhenNoSequencedProvidedAndThrowAnError(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id", }).AddRow("only one"))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("only one"))
f := database.EventQueryFactory.NewFilter(context.Background()).And()
_, _, err := s.GetEventsInSequenceRange(context.Background(), "ns1", f, -1, -1)
assert.NotNil(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}

105 changes: 104 additions & 1 deletion internal/dataexchange/ffdx/ffdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ package ffdx

import (
"context"
"crypto/x509"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/hyperledger/firefly/internal/metrics"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/config"
Expand Down Expand Up @@ -54,6 +60,8 @@ type FFDX struct {
retry *retry.Retry
backgroundStart bool
backgroundRetry *retry.Retry

metrics metrics.Manager // optional
}

type dxNode struct {
Expand Down Expand Up @@ -168,7 +176,7 @@ func (h *FFDX) Name() string {
return "ffdx"
}

func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config config.Section) (err error) {
func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config config.Section, metrics metrics.Manager) (err error) {
h.ctx = log.WithLogField(ctx, "dx", "https")
h.cancelCtx = cancelCtx
h.ackChannel = make(chan *ack)
Expand All @@ -179,6 +187,7 @@ func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config co
}
h.needsInit = config.GetBool(DataExchangeInitEnabled)
h.nodes = make(map[string]*dxNode)
h.metrics = metrics

if config.GetString(ffresty.HTTPConfigURL) == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "dataexchange.ffdx")
Expand Down Expand Up @@ -295,6 +304,11 @@ func (h *FFDX) beforeConnect(ctx context.Context, w wsclient.WSClient) error {
return fmt.Errorf("DX returned non-ready status: %s", status.Status)
}
}

for _, cb := range h.callbacks.handlers {
cb.DXConnect(h)
}

h.initialized = true
return nil
}
Expand Down Expand Up @@ -448,6 +462,95 @@ func (h *FFDX) TransferBlob(ctx context.Context, nsOpID string, peer, sender fft
return nil
}

func (h *FFDX) CheckNodeIdentityStatus(ctx context.Context, node *core.Identity) error {
if err := h.checkInitialized(ctx); err != nil {
return err
}

if node == nil {
return i18n.NewError(ctx, coremsgs.MsgNodeNotProvidedForCheck)
}

var mismatchState = metrics.NodeIdentityDXCertMismatchStatusUnknown
defer func() {
if h.metrics != nil && h.metrics.IsMetricsEnabled() {
h.metrics.NodeIdentityDXCertMismatch(node.Namespace, mismatchState)
}
log.L(ctx).Debugf("Identity status checked against DX node='%s' mismatch_state='%s'", node.Name, mismatchState)
}()

dxPeer, err := h.GetEndpointInfo(ctx, node.Name) // should be the same as the local node
if err != nil {
return err
}

dxPeerCert := dxPeer.GetString("cert")
// if this occurs, it is either a misconfigured / broken DX or likely a DX that is compatible from an API perspective
// but does not have the same peer info as the HTTPS mTLS DX
if dxPeerCert == "" {
log.L(ctx).Debugf("DX peer does not have a 'cert', DX plugin may be unsupported")
return nil
}

expiry, err := extractSoonestExpiryFromCertBundle(strings.ReplaceAll(dxPeerCert, `\n`, "\n"))
if err == nil {
if expiry.Before(time.Now()) {
log.L(ctx).Warnf("DX certificate for node '%s' has expired", node.Name)
}

if h.metrics != nil && h.metrics.IsMetricsEnabled() {
h.metrics.NodeIdentityDXCertExpiry(node.Namespace, expiry)
}
} else {
log.L(ctx).Errorf("Failed to find x509 cert within DX cert bundle node='%s' namespace='%s'", node.Name, node.Namespace)
}

if node.Profile == nil {
return i18n.NewError(ctx, coremsgs.MsgNodeNotProvidedForCheck)
}

nodeCert := node.Profile.GetString("cert")
if nodeCert != "" {
mismatchState = metrics.NodeIdentityDXCertMismatchStatusHealthy
if dxPeerCert != nodeCert {
log.L(ctx).Warnf("DX certificate for node '%s' is out-of-sync with on-chain identity", node.Name)
mismatchState = metrics.NodeIdentityDXCertMismatchStatusMismatched
}
}

return nil
}

// We assume the cert with the soonest expiry is the leaf cert, but even if its the CA,
// that's what will invalidate the leaf anyways, so really we only care about the soonest expiry.
// So we loop through the bundle finding the soonest expiry, not necessarily the leaf.
func extractSoonestExpiryFromCertBundle(certBundle string) (time.Time, error) {
var expiringCert *x509.Certificate
var block *pem.Block
var rest = []byte(certBundle)

for {
block, rest = pem.Decode(rest)
if block == nil {
break
}

cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return time.Time{}, fmt.Errorf("failed to parse non-certificate within bundle: %v", err)
}
if expiringCert == nil || cert.NotAfter.Before(expiringCert.NotAfter) {
expiringCert = cert
}
}

if expiringCert == nil {
return time.Time{}, errors.New("no valid certificate found")
}

return expiringCert.NotAfter.UTC(), nil
}

func (h *FFDX) ackLoop() {
for {
select {
Expand Down
Loading