Skip to content

Commit c6a0260

Browse files
committed
[coordinator-test] Remove the need for a real VC
Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent 82fa409 commit c6a0260

5 files changed

Lines changed: 192 additions & 83 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ MAKEFLAGS += --jobs=16
132132

133133
ROOT_PKG_REGEXP = github.com/hyperledger/fabric-x-committer
134134
CORE_DB_PACKAGES_REGEXP = ${ROOT_PKG_REGEXP}/service/(vc|query)
135-
REQUIRES_DB_PACKAGES_REGEXP = ${ROOT_PKG_REGEXP}/(service/coordinator|loadgen|cmd|utils/testdb)
135+
REQUIRES_DB_PACKAGES_REGEXP = ${ROOT_PKG_REGEXP}/(loadgen|cmd|utils/testdb)
136136
HEAVY_PACKAGES_REGEXP = ${ROOT_PKG_REGEXP}/(docker|integration)
137137

138138
NON_HEAVY_PACKAGES=$(shell $(go_cmd) list ./... | grep -vE "$(HEAVY_PACKAGES_REGEXP)")

mock/fifo_cache.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func newFifoCache[T any](size int) *fifoCache[T] {
2020
}
2121
}
2222

23-
// addIfNotExist returns true if the entry is unique.
23+
// addIfNotExist returns true if a new entry was added.
2424
// It employs a FIFO eviction policy.
2525
func (c *fifoCache[T]) addIfNotExist(key string, value T) bool {
2626
if _, ok := c.get(key); ok {
@@ -33,6 +33,17 @@ func (c *fifoCache[T]) addIfNotExist(key string, value T) bool {
3333
return true
3434
}
3535

36+
// updateOrAddIfNotExist returns true if a new entry was added.
37+
// It employs a FIFO eviction policy.
38+
func (c *fifoCache[T]) updateOrAddIfNotExist(key string, value T) bool {
39+
if _, ok := c.get(key); ok {
40+
c.cache[key] = value
41+
// We don't modify the eviction order for simplicity.
42+
return false
43+
}
44+
return c.addIfNotExist(key, value)
45+
}
46+
3647
// get returns the value of a key.
3748
// If the key does not exist, it returns false.
3849
func (c *fifoCache[T]) get(key string) (T, bool) {

mock/vcservice.go

Lines changed: 130 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"golang.org/x/sync/errgroup"
1818
"google.golang.org/grpc/health"
1919
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
20+
"google.golang.org/protobuf/proto"
2021
"google.golang.org/protobuf/types/known/emptypb"
2122

2223
"github.com/hyperledger/fabric-x-committer/api/servicepb"
@@ -32,27 +33,45 @@ type (
3233
VcService struct {
3334
servicepb.ValidationAndCommitServiceServer
3435
streamStateManager[VCStreamState]
35-
nextBlock atomic.Pointer[servicepb.BlockRef]
36-
txsStatus *fifoCache[*committerpb.TxStatus]
37-
txsStatusMu sync.Mutex
38-
healthcheck *health.Server
36+
nextBlock atomic.Pointer[servicepb.BlockRef]
37+
txsStatus *fifoCache[*committerpb.TxStatus]
38+
worldState *fifoCache[*WorldState]
39+
worldStateMu sync.Mutex
40+
healthcheck *health.Server
3941
// NumBatchesReceived is the number of batches received by VcService.
4042
NumBatchesReceived atomic.Uint32
4143
// MockFaultyNodeDropSize allows mocking a faulty node by dropping some TXs.
4244
MockFaultyNodeDropSize int
45+
// FullMVCC forces the mock VC to perform full MVCC validation.
46+
FullMVCC atomic.Bool
4347
}
4448

4549
// VCStreamState holds the stream's batch queue.
4650
VCStreamState struct {
4751
StreamInfo
4852
q chan *servicepb.VcBatch
4953
}
54+
55+
// WorldState describe the world-state of a key.
56+
WorldState struct {
57+
Namespace string
58+
Key []byte
59+
Value []byte
60+
Version uint64
61+
}
62+
63+
read struct {
64+
ns string
65+
key []byte
66+
version *uint64
67+
}
5068
)
5169

5270
// NewMockVcService returns a new VcService.
5371
func NewMockVcService() *VcService {
5472
return &VcService{
5573
txsStatus: newFifoCache[*committerpb.TxStatus](defaultTxStatusStorageSize),
74+
worldState: newFifoCache[*WorldState](defaultTxStatusStorageSize),
5675
healthcheck: serve.DefaultHealthCheckService(),
5776
}
5877
}
@@ -103,8 +122,8 @@ func (v *VcService) GetTransactionsStatus(
103122
query *committerpb.TxIDsBatch,
104123
) (*committerpb.TxStatusBatch, error) {
105124
s := &committerpb.TxStatusBatch{Status: make([]*committerpb.TxStatus, len(query.TxIds))}
106-
v.txsStatusMu.Lock()
107-
defer v.txsStatusMu.Unlock()
125+
v.worldStateMu.Lock()
126+
defer v.worldStateMu.Unlock()
108127
for i, id := range query.TxIds {
109128
if status, ok := v.txsStatus.get(id); ok {
110129
s.Status[i] = status
@@ -182,21 +201,28 @@ func (v *VcService) sendTransactionStatus(
182201
txsStatus := &committerpb.TxStatusBatch{
183202
Status: make([]*committerpb.TxStatus, 0, len(txBatch.Transactions)),
184203
}
185-
v.txsStatusMu.Lock()
204+
v.worldStateMu.Lock()
186205
for i, tx := range txBatch.Transactions {
187206
if i < v.MockFaultyNodeDropSize {
188207
// We simulate a faulty node by not responding to the first X TXs.
189208
continue
190209
}
191-
code := committerpb.Status_COMMITTED
192-
if tx.PrelimInvalidTxStatus != nil {
193-
code = *tx.PrelimInvalidTxStatus
210+
211+
code := v.validate(tx)
212+
if code == committerpb.Status_COMMITTED {
213+
for _, w := range getWrites(tx) {
214+
key := worldStateKey(w.Namespace, w.Key)
215+
if val, valExist := v.worldState.get(key); valExist {
216+
w.Version = val.Version + 1
217+
}
218+
v.worldState.updateOrAddIfNotExist(key, w)
219+
}
194220
}
195221
s := committerpb.NewTxStatusFromRef(tx.Ref, code)
196222
txsStatus.Status = append(txsStatus.Status, s)
197223
v.txsStatus.addIfNotExist(tx.Ref.TxId, s)
198224
}
199-
v.txsStatusMu.Unlock()
225+
v.worldStateMu.Unlock()
200226

201227
if err := stream.Send(txsStatus); err != nil {
202228
return errors.Wrap(err, "error sending transaction status")
@@ -218,3 +244,96 @@ func (v *VcService) SubmitTransactions(ctx context.Context, txsBatch *servicepb.
218244
channel.NewWriter(ctx, s.q).Write(txsBatch)
219245
return nil
220246
}
247+
248+
// GetKeys returns the WorldState of the given keys.
249+
func (v *VcService) GetKeys(nsID string, keys ...[]byte) []*WorldState {
250+
res := make([]*WorldState, 0, len(keys))
251+
v.worldStateMu.Lock()
252+
defer v.worldStateMu.Unlock()
253+
for _, k := range keys {
254+
if val, ok := v.worldState.get(worldStateKey(nsID, k)); ok {
255+
res = append(res, val)
256+
}
257+
}
258+
return res
259+
}
260+
261+
func (v *VcService) validate(tx *servicepb.VcTx) committerpb.Status {
262+
code := committerpb.Status_COMMITTED
263+
if tx.PrelimInvalidTxStatus != nil {
264+
return *tx.PrelimInvalidTxStatus
265+
}
266+
267+
if !v.FullMVCC.Load() {
268+
return code
269+
}
270+
271+
existingStatus, txIDExist := v.txsStatus.get(tx.Ref.TxId)
272+
if txIDExist {
273+
if proto.Equal(existingStatus.Ref, tx.Ref) {
274+
return existingStatus.Status
275+
}
276+
return committerpb.Status_REJECTED_DUPLICATE_TX_ID
277+
}
278+
279+
for _, r := range getReads(tx) {
280+
key := worldStateKey(r.ns, r.key)
281+
val, valExist := v.worldState.get(key)
282+
if (r.version == nil && valExist) || (r.version != nil && (!valExist || *r.version != val.Version)) {
283+
return committerpb.Status_ABORTED_MVCC_CONFLICT
284+
}
285+
}
286+
287+
return code
288+
}
289+
290+
func getReads(tx *servicepb.VcTx) (reads []read) {
291+
for _, ns := range tx.Namespaces {
292+
if ns.NsId != committerpb.MetaNamespaceID && ns.NsId != committerpb.ConfigNamespaceID {
293+
reads = append(reads, read{
294+
ns: committerpb.MetaNamespaceID,
295+
key: []byte(ns.NsId),
296+
version: &ns.NsVersion,
297+
})
298+
}
299+
for _, r := range ns.ReadsOnly {
300+
reads = append(reads, read{
301+
ns: ns.NsId,
302+
key: r.Key,
303+
version: r.Version,
304+
})
305+
}
306+
for _, rw := range ns.ReadWrites {
307+
reads = append(reads, read{
308+
ns: ns.NsId,
309+
key: rw.Key,
310+
version: rw.Version,
311+
})
312+
}
313+
}
314+
return reads
315+
}
316+
317+
func getWrites(tx *servicepb.VcTx) (writes []*WorldState) {
318+
for _, ns := range tx.Namespaces {
319+
for _, rw := range ns.ReadWrites {
320+
writes = append(writes, &WorldState{
321+
Namespace: ns.NsId,
322+
Key: rw.Key,
323+
Value: rw.Value,
324+
})
325+
}
326+
for _, w := range ns.BlindWrites {
327+
writes = append(writes, &WorldState{
328+
Namespace: ns.NsId,
329+
Key: w.Key,
330+
Value: w.Value,
331+
})
332+
}
333+
}
334+
return writes
335+
}
336+
337+
func worldStateKey(nsID string, key []byte) string {
338+
return nsID + "$" + string(key)
339+
}

0 commit comments

Comments
 (0)