@@ -130,10 +130,10 @@ public void writeRecord(List<TapRecordEvent> tapRecordEvents, TapTable table, Co
130130
131131 private void doubleActiveWrite (List <TapRecordEvent > tapRecordEvents , TapTable table , Consumer <WriteListResult <TapRecordEvent >> writeListResultConsumer , ClientSession session ) throws Throwable {
132132 Document doubleActiveDoc = new Document ("_id" , "aaaaaaaa" );
133- UpdateOptions options = new UpdateOptions ().upsert (true );
134- mongoDatabase .getCollection ("_tap_double_active" ).updateOne (session , doubleActiveDoc , new Document ("$set" , new Document ("ts" , System .currentTimeMillis ())), options );
135- write (table , tapRecordEvents , writeListResultConsumer , session );
136- }
133+ UpdateOptions options = new UpdateOptions ().upsert (true );
134+ mongoDatabase .getCollection ("_tap_double_active" ).updateOne (session , doubleActiveDoc , new Document ("$set" , new Document ("ts" , System .currentTimeMillis ())), options );
135+ write (table , tapRecordEvents , writeListResultConsumer , session );
136+ }
137137
138138 private void write (TapTable table , List <TapRecordEvent > tapRecordEvents , Consumer <WriteListResult <TapRecordEvent >> writeListResultConsumer , ClientSession session ) throws Throwable {
139139 AtomicLong inserted = new AtomicLong (0 ); //insert count
@@ -528,6 +528,18 @@ private static BulkWriteOptions buildBulkWriteOptions(BulkWriteModel bulkWriteMo
528528 return bulkWriteOptions ;
529529 }
530530
531+ private boolean isShardedCollection (String tableId ) {
532+ return shardKeyMap != null && shardKeyMap .containsKey (tableId );
533+ }
534+
535+ private WriteModel <Document > createUpdateModel (String tableId , Document filter , Document update , UpdateOptions options ) {
536+ if (isShardedCollection (tableId )) {
537+ return new UpdateOneModel <>(filter , update , options );
538+ } else {
539+ return new UpdateManyModel <>(filter , update , options );
540+ }
541+ }
542+
531543 protected List <WriteModel <Document >> normalWriteMode (AtomicLong inserted , AtomicLong updated , AtomicLong deleted , UpdateOptions options , TapTable tapTable , Collection <String > pks , TapRecordEvent recordEvent ) {
532544 List <WriteModel <Document >> writeModels = new ArrayList <>();
533545 if (recordEvent instanceof TapInsertRecordEvent ) {
@@ -552,17 +564,17 @@ protected List<WriteModel<Document>> normalWriteMode(AtomicLong inserted, Atomic
552564 }
553565 MongodbUtil .removeIdIfNeed (pks , insertRecordEvent .getAfter ());
554566 Document update = new Document (operation , insertRecordEvent .getAfter ());
555- writeModels .add (new UpdateManyModel <>( pkFilter , update , options ));
567+ writeModels .add (createUpdateModel ( tapTable . getId (), pkFilter , update , options ));
556568 if (MapUtils .isNotEmpty (unsetDoc )) {
557- writeModels .add (new UpdateManyModel <>( pkFilter , new Document ("$unset" , unsetDoc ), options ));
569+ writeModels .add (createUpdateModel ( tapTable . getId (), pkFilter , new Document ("$unset" , unsetDoc ), options ));
558570 }
559571 } else {
560572 if (CollectionUtils .isNotEmpty (pks ) && MapUtils .isNotEmpty (unsetDoc )) {
561573 Document pkFilter = getPkFilter (pks , insertRecordEvent .getAfter ());
562574 Document update = new Document ("$set" , insertRecordEvent .getAfter ());
563- writeModels .add (new UpdateManyModel <>( pkFilter , update , options ));
575+ writeModels .add (createUpdateModel ( tapTable . getId (), pkFilter , update , options ));
564576 if (MapUtils .isNotEmpty (unsetDoc )) {
565- writeModels .add (new UpdateManyModel <>( pkFilter , new Document ("$unset" , unsetDoc ), options ));
577+ writeModels .add (createUpdateModel ( tapTable . getId (), pkFilter , new Document ("$unset" , unsetDoc ), options ));
566578 }
567579 } else {
568580 writeModels .add (new InsertOneModel <>(new Document (insertRecordEvent .getAfter ())));
@@ -590,7 +602,7 @@ protected List<WriteModel<Document>> normalWriteMode(AtomicLong inserted, Atomic
590602 u .remove ("$v" ); // Exists '$v' in update operation of MongoDB(v3.6), remove it because can't apply in write model.
591603 boolean isUpdate = u .keySet ().stream ().anyMatch (k -> k .startsWith ("$" ));
592604 if (isUpdate ) {
593- writeModel = new UpdateManyModel <>( pkFilter , u , options );
605+ writeModel = createUpdateModel ( tapTable . getId (), pkFilter , u , options );
594606 options .upsert (false );
595607 } else {
596608 writeModel = new ReplaceOneModel <>(pkFilter , u , new ReplaceOptions ().upsert (false ));
@@ -607,10 +619,10 @@ protected List<WriteModel<Document>> normalWriteMode(AtomicLong inserted, Atomic
607619 }
608620 MongodbUtil .removeIdIfNeed (pks , after );
609621 u .append ("$set" , after );
610- writeModels .add (new UpdateManyModel <>( pkFilter , u , options ));
622+ writeModels .add (createUpdateModel ( tapTable . getId (), pkFilter , u , options ));
611623 Document unsetDoc = wrapUnset (recordEvent );
612624 if (MapUtils .isNotEmpty (unsetDoc )) {
613- writeModels .add (new UpdateManyModel <>( pkFilter , new Document ("$unset" , unsetDoc ), options ));
625+ writeModels .add (createUpdateModel ( tapTable . getId (), pkFilter , new Document ("$unset" , unsetDoc ), options ));
614626 }
615627 }
616628 }
0 commit comments