-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathauditor.go
388 lines (338 loc) · 13.2 KB
/
auditor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package ttx
import (
"context"
"encoding/base64"
"time"
view2 "github.com/hyperledger-labs/fabric-smart-client/platform/view"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/hash"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"github.com/hyperledger-labs/fabric-token-sdk/token"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/auditdb"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/auditor"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/db/driver"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/tokens"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/ttxdb"
session2 "github.com/hyperledger-labs/fabric-token-sdk/token/services/utils/json/session"
view3 "github.com/hyperledger-labs/fabric-token-sdk/token/services/utils/view"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap/zapcore"
)
type TxAuditor struct {
w *token.AuditorWallet
auditor *auditor.Auditor
auditDB *auditdb.DB
transactionInfoProvider *TransactionInfoProvider
}
func NewAuditor(sp token.ServiceProvider, w *token.AuditorWallet) (*TxAuditor, error) {
backend := auditor.New(sp, w)
auditDB, err := auditdb.GetByTMSId(sp, w.TMS().ID())
if err != nil {
return nil, err
}
ttxDB, err := ttxdb.GetByTMSId(sp, w.TMS().ID())
if err != nil {
return nil, err
}
return NewTxAuditor(w, backend, auditDB, ttxDB), nil
}
func NewTxAuditor(w *token.AuditorWallet, backend *auditor.Auditor, auditDB *auditdb.DB, ttxDB *ttxdb.DB) *TxAuditor {
return &TxAuditor{
w: w,
auditor: backend,
auditDB: auditDB,
transactionInfoProvider: newTransactionInfoProvider(w.TMS(), ttxDB),
}
}
func (a *TxAuditor) Validate(tx *Transaction) error {
return a.auditor.Validate(tx.Context, tx.TokenRequest)
}
func (a *TxAuditor) Audit(tx *Transaction) (*token.InputStream, *token.OutputStream, error) {
return a.auditor.Audit(tx)
}
// Release unlocks the passed enrollment IDs.
func (a *TxAuditor) Release(tx *Transaction) {
a.auditor.Release(tx)
}
// Transactions returns an iterator of transaction records filtered by the given params.
func (a *TxAuditor) Transactions(params QueryTransactionsParams) (driver.TransactionIterator, error) {
return a.auditDB.Transactions(params)
}
// NewPaymentsFilter returns a programmable filter over the payments sent or received by enrollment IDs.
func (a *TxAuditor) NewPaymentsFilter() *auditdb.PaymentsFilter {
return a.auditDB.NewPaymentsFilter()
}
// NewHoldingsFilter returns a programmable filter over the holdings owned by enrollment IDs.
func (a *TxAuditor) NewHoldingsFilter() *auditdb.HoldingsFilter {
return a.auditDB.NewHoldingsFilter()
}
// SetStatus sets the status of the audit records with the passed transaction id to the passed status
func (a *TxAuditor) SetStatus(ctx context.Context, txID string, status driver.TxStatus, message string) error {
return a.auditDB.SetStatus(ctx, txID, status, message)
}
func (a *TxAuditor) GetTokenRequest(txID string) ([]byte, error) {
return a.auditor.GetTokenRequest(txID)
}
func (a *TxAuditor) Check(context context.Context) ([]string, error) {
return a.auditor.Check(context)
}
type RegisterAuditorView struct {
TMSID token.TMSID
AuditView view.View
}
func NewRegisterAuditorView(auditView view.View, opts ...token.ServiceOption) *RegisterAuditorView {
options, err := token.CompileServiceOptions(opts...)
if err != nil {
return nil
}
return &RegisterAuditorView{
AuditView: auditView,
TMSID: options.TMSID(),
}
}
func (r *RegisterAuditorView) Call(context view.Context) (interface{}, error) {
// register responder
if err := view2.GetRegistry(context).RegisterResponder(r.AuditView, &AuditingViewInitiator{}); err != nil {
return nil, errors.Wrapf(err, "failed to register auditor view")
}
return nil, nil
}
type AuditingViewInitiator struct {
tx *Transaction
local bool
}
func newAuditingViewInitiator(tx *Transaction, local bool) *AuditingViewInitiator {
return &AuditingViewInitiator{tx: tx, local: local}
}
func (a *AuditingViewInitiator) Call(context view.Context) (interface{}, error) {
span := trace.SpanFromContext(context.Context())
var err error
var session view.Session
span.AddEvent("start_session")
if a.local {
session, err = a.startLocal(context)
} else {
session, err = a.startRemote(context)
}
if err != nil {
return nil, errors.WithMessage(err, "failed starting auditing session")
}
// Receive signature
logger.Debugf("Receiving signature for [%s]", a.tx.ID())
span.AddEvent("start_receiving")
jsonSession := session2.NewFromSession(context, session)
signature, err := jsonSession.ReceiveRawWithTimeout(time.Minute)
if err != nil {
span.RecordError(err)
return nil, errors.WithMessage(err, "failed to read audit event")
}
if len(signature) == 0 {
return nil, errors.New("Empty signature data.")
}
span.AddEvent("received_message")
logger.Debugf("reply received from %s", a.tx.Opts.Auditor)
// Check signature
signed, err := a.tx.MarshallToAudit()
if err != nil {
return nil, errors.Wrapf(err, "failed marshalling message to sign")
}
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Verifying auditor signature on [%s][%s][%s]", a.tx.Opts.Auditor.UniqueID(), hash.Hashable(signed).String(), a.tx.ID())
}
validAuditing := false
span.AddEvent("validate_auditing")
for _, auditorID := range a.tx.TokenService().PublicParametersManager().PublicParameters().Auditors() {
v, err := a.tx.TokenService().SigService().AuditorVerifier(auditorID)
if err != nil {
logger.Debugf("failed to get auditor verifier for [%s]", auditorID)
continue
}
span.AddEvent("verify_auditor_signature")
if err := v.Verify(signed, signature); err != nil {
logger.Errorf("failed verifying auditor signature [%s][%s][%s]", auditorID, hash.Hashable(signed).String(), a.tx.TokenRequest.Anchor)
} else {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("auditor signature verified [%s][%s][%s]", auditorID, base64.StdEncoding.EncodeToString(signature), hash.Hashable(signed))
}
validAuditing = true
break
}
}
if !validAuditing {
return nil, errors.Errorf("failed verifying auditor signature [%s][%s]", hash.Hashable(signed).String(), a.tx.TokenRequest.Anchor)
}
span.AddEvent("append_auditor_signature")
a.tx.TokenRequest.AddAuditorSignature(signature)
logger.Debug("auditor signature verified")
return session, nil
}
func (a *AuditingViewInitiator) startRemote(context view.Context) (view.Session, error) {
logger.Debugf("Starting remote auditing session with [%s] for [%s]", a.tx.Opts.Auditor.UniqueID(), a.tx.ID())
session, err := context.GetSession(a, a.tx.Opts.Auditor)
if err != nil {
return nil, errors.Wrap(err, "failed getting session")
}
// Send transaction
txRaw, err := a.tx.Bytes()
if err != nil {
return nil, err
}
err = session.SendWithContext(context.Context(), txRaw)
if err != nil {
return nil, errors.Wrap(err, "failed sending transaction")
}
return session, nil
}
func (a *AuditingViewInitiator) startLocal(context view.Context) (view.Session, error) {
logger.Debugf("Starting local auditing for %s", a.tx.ID())
// This code is executed everytime the auditor is the same as the
// initiator of a token transaction.
// For example, if an issuer is also an auditor, then when the issuer asks
// for auditing, the issuer is essentially talking to itself.
// FSC does not yet support opening communication session to itself,
// therefore we create a fake bidirectional communication channel between
// the AuditingViewInitiator view and its registered responder.
// Notice also the use of view.AsResponder(right) to run the responder
// using a predefined session.
// This code can be removed once FSC supports opening communication session to self.
// Prepare a bidirectional channel
// Give to the responder view the right channel, and keep for
// AuditingViewInitiator the left channel.
biChannel, err := NewLocalBidirectionalChannel("", context.ID(), "", nil)
if err != nil {
return nil, errors.Wrap(err, "failed creating session")
}
left := biChannel.LeftSession()
right := biChannel.RightSession()
// Send transaction
txRaw, err := a.tx.Bytes()
if err != nil {
return nil, err
}
err = left.SendWithContext(context.Context(), txRaw)
if err != nil {
return nil, errors.Wrap(err, "failed sending transaction")
}
// execute the auditor responder using the fake communication session
responderView, err := view2.GetRegistry(context).GetResponder(&AuditingViewInitiator{})
if err != nil {
return nil, errors.Wrapf(err, "failed to get auditor view")
}
// Run the view in a new goroutine
view3.RunView(logger, context, responderView, view.AsResponder(right))
return left, nil
}
type AuditApproveView struct {
w *token.AuditorWallet
tx *Transaction
}
func NewAuditApproveView(w *token.AuditorWallet, tx *Transaction) *AuditApproveView {
return &AuditApproveView{w: w, tx: tx}
}
func (a *AuditApproveView) Call(context view.Context) (interface{}, error) {
span := trace.SpanFromContext(context.Context())
span.AddEvent("start_audit_approve_view")
defer span.AddEvent("end_audit_approve_view")
// Append audit records
if err := auditor.New(context, a.w).Append(a.tx); err != nil {
return nil, errors.Wrapf(err, "failed appending audit records for transaction %s", a.tx.ID())
}
if err := a.signAndSendBack(context); err != nil {
return nil, err
}
// cache the token request into the tokens db
t, err := tokens.GetService(context, a.tx.TMSID())
if err != nil {
return nil, errors.Wrapf(err, "failed to get tokens db for [%s]", a.tx.TMSID())
}
if err := t.CacheRequest(a.tx.TMSID(), a.tx.TokenRequest); err != nil {
logger.Warnf("failed to cache token request [%s], this might cause delay, investigate when possible: [%s]", a.tx.TokenRequest.Anchor, err)
}
labels := []string{
"network", a.tx.Network(),
"channel", a.tx.Channel(),
"namespace", a.tx.Namespace(),
}
GetMetrics(context).AuditApprovedTransactions.With(labels...).Add(1)
return nil, nil
}
func (a *AuditApproveView) signAndSendBack(context view.Context) error {
span := trace.SpanFromContext(context.Context())
logger.Debugf("Signing and sending back transaction... [%s]", a.tx.ID())
// Sign
aid, err := a.w.GetAuditorIdentity()
if err != nil {
return errors.WithMessagef(err, "failed getting auditor identity for node [%s]", context.Me())
}
signer, err := a.w.GetSigner(aid)
if err != nil {
return errors.WithMessagef(err, "failed getting signing identity for auditor identity [%s]", aid)
}
raw, err := a.tx.MarshallToAudit()
if err != nil {
return errors.Wrapf(err, "failed marshalling tx [%s] to audit", a.tx.ID())
}
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Audit Approve [%s][%s][%s]", aid.UniqueID(), hash.Hashable(raw).String(), a.tx.TokenRequest.Anchor)
}
span.AddEvent("sign_tx")
sigma, err := signer.Sign(raw)
if err != nil {
return errors.Wrapf(err, "failed sign audit message for tx [%s]", a.tx.ID())
}
logger.Debug("auditor sending sigma back", hash.Hashable(sigma))
session := context.Session()
span.AddEvent("send_back_tx")
if err := session.Send(sigma); err != nil {
return errors.WithMessagef(err, "failed sending back auditor signature")
}
logger.Debugf("Signing and sending back transaction...done [%s]", a.tx.ID())
span.AddEvent("wait_envelope")
if err := a.waitEnvelope(context); err != nil {
return errors.WithMessagef(err, "failed obtaining auditor signature")
}
return nil
}
func (a *AuditApproveView) waitEnvelope(context view.Context) error {
span := trace.SpanFromContext(context.Context())
logger.Debugf("Waiting for envelope... [%s]", a.tx.ID())
tx, err := ReceiveTransaction(context, WithNoTransactionVerification())
if err != nil {
return errors.Wrapf(err, "failed to receive transaction with network envelope")
}
logger.Debugf("Waiting for envelope...transaction received[%s]", a.tx.ID())
// Processes
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("Processes envelope...")
}
if tx.Payload == nil {
return errors.Errorf("expected transaction payload not found")
}
// Ack for distribution
// Send the signature back
var sigma []byte
logger.Debugf("auditor signing ack response [%s] with identity [%s]", hash.Hashable(tx.FromRaw), view2.GetIdentityProvider(context).DefaultIdentity())
signer, err := view2.GetSigService(context).GetSigner(view2.GetIdentityProvider(context).DefaultIdentity())
if err != nil {
return errors.WithMessagef(err, "failed getting signing identity for [%s]", view2.GetIdentityProvider(context).DefaultIdentity())
}
span.AddEvent("sign_ack")
sigma, err = signer.Sign(tx.FromRaw)
if err != nil {
return errors.WithMessage(err, "failed to sign ack response")
}
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("ack response: [%s] from [%s]", hash.Hashable(sigma), view2.GetIdentityProvider(context).DefaultIdentity())
}
session := context.Session()
span.AddEvent("send_back_ack")
if err := session.Send(sigma); err != nil {
return errors.WithMessage(err, "failed sending ack")
}
logger.Debugf("Waiting for envelope...done [%s]", a.tx.ID())
return nil
}