@@ -17,6 +17,7 @@ import (
1717 "golang.org/x/sync/errgroup"
1818 "google.golang.org/grpc/health"
1919 healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
20+ "google.golang.org/protobuf/proto"
2021 "google.golang.org/protobuf/types/known/emptypb"
2122
2223 "github.com/hyperledger/fabric-x-committer/api/servicepb"
@@ -32,27 +33,45 @@ type (
3233 VcService struct {
3334 servicepb.ValidationAndCommitServiceServer
3435 streamStateManager [VCStreamState ]
35- nextBlock atomic.Pointer [servicepb.BlockRef ]
36- txsStatus * fifoCache [* committerpb.TxStatus ]
37- txsStatusMu sync.Mutex
38- healthcheck * health.Server
36+ nextBlock atomic.Pointer [servicepb.BlockRef ]
37+ txsStatus * fifoCache [* committerpb.TxStatus ]
38+ worldState * fifoCache [* WorldState ]
39+ worldStateMu sync.Mutex
40+ healthcheck * health.Server
3941 // NumBatchesReceived is the number of batches received by VcService.
4042 NumBatchesReceived atomic.Uint32
4143 // MockFaultyNodeDropSize allows mocking a faulty node by dropping some TXs.
4244 MockFaultyNodeDropSize int
45+ // FullMVCC forces the mock VC to perform full MVCC validation.
46+ FullMVCC atomic.Bool
4347 }
4448
4549 // VCStreamState holds the stream's batch queue.
4650 VCStreamState struct {
4751 StreamInfo
4852 q chan * servicepb.VcBatch
4953 }
54+
55+ // WorldState describe the world-state of a key.
56+ WorldState struct {
57+ Namespace string
58+ Key []byte
59+ Value []byte
60+ Version uint64
61+ }
62+
63+ read struct {
64+ ns string
65+ key []byte
66+ version * uint64
67+ }
5068)
5169
5270// NewMockVcService returns a new VcService.
5371func NewMockVcService () * VcService {
5472 return & VcService {
5573 txsStatus : newFifoCache [* committerpb.TxStatus ](defaultTxStatusStorageSize ),
74+ worldState : newFifoCache [* WorldState ](defaultTxStatusStorageSize ),
5675 healthcheck : serve .DefaultHealthCheckService (),
5776 }
5877}
@@ -103,8 +122,8 @@ func (v *VcService) GetTransactionsStatus(
103122 query * committerpb.TxIDsBatch ,
104123) (* committerpb.TxStatusBatch , error ) {
105124 s := & committerpb.TxStatusBatch {Status : make ([]* committerpb.TxStatus , len (query .TxIds ))}
106- v .txsStatusMu .Lock ()
107- defer v .txsStatusMu .Unlock ()
125+ v .worldStateMu .Lock ()
126+ defer v .worldStateMu .Unlock ()
108127 for i , id := range query .TxIds {
109128 if status , ok := v .txsStatus .get (id ); ok {
110129 s .Status [i ] = status
@@ -179,26 +198,8 @@ func (v *VcService) sendTransactionStatus(
179198 if ! ok {
180199 break
181200 }
182- txsStatus := & committerpb.TxStatusBatch {
183- Status : make ([]* committerpb.TxStatus , 0 , len (txBatch .Transactions )),
184- }
185- v .txsStatusMu .Lock ()
186- for i , tx := range txBatch .Transactions {
187- if i < v .MockFaultyNodeDropSize {
188- // We simulate a faulty node by not responding to the first X TXs.
189- continue
190- }
191- code := committerpb .Status_COMMITTED
192- if tx .PrelimInvalidTxStatus != nil {
193- code = * tx .PrelimInvalidTxStatus
194- }
195- s := committerpb .NewTxStatusFromRef (tx .Ref , code )
196- txsStatus .Status = append (txsStatus .Status , s )
197- v .txsStatus .addIfNotExist (tx .Ref .TxId , s )
198- }
199- v .txsStatusMu .Unlock ()
200-
201- if err := stream .Send (txsStatus ); err != nil {
201+ status := v .process (txBatch .Transactions )
202+ if err := stream .Send (& committerpb.TxStatusBatch {Status : status }); err != nil {
202203 return errors .Wrap (err , "error sending transaction status" )
203204 }
204205 }
@@ -218,3 +219,123 @@ func (v *VcService) SubmitTransactions(ctx context.Context, txsBatch *servicepb.
218219 channel .NewWriter (ctx , s .q ).Write (txsBatch )
219220 return nil
220221}
222+
223+ // GetKeys returns the WorldState of the given keys.
224+ func (v * VcService ) GetKeys (nsID string , keys ... []byte ) []* WorldState {
225+ res := make ([]* WorldState , 0 , len (keys ))
226+ v .worldStateMu .Lock ()
227+ defer v .worldStateMu .Unlock ()
228+ for _ , k := range keys {
229+ if val , ok := v .worldState .get (worldStateKey (nsID , k )); ok {
230+ res = append (res , val )
231+ }
232+ }
233+ return res
234+ }
235+
236+ func (v * VcService ) process (txs []* servicepb.VcTx ) []* committerpb.TxStatus {
237+ status := make ([]* committerpb.TxStatus , 0 , len (txs ))
238+
239+ // We simulate a faulty node by not responding to the first X TXs.
240+ skip := max (0 , min (v .MockFaultyNodeDropSize , len (txs )))
241+
242+ v .worldStateMu .Lock ()
243+ defer v .worldStateMu .Unlock ()
244+ for _ , tx := range txs [skip :] {
245+ code := v .validate (tx )
246+ if code == committerpb .Status_COMMITTED {
247+ for _ , w := range getWrites (tx ) {
248+ key := worldStateKey (w .Namespace , w .Key )
249+ if val , valExist := v .worldState .get (key ); valExist {
250+ w .Version = val .Version + 1
251+ }
252+ v .worldState .updateOrAddIfNotExist (key , w )
253+ }
254+ }
255+ s := committerpb .NewTxStatusFromRef (tx .Ref , code )
256+ status = append (status , s )
257+ v .txsStatus .addIfNotExist (tx .Ref .TxId , s )
258+ }
259+
260+ return status
261+ }
262+
263+ func (v * VcService ) validate (tx * servicepb.VcTx ) committerpb.Status {
264+ code := committerpb .Status_COMMITTED
265+ if tx .PrelimInvalidTxStatus != nil {
266+ return * tx .PrelimInvalidTxStatus
267+ }
268+
269+ if ! v .FullMVCC .Load () {
270+ return code
271+ }
272+
273+ existingStatus , txIDExist := v .txsStatus .get (tx .Ref .TxId )
274+ if txIDExist {
275+ if proto .Equal (existingStatus .Ref , tx .Ref ) {
276+ return existingStatus .Status
277+ }
278+ return committerpb .Status_REJECTED_DUPLICATE_TX_ID
279+ }
280+
281+ for _ , r := range getReads (tx ) {
282+ key := worldStateKey (r .ns , r .key )
283+ val , valExist := v .worldState .get (key )
284+ if (r .version == nil && valExist ) || (r .version != nil && (! valExist || * r .version != val .Version )) {
285+ return committerpb .Status_ABORTED_MVCC_CONFLICT
286+ }
287+ }
288+
289+ return code
290+ }
291+
292+ func getReads (tx * servicepb.VcTx ) (reads []read ) {
293+ for _ , ns := range tx .Namespaces {
294+ if ns .NsId != committerpb .MetaNamespaceID && ns .NsId != committerpb .ConfigNamespaceID {
295+ reads = append (reads , read {
296+ ns : committerpb .MetaNamespaceID ,
297+ key : []byte (ns .NsId ),
298+ version : & ns .NsVersion ,
299+ })
300+ }
301+ for _ , r := range ns .ReadsOnly {
302+ reads = append (reads , read {
303+ ns : ns .NsId ,
304+ key : r .Key ,
305+ version : r .Version ,
306+ })
307+ }
308+ for _ , rw := range ns .ReadWrites {
309+ reads = append (reads , read {
310+ ns : ns .NsId ,
311+ key : rw .Key ,
312+ version : rw .Version ,
313+ })
314+ }
315+ }
316+ return reads
317+ }
318+
319+ func getWrites (tx * servicepb.VcTx ) (writes []* WorldState ) {
320+ for _ , ns := range tx .Namespaces {
321+ for _ , rw := range ns .ReadWrites {
322+ writes = append (writes , & WorldState {
323+ Namespace : ns .NsId ,
324+ Key : rw .Key ,
325+ Value : rw .Value ,
326+ })
327+ }
328+ for _ , w := range ns .BlindWrites {
329+ writes = append (writes , & WorldState {
330+ Namespace : ns .NsId ,
331+ Key : w .Key ,
332+ Value : w .Value ,
333+ })
334+ }
335+ }
336+ return writes
337+ }
338+
339+ func worldStateKey (nsID string , key []byte ) string {
340+ return nsID + "$" + string (key )
341+ }
0 commit comments