@@ -17,6 +17,7 @@ import (
1717 "golang.org/x/sync/errgroup"
1818 "google.golang.org/grpc/health"
1919 healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
20+ "google.golang.org/protobuf/proto"
2021 "google.golang.org/protobuf/types/known/emptypb"
2122
2223 "github.com/hyperledger/fabric-x-committer/api/servicepb"
@@ -32,27 +33,45 @@ type (
3233 VcService struct {
3334 servicepb.ValidationAndCommitServiceServer
3435 streamStateManager [VCStreamState ]
35- nextBlock atomic.Pointer [servicepb.BlockRef ]
36- txsStatus * fifoCache [* committerpb.TxStatus ]
37- txsStatusMu sync.Mutex
38- healthcheck * health.Server
36+ nextBlock atomic.Pointer [servicepb.BlockRef ]
37+ txsStatus * fifoCache [* committerpb.TxStatus ]
38+ worldState * fifoCache [* WorldState ]
39+ worldStateMu sync.Mutex
40+ healthcheck * health.Server
3941 // NumBatchesReceived is the number of batches received by VcService.
4042 NumBatchesReceived atomic.Uint32
4143 // MockFaultyNodeDropSize allows mocking a faulty node by dropping some TXs.
4244 MockFaultyNodeDropSize int
45+ // FullMVCC forces the mock VC to perform full MVCC validation.
46+ FullMVCC atomic.Bool
4347 }
4448
4549 // VCStreamState holds the stream's batch queue.
4650 VCStreamState struct {
4751 StreamInfo
4852 q chan * servicepb.VcBatch
4953 }
54+
55+ // WorldState describe the world-state of a key.
56+ WorldState struct {
57+ Namespace string
58+ Key []byte
59+ Value []byte
60+ Version uint64
61+ }
62+
63+ read struct {
64+ ns string
65+ key []byte
66+ version * uint64
67+ }
5068)
5169
5270// NewMockVcService returns a new VcService.
5371func NewMockVcService () * VcService {
5472 return & VcService {
5573 txsStatus : newFifoCache [* committerpb.TxStatus ](defaultTxStatusStorageSize ),
74+ worldState : newFifoCache [* WorldState ](defaultTxStatusStorageSize ),
5675 healthcheck : serve .DefaultHealthCheckService (),
5776 }
5877}
@@ -103,8 +122,8 @@ func (v *VcService) GetTransactionsStatus(
103122 query * committerpb.TxIDsBatch ,
104123) (* committerpb.TxStatusBatch , error ) {
105124 s := & committerpb.TxStatusBatch {Status : make ([]* committerpb.TxStatus , len (query .TxIds ))}
106- v .txsStatusMu .Lock ()
107- defer v .txsStatusMu .Unlock ()
125+ v .worldStateMu .Lock ()
126+ defer v .worldStateMu .Unlock ()
108127 for i , id := range query .TxIds {
109128 if status , ok := v .txsStatus .get (id ); ok {
110129 s .Status [i ] = status
@@ -182,21 +201,28 @@ func (v *VcService) sendTransactionStatus(
182201 txsStatus := & committerpb.TxStatusBatch {
183202 Status : make ([]* committerpb.TxStatus , 0 , len (txBatch .Transactions )),
184203 }
185- v .txsStatusMu .Lock ()
204+ v .worldStateMu .Lock ()
186205 for i , tx := range txBatch .Transactions {
187206 if i < v .MockFaultyNodeDropSize {
188207 // We simulate a faulty node by not responding to the first X TXs.
189208 continue
190209 }
191- code := committerpb .Status_COMMITTED
192- if tx .PrelimInvalidTxStatus != nil {
193- code = * tx .PrelimInvalidTxStatus
210+
211+ code := v .validate (tx )
212+ if code == committerpb .Status_COMMITTED {
213+ for _ , w := range getWrites (tx ) {
214+ key := worldStateKey (w .Namespace , w .Key )
215+ if val , valExist := v .worldState .get (key ); valExist {
216+ w .Version = val .Version + 1
217+ }
218+ v .worldState .updateOrAddIfNotExist (key , w )
219+ }
194220 }
195221 s := committerpb .NewTxStatusFromRef (tx .Ref , code )
196222 txsStatus .Status = append (txsStatus .Status , s )
197223 v .txsStatus .addIfNotExist (tx .Ref .TxId , s )
198224 }
199- v .txsStatusMu .Unlock ()
225+ v .worldStateMu .Unlock ()
200226
201227 if err := stream .Send (txsStatus ); err != nil {
202228 return errors .Wrap (err , "error sending transaction status" )
@@ -218,3 +244,96 @@ func (v *VcService) SubmitTransactions(ctx context.Context, txsBatch *servicepb.
218244 channel .NewWriter (ctx , s .q ).Write (txsBatch )
219245 return nil
220246}
247+
248+ // GetKeys returns the WorldState of the given keys.
249+ func (v * VcService ) GetKeys (nsID string , keys ... []byte ) []* WorldState {
250+ res := make ([]* WorldState , 0 , len (keys ))
251+ v .worldStateMu .Lock ()
252+ defer v .worldStateMu .Unlock ()
253+ for _ , k := range keys {
254+ if val , ok := v .worldState .get (worldStateKey (nsID , k )); ok {
255+ res = append (res , val )
256+ }
257+ }
258+ return res
259+ }
260+
261+ func (v * VcService ) validate (tx * servicepb.VcTx ) committerpb.Status {
262+ code := committerpb .Status_COMMITTED
263+ if tx .PrelimInvalidTxStatus != nil {
264+ return * tx .PrelimInvalidTxStatus
265+ }
266+
267+ if ! v .FullMVCC .Load () {
268+ return code
269+ }
270+
271+ existingStatus , txIDExist := v .txsStatus .get (tx .Ref .TxId )
272+ if txIDExist {
273+ if proto .Equal (existingStatus .Ref , tx .Ref ) {
274+ return existingStatus .Status
275+ }
276+ return committerpb .Status_REJECTED_DUPLICATE_TX_ID
277+ }
278+
279+ for _ , r := range getReads (tx ) {
280+ key := worldStateKey (r .ns , r .key )
281+ val , valExist := v .worldState .get (key )
282+ if (r .version == nil && valExist ) || (r .version != nil && (! valExist || * r .version != val .Version )) {
283+ return committerpb .Status_ABORTED_MVCC_CONFLICT
284+ }
285+ }
286+
287+ return code
288+ }
289+
290+ func getReads (tx * servicepb.VcTx ) (reads []read ) {
291+ for _ , ns := range tx .Namespaces {
292+ if ns .NsId != committerpb .MetaNamespaceID && ns .NsId != committerpb .ConfigNamespaceID {
293+ reads = append (reads , read {
294+ ns : committerpb .MetaNamespaceID ,
295+ key : []byte (ns .NsId ),
296+ version : & ns .NsVersion ,
297+ })
298+ }
299+ for _ , r := range ns .ReadsOnly {
300+ reads = append (reads , read {
301+ ns : ns .NsId ,
302+ key : r .Key ,
303+ version : r .Version ,
304+ })
305+ }
306+ for _ , rw := range ns .ReadWrites {
307+ reads = append (reads , read {
308+ ns : ns .NsId ,
309+ key : rw .Key ,
310+ version : rw .Version ,
311+ })
312+ }
313+ }
314+ return reads
315+ }
316+
317+ func getWrites (tx * servicepb.VcTx ) (writes []* WorldState ) {
318+ for _ , ns := range tx .Namespaces {
319+ for _ , rw := range ns .ReadWrites {
320+ writes = append (writes , & WorldState {
321+ Namespace : ns .NsId ,
322+ Key : rw .Key ,
323+ Value : rw .Value ,
324+ })
325+ }
326+ for _ , w := range ns .BlindWrites {
327+ writes = append (writes , & WorldState {
328+ Namespace : ns .NsId ,
329+ Key : w .Key ,
330+ Value : w .Value ,
331+ })
332+ }
333+ }
334+ return writes
335+ }
336+
337+ func worldStateKey (nsID string , key []byte ) string {
338+ return nsID + "$" + string (key )
339+ }
0 commit comments