Skip to content

Commit 155b50b

Browse files
Decouple Bulk from BulkOperations.
1 parent fbfa3b1 commit 155b50b

30 files changed

+3641
-89
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb;
17+
18+
/**
19+
* Interface that can provide access to a MongoDB cluster.
20+
*
21+
* @param <T> the MongoDB cluster/client type (e.g. {@link com.mongodb.client.MongoCluster} or
22+
* {@link com.mongodb.reactivestreams.client.MongoCluster}).
23+
* @author Christoph Strobl
24+
* @since 5.1
25+
*/
26+
public interface MongoClusterCapable<T> {
27+
28+
/**
29+
* Returns the MongoDB cluster used by this factory.
30+
*
31+
* @return the cluster; never {@literal null}.
32+
* @throws IllegalStateException if cluster cannot be obtained.
33+
*/
34+
T getMongoCluster();
35+
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/SessionAwareMethodInterceptor.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class SessionAwareMethodInterceptor<D, C> implements MethodInterceptor {
5454
private final ClientSessionOperator databaseDecorator;
5555
private final Object target;
5656
private final Class<?> targetType;
57+
private final Class<?> clientType;
5758
private final Class<?> collectionType;
5859
private final Class<?> databaseType;
5960
private final Class<? extends ClientSession> sessionType;
@@ -63,15 +64,17 @@ public class SessionAwareMethodInterceptor<D, C> implements MethodInterceptor {
6364
*
6465
* @param session the {@link ClientSession} to be used on invocation.
6566
* @param target the original target object.
66-
* @param databaseType the MongoDB database type
67+
* @param clientType the MongoDB cluster/client type (e.g. {@link com.mongodb.client.MongoCluster}).
68+
* @param sessionType the {@link ClientSession} type.
69+
* @param databaseType the MongoDB database type.
6770
* @param databaseDecorator a {@link ClientSessionOperator} used to create the proxy for an imperative / reactive
6871
* {@code MongoDatabase}.
6972
* @param collectionType the MongoDB collection type.
7073
* @param collectionDecorator a {@link ClientSessionOperator} used to create the proxy for an imperative / reactive
7174
* {@code MongoCollection}.
7275
* @param <T> target object type.
7376
*/
74-
public <T> SessionAwareMethodInterceptor(ClientSession session, T target, Class<? extends ClientSession> sessionType,
77+
public <T> SessionAwareMethodInterceptor(ClientSession session, T target, Class<?> clientType, Class<? extends ClientSession> sessionType,
7578
Class<D> databaseType, ClientSessionOperator<D> databaseDecorator, Class<C> collectionType,
7679
ClientSessionOperator<C> collectionDecorator) {
7780

@@ -85,15 +88,24 @@ public <T> SessionAwareMethodInterceptor(ClientSession session, T target, Class<
8588

8689
this.session = session;
8790
this.target = target;
91+
this.clientType = ClassUtils.getUserClass(clientType);
8892
this.databaseType = ClassUtils.getUserClass(databaseType);
8993
this.collectionType = ClassUtils.getUserClass(collectionType);
9094
this.collectionDecorator = collectionDecorator;
9195
this.databaseDecorator = databaseDecorator;
9296

93-
this.targetType = ClassUtils.isAssignable(databaseType, target.getClass()) ? databaseType : collectionType;
97+
this.targetType = targetType(target.getClass());
9498
this.sessionType = sessionType;
9599
}
96100

101+
Class<?> targetType(@Nullable Class<?> targetType) {
102+
103+
if(ClassUtils.isAssignable(clientType, targetType)) {
104+
return clientType;
105+
}
106+
return ClassUtils.isAssignable(databaseType, targetType) ? databaseType : collectionType;
107+
}
108+
97109
@Override
98110
public @Nullable Object invoke(MethodInvocation methodInvocation) throws Throwable {
99111

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import java.util.List;
19+
20+
import com.mongodb.client.model.bulk.ClientDeleteOneOptions;
21+
import com.mongodb.client.model.bulk.ClientReplaceOneOptions;
22+
import org.bson.Document;
23+
24+
import com.mongodb.MongoNamespace;
25+
import com.mongodb.client.model.DeleteOptions;
26+
import com.mongodb.client.model.UpdateOptions;
27+
import com.mongodb.client.model.bulk.ClientDeleteManyOptions;
28+
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
29+
import com.mongodb.client.model.bulk.ClientUpdateManyOptions;
30+
import com.mongodb.client.model.bulk.ClientUpdateOneOptions;
31+
32+
/**
33+
* @author Christoph Strobl
34+
*/
35+
abstract class BulkWriteSupport {
36+
37+
static ClientNamespacedWriteModel updateMany(MongoNamespace namespace, Document query, Object update,
38+
UpdateOptions updateOptions) {
39+
40+
ClientUpdateManyOptions updateManyOptions = ClientUpdateManyOptions.clientUpdateManyOptions();
41+
updateManyOptions.arrayFilters(updateOptions.getArrayFilters());
42+
updateManyOptions.collation(updateOptions.getCollation());
43+
updateManyOptions.upsert(updateOptions.isUpsert());
44+
updateManyOptions.hint(updateOptions.getHint());
45+
updateManyOptions.hintString(updateOptions.getHintString());
46+
47+
if (update instanceof List<?> pipeline) {
48+
return ClientNamespacedWriteModel.updateMany(namespace, query, (List<Document>) pipeline, updateManyOptions);
49+
} else {
50+
return ClientNamespacedWriteModel.updateMany(namespace, query, (Document) update, updateManyOptions);
51+
}
52+
}
53+
54+
static ClientNamespacedWriteModel updateOne(MongoNamespace namespace, Document query, Object update,
55+
UpdateOptions updateOptions) {
56+
57+
ClientUpdateOneOptions updateOneOptions = ClientUpdateOneOptions.clientUpdateOneOptions();
58+
updateOneOptions.sort(updateOptions.getSort());
59+
updateOneOptions.arrayFilters(updateOptions.getArrayFilters());
60+
updateOneOptions.collation(updateOptions.getCollation());
61+
updateOneOptions.upsert(updateOptions.isUpsert());
62+
updateOneOptions.hint(updateOptions.getHint());
63+
updateOneOptions.hintString(updateOptions.getHintString());
64+
65+
if (update instanceof List<?> pipeline) {
66+
return ClientNamespacedWriteModel.updateOne(namespace, query, (List<Document>) pipeline, updateOneOptions);
67+
} else {
68+
return ClientNamespacedWriteModel.updateOne(namespace, query, (Document) update, updateOneOptions);
69+
}
70+
}
71+
72+
static ClientNamespacedWriteModel removeMany(MongoNamespace namespace, Document query, DeleteOptions deleteOptions) {
73+
74+
ClientDeleteManyOptions clientDeleteManyOptions = ClientDeleteManyOptions.clientDeleteManyOptions();
75+
clientDeleteManyOptions.collation(deleteOptions.getCollation());
76+
clientDeleteManyOptions.hint(deleteOptions.getHint());
77+
clientDeleteManyOptions.hintString(deleteOptions.getHintString());
78+
79+
return ClientNamespacedWriteModel.deleteMany(namespace, query, clientDeleteManyOptions);
80+
}
81+
82+
static ClientNamespacedWriteModel removeOne(MongoNamespace namespace, Document query, DeleteOptions deleteOptions) {
83+
84+
ClientDeleteOneOptions clientDeleteOneOptions = ClientDeleteOneOptions.clientDeleteOneOptions();
85+
// TODO: open an issue with MongoDB to enable sort for deleteOne
86+
clientDeleteOneOptions.collation(deleteOptions.getCollation());
87+
clientDeleteOneOptions.hint(deleteOptions.getHint());
88+
clientDeleteOneOptions.hintString(deleteOptions.getHintString());
89+
90+
91+
return ClientNamespacedWriteModel.deleteOne(namespace, query, clientDeleteOneOptions);
92+
}
93+
94+
static ClientNamespacedWriteModel replaceOne(MongoNamespace namespace, Document query, Document replacement, UpdateOptions updateOptions) {
95+
96+
ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions();
97+
replaceOptions.sort(updateOptions.getSort());
98+
replaceOptions.upsert(updateOptions.isUpsert());
99+
replaceOptions.hint(updateOptions.getHint());
100+
replaceOptions.hintString(updateOptions.getHintString());
101+
replaceOptions.collation(updateOptions.getCollation());
102+
103+
return ClientNamespacedWriteModel.replaceOne(namespace, query,
104+
replacement, replaceOptions);
105+
}
106+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
21+
import org.bson.Document;
22+
import org.springframework.dao.DataAccessException;
23+
import org.springframework.data.mongodb.core.MongoTemplate.SourceAwareDocument;
24+
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
25+
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
26+
import org.springframework.data.mongodb.core.bulk.Bulk;
27+
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions.Order;
28+
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
29+
import org.springframework.data.mongodb.core.bulk.BulkOperation;
30+
import org.springframework.data.mongodb.core.bulk.BulkOperation.Insert;
31+
import org.springframework.data.mongodb.core.bulk.BulkOperation.Remove;
32+
import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst;
33+
import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace;
34+
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update;
35+
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst;
36+
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
37+
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
38+
39+
import com.mongodb.MongoBulkWriteException;
40+
import com.mongodb.MongoNamespace;
41+
import com.mongodb.client.model.DeleteOptions;
42+
import com.mongodb.client.model.UpdateOptions;
43+
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
44+
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
45+
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
46+
47+
/**
48+
* Internal API wrapping a {@link MongoTemplate} to encapsulate {@link Bulk} handling.
49+
*
50+
* @author Christoph Strobl
51+
* @since 2026/02
52+
*/
53+
class BulkWriter {
54+
55+
MongoTemplate template;
56+
57+
BulkWriter(MongoTemplate template) {
58+
this.template = template;
59+
}
60+
61+
public ClientBulkWriteResult write(String defaultDatabase, BulkWriteOptions options, Bulk bulk) {
62+
63+
List<ClientNamespacedWriteModel> writeModels = new ArrayList<>();
64+
List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
65+
66+
for (BulkOperation bulkOp : bulk.operations()) {
67+
68+
String collectionName = bulkOp.context().namespace().collection() != null
69+
? bulkOp.context().namespace().collection()
70+
: template.getCollectionName(bulkOp.context().namespace().type());
71+
72+
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase, collectionName);
73+
if (bulkOp instanceof Insert insert) {
74+
75+
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(collectionName, insert.value(),
76+
template.getConverter());
77+
writeModels.add(ClientNamespacedWriteModel.insertOne(mongoNamespace, sourceAwareDocument.document()));
78+
afterSaveCallables.add(sourceAwareDocument);
79+
} else if (bulkOp instanceof Update update) {
80+
81+
Class<?> domainType = update.context().namespace().type();
82+
boolean multi = !(bulkOp instanceof UpdateFirst);
83+
84+
UpdateContext updateContext = template.getQueryOperations().updateContext(update.update(), update.query(),
85+
update.upsert());
86+
MongoPersistentEntity<?> entity = template.getPersistentEntity(domainType);
87+
88+
Document mappedQuery = updateContext.getMappedQuery(entity);
89+
Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(domainType)
90+
: updateContext.getMappedUpdate(entity);
91+
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query());
92+
93+
if (multi) {
94+
writeModels.add(BulkWriteSupport.updateMany(mongoNamespace, mappedQuery, mappedUpdate, updateOptions));
95+
} else {
96+
writeModels.add(BulkWriteSupport.updateOne(mongoNamespace, mappedQuery, mappedUpdate, updateOptions));
97+
}
98+
} else if (bulkOp instanceof Remove remove) {
99+
100+
Class<?> domainType = remove.context().namespace().type();
101+
DeleteContext deleteContext = template.getQueryOperations().deleteQueryContext(remove.query());
102+
103+
Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType));
104+
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType);
105+
106+
if (remove instanceof RemoveFirst) {
107+
writeModels.add(BulkWriteSupport.removeOne(mongoNamespace, mappedQuery, deleteOptions));
108+
} else {
109+
writeModels.add(BulkWriteSupport.removeMany(mongoNamespace, mappedQuery, deleteOptions));
110+
}
111+
} else if (bulkOp instanceof Replace replace) {
112+
113+
Class<?> domainType = replace.context().namespace().type();
114+
115+
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(collectionName,
116+
replace.replacement(), template.getConverter());
117+
118+
UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(),
119+
MappedDocument.of(sourceAwareDocument.document()), replace.upsert());
120+
121+
Document mappedQuery = updateContext.getMappedQuery(template.getPersistentEntity(domainType));
122+
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, replace.query());
123+
124+
writeModels.add(
125+
BulkWriteSupport.replaceOne(mongoNamespace, mappedQuery, sourceAwareDocument.document(), updateOptions));
126+
afterSaveCallables.add(sourceAwareDocument);
127+
}
128+
}
129+
130+
try {
131+
132+
ClientBulkWriteResult clientBulkWriteResult = template.doWithClient(client -> client.bulkWrite(writeModels,
133+
ClientBulkWriteOptions.clientBulkWriteOptions().ordered(options.getOrder().equals(BulkWriteOptions.Order.ORDERED))));
134+
135+
afterSaveCallables.forEach(callable -> {
136+
template
137+
.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName()));
138+
template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName());
139+
});
140+
return clientBulkWriteResult;
141+
} catch (MongoBulkWriteException e) {
142+
DataAccessException dataAccessException = template.getExceptionTranslator().translateExceptionIfPossible(e);
143+
if (dataAccessException != null) {
144+
throw dataAccessException;
145+
}
146+
throw e;
147+
}
148+
}
149+
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/EntityOperations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
* @see MongoTemplate
9393
* @see ReactiveMongoTemplate
9494
*/
95-
class EntityOperations {
95+
public class EntityOperations {
9696

9797
private static final String ID_FIELD = FieldName.ID.name();
9898

@@ -109,7 +109,7 @@ class EntityOperations {
109109
this(converter, new QueryMapper(converter));
110110
}
111111

112-
EntityOperations(MongoConverter converter, QueryMapper queryMapper) {
112+
public EntityOperations(MongoConverter converter, QueryMapper queryMapper) {
113113
this(converter, converter.getMappingContext(), converter.getCustomConversions(), converter.getProjectionFactory(),
114114
queryMapper);
115115
}

0 commit comments

Comments
 (0)