Skip to content

Commit d66c34a

Browse files
committed
Support showExpandedEvents in change streams.
Expose showExpandedEvents in ChangeStreamOptions and propagate to sync/reactive change streams so expanded events can be consumed. Closes #5069 Signed-off-by: Kyuhong Han [email protected]
1 parent c54ceb6 commit d66c34a

File tree

7 files changed

+91
-0
lines changed

7 files changed

+91
-0
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class ChangeStreamOptions {
5353
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
5454
private @Nullable Collation collation;
5555
private @Nullable Object resumeTimestamp;
56+
private @Nullable Boolean showExpandedEvents;
5657
private Resume resume = Resume.UNDEFINED;
5758

5859
protected ChangeStreamOptions() {}
@@ -108,6 +109,13 @@ public Optional<BsonTimestamp> getResumeBsonTimestamp() {
108109
return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
109110
}
110111

112+
/**
113+
* @return {@link Optional#empty()} if not set.
114+
*/
115+
public Optional<Boolean> getShowExpandedEvents() {
116+
return Optional.ofNullable(showExpandedEvents);
117+
}
118+
111119
/**
112120
* @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}.
113121
* @since 2.2
@@ -191,6 +199,9 @@ public boolean equals(@Nullable Object o) {
191199
if (!ObjectUtils.nullSafeEquals(this.resumeTimestamp, that.resumeTimestamp)) {
192200
return false;
193201
}
202+
if (!ObjectUtils.nullSafeEquals(this.showExpandedEvents, that.showExpandedEvents)) {
203+
return false;
204+
}
194205
return resume == that.resume;
195206
}
196207

@@ -202,6 +213,7 @@ public int hashCode() {
202213
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentBeforeChangeLookup);
203214
result = 31 * result + ObjectUtils.nullSafeHashCode(collation);
204215
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp);
216+
result = 31 * result + ObjectUtils.nullSafeHashCode(showExpandedEvents);
205217
result = 31 * result + ObjectUtils.nullSafeHashCode(resume);
206218
return result;
207219
}
@@ -239,6 +251,7 @@ public static class ChangeStreamOptionsBuilder {
239251
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
240252
private @Nullable Collation collation;
241253
private @Nullable Object resumeTimestamp;
254+
private @Nullable Boolean showExpandedEvents;
242255
private Resume resume = Resume.UNDEFINED;
243256

244257
private ChangeStreamOptionsBuilder() {}
@@ -432,6 +445,19 @@ public ChangeStreamOptionsBuilder startAfter(BsonValue resumeToken) {
432445
return this;
433446
}
434447

448+
/**
449+
* Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted.
450+
*
451+
* @param showExpandedEvents {@code true} to include expanded events.
452+
* @return this.
453+
*/
454+
@Contract("_ -> this")
455+
public ChangeStreamOptionsBuilder showExpandedEvents(boolean showExpandedEvents) {
456+
457+
this.showExpandedEvents = showExpandedEvents;
458+
return this;
459+
}
460+
435461
/**
436462
* @return the built {@link ChangeStreamOptions}
437463
*/
@@ -446,6 +472,7 @@ public ChangeStreamOptions build() {
446472
options.fullDocumentBeforeChangeLookup = this.fullDocumentBeforeChangeLookup;
447473
options.collation = this.collation;
448474
options.resumeTimestamp = this.resumeTimestamp;
475+
options.showExpandedEvents = this.showExpandedEvents;
449476
options.resume = this.resume;
450477

451478
return options;

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2148,6 +2148,7 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
21482148
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
21492149
.orElse(publisher);
21502150
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
2151+
publisher = options.getShowExpandedEvents().map(publisher::showExpandedEvents).orElse(publisher);
21512152

21522153
if (options.getFullDocumentBeforeChangeLookup().isPresent()) {
21532154
publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get());

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ public static class ChangeStreamRequestBuilder<T> {
243243
private @Nullable String databaseName;
244244
private @Nullable String collectionName;
245245
private @Nullable Duration maxAwaitTime;
246+
private @Nullable Boolean showExpandedEvents;
246247
private @Nullable MessageListener<ChangeStreamDocument<Document>, ? super T> listener;
247248
private final ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder();
248249

@@ -470,6 +471,20 @@ public ChangeStreamRequestBuilder<T> maxAwaitTime(Duration timeout) {
470471
return this;
471472
}
472473

474+
/**
475+
* Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted.
476+
*
477+
* @param showExpandedEvents {@code true} to include expanded events.
478+
* @return this.
479+
*/
480+
@Contract("_ -> this")
481+
public ChangeStreamRequestBuilder<T> showExpandedEvents(boolean showExpandedEvents) {
482+
483+
this.showExpandedEvents = showExpandedEvents;
484+
this.delegate.showExpandedEvents(showExpandedEvents);
485+
return this;
486+
}
487+
473488
/**
474489
* @return the build {@link ChangeStreamRequest}.
475490
*/

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
9090
FullDocumentBeforeChange fullDocumentBeforeChange = null;
9191
BsonTimestamp startAt = null;
9292
boolean resumeAfter = true;
93+
boolean showExpandedEvents = false;
9394

9495
if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions requestOptions) {
9596

@@ -105,6 +106,10 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
105106
}
106107
}
107108

109+
if (changeStreamOptions.getShowExpandedEvents().isPresent()) {
110+
showExpandedEvents = changeStreamOptions.getShowExpandedEvents().get();
111+
}
112+
108113
if (changeStreamOptions.getResumeToken().isPresent()) {
109114

110115
resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
@@ -155,6 +160,10 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
155160
iterable = iterable.collation(collation);
156161
}
157162

163+
if (showExpandedEvents) {
164+
iterable = iterable.showExpandedEvents(showExpandedEvents);
165+
}
166+
158167
iterable = iterable.fullDocument(fullDocument);
159168
if(fullDocumentBeforeChange != null) {
160169
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,12 @@ public void shouldNotReportResumeStartAfter() {
5353
assertThat(options.isResumeAfter()).isFalse();
5454
assertThat(options.isStartAfter()).isFalse();
5555
}
56+
57+
@Test // GH-5069
58+
void shouldStoreShowExpandedEvents() {
59+
60+
ChangeStreamOptions options = ChangeStreamOptions.builder().showExpandedEvents(true).build();
61+
62+
assertThat(options.getShowExpandedEvents()).isPresent().hasValue(true);
63+
}
5664
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1668,6 +1668,21 @@ void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() {
16681668

16691669
}
16701670

1671+
@Test // GH-5069
1672+
void changeStreamOptionShowExpandedEventsShouldBeApplied() {
1673+
1674+
when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db));
1675+
when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher);
1676+
when(changeStreamPublisher.showExpandedEvents(anyBoolean())).thenReturn(changeStreamPublisher);
1677+
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
1678+
1679+
ChangeStreamOptions options = ChangeStreamOptions.builder()
1680+
.showExpandedEvents(true).build();
1681+
template.changeStream("database", "collection", options, Object.class).subscribe();
1682+
1683+
verify(changeStreamPublisher).showExpandedEvents(true);
1684+
}
1685+
16711686
@Test // GH-4462
16721687
void replaceShouldUseCollationWhenPresent() {
16731688

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,22 @@ void shouldApplyFullDocumentBeforeChangeToChangeStream() {
140140
verify(changeStreamIterable).fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
141141
}
142142

143+
@Test // GH-5069
144+
void shouldApplyShowExpandedEventsToChangeStream() {
145+
146+
when(changeStreamIterable.showExpandedEvents(true)).thenReturn(changeStreamIterable);
147+
148+
ChangeStreamRequest request = ChangeStreamRequest.builder() //
149+
.collection("start-wars") //
150+
.showExpandedEvents(true) //
151+
.publishTo(message -> {}) //
152+
.build();
153+
154+
initTask(request, Document.class);
155+
156+
verify(changeStreamIterable).showExpandedEvents(true);
157+
}
158+
143159
private MongoCursor<ChangeStreamDocument<Document>> initTask(ChangeStreamRequest request, Class<?> targetType) {
144160

145161
ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});

0 commit comments

Comments
 (0)