@@ -113,7 +113,7 @@ func (v *dataTxValidator) parallelValidation(txsEnv []*types.DataTxEnvelope, use
113113}
114114
115115func (v * dataTxValidator ) validateSignatures (txEnv * types.DataTxEnvelope ) ([]string , * types.ValidationInfo , error ) {
116- var userIDsWithValidSign []string
116+ userIDsWithValidSign := make ( []string , 0 , len ( txEnv . Signatures ))
117117 for userID , signature := range txEnv .Signatures {
118118 valRes , err := v .sigValidator .validate (userID , signature , txEnv .Payload )
119119 if err != nil {
@@ -484,6 +484,108 @@ func (v *dataTxValidator) validateACLForWriteOrDelete(userIDs []string, dbName,
484484 }, nil
485485}
486486
487+ type readCache struct {
488+ dbName string
489+ key string
490+ ver * types.Version
491+ err error
492+ wg sync.WaitGroup
493+ }
494+
495+ func (v * dataTxValidator ) parallelReadMvccValidation (
496+ valInfoArray []* types.ValidationInfo , dataTxEnvs []* types.DataTxEnvelope ,
497+ ) error {
498+ reads := make (map [string ]map [string ]* readCache )
499+ errorChan := make (chan error )
500+
501+ // Submit a "get-version" Go routine for each key in the envelope.
502+ // We avoid reading the same key twice.
503+ for txNum , txEnv := range dataTxEnvs {
504+ if valInfoArray [txNum ].Flag != types .Flag_VALID {
505+ continue
506+ }
507+ for _ , txOps := range txEnv .Payload .DbOperations {
508+ dbName := txOps .DbName
509+ dbReads , ok := reads [dbName ]
510+ if ! ok {
511+ dbReads = make (map [string ]* readCache )
512+ reads [dbName ] = dbReads
513+ }
514+
515+ for _ , r := range txOps .DataReads {
516+ key := r .Key
517+ if _ , ok := dbReads [key ]; ok {
518+ continue
519+ }
520+
521+ c := & readCache {
522+ dbName : dbName ,
523+ key : key ,
524+ }
525+ c .wg .Add (1 )
526+ dbReads [key ] = c
527+ go func (txNum int , c * readCache ) {
528+ defer c .wg .Done ()
529+ c .ver , c .err = v .db .GetVersion (c .dbName , c .key )
530+ if c .err != nil {
531+ v .logger .Errorf ("error validating signatures in tx number %d, error: %s" , txNum , c .err )
532+ defer func () {
533+ // Ignore panic when errorChan is closed
534+ recover ()
535+ }()
536+ errorChan <- c .err
537+ }
538+ }(txNum , c )
539+ }
540+ }
541+ }
542+
543+ // Submit a "validation" Go routine for read operation in the envelope.
544+ wg := sync.WaitGroup {}
545+ for txNum , txEnv := range dataTxEnvs {
546+ for _ , txOps := range txEnv .Payload .DbOperations {
547+ for _ , r := range txOps .DataReads {
548+ if valInfoArray [txNum ].Flag != types .Flag_VALID {
549+ continue
550+ }
551+
552+ wg .Add (1 )
553+ go func (txNum int , c * readCache , expectedVer * types.Version ) {
554+ defer wg .Done ()
555+ if c == nil {
556+ panic ("all reads keys should be in the map" )
557+ }
558+
559+ c .wg .Wait ()
560+ if valInfoArray [txNum ].Flag != types .Flag_VALID || c .err != nil {
561+ return
562+ }
563+ if proto .Equal (expectedVer , c .ver ) {
564+ return
565+ }
566+ valInfoArray [txNum ] = & types.ValidationInfo {
567+ Flag : types .Flag_INVALID_MVCC_CONFLICT_WITH_COMMITTED_STATE ,
568+ ReasonIfInvalid : "mvcc conflict has occurred as the committed state for the key [" + c .key + "] in database [" + c .dbName + "] changed" ,
569+ }
570+ }(txNum , reads [txOps.DbName ][r.Key ], r .Version )
571+ }
572+ }
573+ }
574+
575+ // Wait for all the validation routines to end.
576+ go func () {
577+ wg .Wait ()
578+ // Inject nil to make sure we have data to read from the channel if no error occurred.
579+ errorChan <- nil
580+ }()
581+
582+ select {
583+ case err := <- errorChan :
584+ close (errorChan )
585+ return err
586+ }
587+ }
588+
487589func (v * dataTxValidator ) mvccValidation (dbName string , txOps * types.DBOperation , pendingOps * pendingOperations ) (* types.ValidationInfo , error ) {
488590 for _ , r := range txOps .DataReads {
489591 if pendingOps .exist (dbName , r .Key ) {
@@ -532,3 +634,38 @@ func (v *dataTxValidator) mvccValidation(dbName string, txOps *types.DBOperation
532634 Flag : types .Flag_VALID ,
533635 }, nil
534636}
637+
638+ func (v * dataTxValidator ) mvccValidationPending (dbName string , txOps * types.DBOperation , pendingOps * pendingOperations ) (* types.ValidationInfo , error ) {
639+ for _ , r := range txOps .DataReads {
640+ if pendingOps .exist (dbName , r .Key ) {
641+ return & types.ValidationInfo {
642+ Flag : types .Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK ,
643+ ReasonIfInvalid : "mvcc conflict has occurred within the block for the key [" + r .Key + "] in database [" + dbName + "]" ,
644+ }, nil
645+ }
646+ }
647+ // as state trie generation work at the boundary of block, we cannot allow more than one write per key. This is because, the state trie
648+ // generation considers only the final updates and not intermediate updates within a block boundary. As a result, we would have intermediate
649+ // entries in the provenance store but cannot generate proof of existence for the same using the state trie. As blind writes/deletes are quite
650+ // rare, we allow only one write per key within a block. In general, user reads the key before writing to it.
651+ for _ , w := range txOps .DataWrites {
652+ if pendingOps .exist (dbName , w .Key ) {
653+ return & types.ValidationInfo {
654+ Flag : types .Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK ,
655+ ReasonIfInvalid : "mvcc conflict has occurred within the block for the key [" + w .Key + "] in database [" + dbName + "]. Within a block, a key can be modified only once" ,
656+ }, nil
657+ }
658+ }
659+ for _ , d := range txOps .DataDeletes {
660+ if pendingOps .exist (dbName , d .Key ) {
661+ return & types.ValidationInfo {
662+ Flag : types .Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK ,
663+ ReasonIfInvalid : "mvcc conflict has occurred within the block for the key [" + d .Key + "] in database [" + dbName + "]. Within a block, a key can be modified only once" ,
664+ }, nil
665+ }
666+ }
667+
668+ return & types.ValidationInfo {
669+ Flag : types .Flag_VALID ,
670+ }, nil
671+ }
0 commit comments