Skip to content

Commit 9be4583

Browse files
committed
split ChangeStreamCursor from OplogCursor
1 parent 250e019 commit 9be4583

File tree

8 files changed

+195
-111
lines changed

8 files changed

+195
-111
lines changed

core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ private Document commandChangeStreamPipeline(Document query, Oplog oplog, String
644644
int batchSize = (int) cursorDocument.getOrDefault("batchSize", 0);
645645

646646
String namespace = getFullCollectionNamespace(collectionName);
647-
Cursor cursor = oplog.createCursor(changeStreamDocument, namespace, aggregation);
647+
Cursor cursor = oplog.createChangeStreamCursor(changeStreamDocument, namespace, aggregation);
648648
return Utils.firstBatchCursorResponse(namespace, cursor.takeDocuments(batchSize), cursor);
649649
}
650650

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package de.bwaldvogel.mongo.oplog;
2+
3+
import java.util.List;
4+
import java.util.stream.Collectors;
5+
6+
import de.bwaldvogel.mongo.MongoBackend;
7+
import de.bwaldvogel.mongo.backend.TailableCursor;
8+
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
9+
import de.bwaldvogel.mongo.bson.BsonTimestamp;
10+
import de.bwaldvogel.mongo.bson.Document;
11+
import de.bwaldvogel.mongo.exception.MongoServerException;
12+
13+
public class ChangeStreamCursor implements TailableCursor {
14+
15+
private static final String FULL_DOCUMENT = "fullDocument";
16+
17+
private final MongoBackend mongoBackend;
18+
private final Document changeStreamDocument;
19+
private final Aggregation aggregation;
20+
private final OplogCursor oplogCursor;
21+
22+
ChangeStreamCursor(
23+
MongoBackend mongoBackend,
24+
Document changeStreamDocument,
25+
Aggregation aggregation,
26+
OplogCursor oplogCursor
27+
) {
28+
this.mongoBackend = mongoBackend;
29+
this.changeStreamDocument = changeStreamDocument;
30+
this.aggregation = aggregation;
31+
this.oplogCursor = oplogCursor;
32+
}
33+
34+
@Override
35+
public long getId() {
36+
return oplogCursor.getId();
37+
}
38+
39+
@Override
40+
public boolean isEmpty() {
41+
return oplogCursor.isEmpty();
42+
}
43+
44+
@Override
45+
public List<Document> takeDocuments(int numberToReturn) {
46+
return aggregation.runStagesAsStream(
47+
oplogCursor.takeDocuments(numberToReturn).stream()
48+
.map(this::toChangeStreamResponseDocument)
49+
).collect(Collectors.toList());
50+
}
51+
52+
@Override
53+
public OplogPosition getPosition() {
54+
return oplogCursor.getPosition();
55+
}
56+
57+
private Document toChangeStreamResponseDocument(Document oplogDocument) {
58+
OperationType operationType = OperationType.fromCode(oplogDocument.get("op").toString());
59+
Document documentKey = new Document();
60+
Document document = getUpdateDocument(oplogDocument);
61+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(oplogDocument);
62+
OplogPosition oplogPosition = new OplogPosition(timestamp);
63+
switch (operationType) {
64+
case UPDATE:
65+
case DELETE:
66+
documentKey = document;
67+
break;
68+
case INSERT:
69+
documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID));
70+
break;
71+
case COMMAND:
72+
return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp);
73+
default:
74+
throw new IllegalArgumentException("Unexpected operation type: " + operationType);
75+
}
76+
77+
return new Document()
78+
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
79+
.append("operationType", operationType.getDescription())
80+
.append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType))
81+
.append("documentKey", documentKey)
82+
.append("clusterTime", timestamp);
83+
}
84+
85+
private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) {
86+
Document document = getUpdateDocument(oplogDocument);
87+
String operationType = document.keySet().stream().findFirst().orElseThrow(
88+
() -> new MongoServerException("Unspecified command operation type")
89+
);
90+
91+
return new Document()
92+
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
93+
.append("operationType", operationType)
94+
.append("clusterTime", timestamp);
95+
}
96+
97+
private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) {
98+
switch (operationType) {
99+
case INSERT:
100+
return getUpdateDocument(document);
101+
case DELETE:
102+
return null;
103+
case UPDATE:
104+
return lookUpUpdateDocument(changeStreamDocument, document);
105+
}
106+
throw new IllegalArgumentException("Invalid operation type");
107+
}
108+
109+
private Document getUpdateDocument(Document document) {
110+
return (Document) document.get(OplogDocumentFields.O);
111+
}
112+
113+
private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) {
114+
Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document));
115+
if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) {
116+
String namespace = (String) document.get(OplogDocumentFields.NAMESPACE);
117+
String databaseName = namespace.split("\\.")[0];
118+
String collectionName = namespace.split("\\.")[1];
119+
return mongoBackend.resolveDatabase(databaseName)
120+
.resolveCollection(collectionName, true)
121+
.queryAllAsStream()
122+
.filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID)))
123+
.findFirst()
124+
.orElse(deltaUpdate);
125+
}
126+
return deltaUpdate;
127+
}
128+
129+
private Document getDeltaUpdate(Document updateDocument) {
130+
Document delta = new Document();
131+
if (updateDocument.containsKey("$set")) {
132+
delta.appendAll((Document) updateDocument.get("$set"));
133+
}
134+
if (updateDocument.containsKey("$unset")) {
135+
delta.appendAll((Document) updateDocument.get("$unset"));
136+
}
137+
return delta;
138+
}
139+
140+
}

core/src/main/java/de/bwaldvogel/mongo/oplog/CollectionBackedOplog.java

Lines changed: 23 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
package de.bwaldvogel.mongo.oplog;
22

33
import java.util.List;
4+
import java.util.Objects;
45
import java.util.UUID;
56
import java.util.function.Function;
7+
import java.util.stream.Collectors;
68
import java.util.stream.Stream;
79

810
import de.bwaldvogel.mongo.MongoBackend;
911
import de.bwaldvogel.mongo.MongoCollection;
10-
import de.bwaldvogel.mongo.backend.Cursor;
1112
import de.bwaldvogel.mongo.backend.CursorRegistry;
13+
import de.bwaldvogel.mongo.backend.TailableCursor;
1214
import de.bwaldvogel.mongo.backend.Utils;
1315
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
1416
import de.bwaldvogel.mongo.bson.BsonTimestamp;
1517
import de.bwaldvogel.mongo.bson.Document;
16-
import de.bwaldvogel.mongo.exception.MongoServerException;
1718

1819
public class CollectionBackedOplog implements Oplog {
1920

@@ -83,21 +84,19 @@ public void handleDropCollection(String namespace) {
8384
collection.addDocument(toOplogDropCollection(databaseName, collectionName));
8485
}
8586

86-
private Stream<Document> streamOplog(Document changeStreamDocument, OplogPosition position, Aggregation aggregation,
87-
String namespace) {
88-
return aggregation.runStagesAsStream(collection.queryAllAsStream()
87+
private Stream<Document> streamOplog(OplogPosition position, String namespace) {
88+
return collection.queryAllAsStream()
8989
.filter(document -> filterNamespace(document, namespace))
9090
.filter(document -> {
91-
BsonTimestamp timestamp = getOplogTimestamp(document);
91+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document);
9292
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
9393
return documentOplogPosition.isAfter(position);
9494
})
9595
.sorted((o1, o2) -> {
96-
BsonTimestamp timestamp1 = getOplogTimestamp(o1);
97-
BsonTimestamp timestamp2 = getOplogTimestamp(o2);
96+
BsonTimestamp timestamp1 = OplogUtils.getOplogTimestamp(o1);
97+
BsonTimestamp timestamp2 = OplogUtils.getOplogTimestamp(o2);
9898
return timestamp1.compareTo(timestamp2);
99-
})
100-
.map(document -> toChangeStreamResponseDocument(document, changeStreamDocument)));
99+
});
101100
}
102101

103102
private static boolean filterNamespace(Document document, String namespace) {
@@ -110,7 +109,16 @@ private static boolean filterNamespace(Document document, String namespace) {
110109
}
111110

112111
@Override
113-
public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
112+
public OplogCursor createCursor(String namespace, OplogPosition initialOplogPosition) {
113+
return new OplogCursor(
114+
cursorRegistry.generateCursorId(),
115+
position -> streamOplog(position, namespace),
116+
initialOplogPosition
117+
);
118+
}
119+
120+
@Override
121+
public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
114122
Document startAfter = (Document) changeStreamDocument.get(START_AFTER);
115123
Document resumeAfter = (Document) changeStreamDocument.get(RESUME_AFTER);
116124
BsonTimestamp startAtOperationTime = (BsonTimestamp) changeStreamDocument.get(START_AT_OPERATION_TIME);
@@ -123,7 +131,7 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr
123131
String collectionName = Utils.getCollectionNameFromFullName(namespace);
124132
boolean resumeAfterTerminalEvent = collection.queryAllAsStream()
125133
.filter(document -> {
126-
BsonTimestamp timestamp = getOplogTimestamp(document);
134+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document);
127135
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
128136
return initialOplogPosition.isAfter(documentOplogPosition.inclusive());
129137
})
@@ -141,9 +149,9 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr
141149
initialOplogPosition = new OplogPosition(oplogClock.now());
142150
}
143151

144-
Function<OplogPosition, Stream<Document>> streamSupplier =
145-
position -> streamOplog(changeStreamDocument, position, aggregation, namespace);
146-
OplogCursor cursor = new OplogCursor(cursorRegistry.generateCursorId(), streamSupplier, initialOplogPosition);
152+
OplogCursor oplogCursor = createCursor(namespace, initialOplogPosition);
153+
ChangeStreamCursor cursor
154+
= new ChangeStreamCursor(backend, changeStreamDocument, aggregation, oplogCursor);
147155
cursorRegistry.add(cursor);
148156
return cursor;
149157
}
@@ -185,91 +193,4 @@ private boolean isOplogCollection(String namespace) {
185193
return collection.getFullName().equals(namespace);
186194
}
187195

188-
private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) {
189-
switch (operationType) {
190-
case INSERT:
191-
return getUpdateDocument(document);
192-
case DELETE:
193-
return null;
194-
case UPDATE:
195-
return lookUpUpdateDocument(changeStreamDocument, document);
196-
}
197-
throw new IllegalArgumentException("Invalid operation type");
198-
}
199-
200-
private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) {
201-
Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document));
202-
if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) {
203-
String namespace = (String) document.get(OplogDocumentFields.NAMESPACE);
204-
String databaseName = namespace.split("\\.")[0];
205-
String collectionName = namespace.split("\\.")[1];
206-
return backend.resolveDatabase(databaseName)
207-
.resolveCollection(collectionName, true)
208-
.queryAllAsStream()
209-
.filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID)))
210-
.findFirst()
211-
.orElse(deltaUpdate);
212-
}
213-
return deltaUpdate;
214-
}
215-
216-
private Document getDeltaUpdate(Document updateDocument) {
217-
Document delta = new Document();
218-
if (updateDocument.containsKey("$set")) {
219-
delta.appendAll((Document) updateDocument.get("$set"));
220-
}
221-
if (updateDocument.containsKey("$unset")) {
222-
delta.appendAll((Document) updateDocument.get("$unset"));
223-
}
224-
return delta;
225-
}
226-
227-
private Document toChangeStreamResponseDocument(Document oplogDocument, Document changeStreamDocument) {
228-
OperationType operationType = OperationType.fromCode(oplogDocument.get(OplogDocumentFields.OPERATION_TYPE).toString());
229-
Document documentKey = new Document();
230-
Document document = getUpdateDocument(oplogDocument);
231-
BsonTimestamp timestamp = getOplogTimestamp(oplogDocument);
232-
OplogPosition oplogPosition = new OplogPosition(timestamp);
233-
switch (operationType) {
234-
case UPDATE:
235-
case DELETE:
236-
documentKey = document;
237-
break;
238-
case INSERT:
239-
documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID));
240-
break;
241-
case COMMAND:
242-
return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp);
243-
default:
244-
throw new IllegalArgumentException("Unexpected operation type: " + operationType);
245-
}
246-
247-
return new Document()
248-
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
249-
.append(OPERATION_TYPE, operationType.getDescription())
250-
.append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType))
251-
.append(DOCUMENT_KEY, documentKey)
252-
.append(CLUSTER_TIME, timestamp);
253-
}
254-
255-
private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) {
256-
Document document = getUpdateDocument(oplogDocument);
257-
String operationType = document.keySet().stream().findFirst().orElseThrow(
258-
() -> new MongoServerException("Unspecified command operation type")
259-
);
260-
261-
return new Document()
262-
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
263-
.append(OPERATION_TYPE, operationType)
264-
.append(CLUSTER_TIME, timestamp);
265-
}
266-
267-
private static BsonTimestamp getOplogTimestamp(Document document) {
268-
return (BsonTimestamp) document.get(OplogDocumentFields.TIMESTAMP);
269-
}
270-
271-
private static Document getUpdateDocument(Document document) {
272-
return (Document) document.get(OplogDocumentFields.O);
273-
}
274-
275196
}

core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import java.util.List;
44

5-
import de.bwaldvogel.mongo.backend.Cursor;
65
import de.bwaldvogel.mongo.backend.EmptyCursor;
6+
import de.bwaldvogel.mongo.backend.TailableCursor;
77
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
88
import de.bwaldvogel.mongo.bson.Document;
99

@@ -35,7 +35,12 @@ public void handleDropCollection(String namespace) {
3535
}
3636

3737
@Override
38-
public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
38+
public TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition) {
39+
return EmptyCursor.get();
40+
}
41+
42+
@Override
43+
public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
3944
return EmptyCursor.get();
4045
}
4146
}

core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.util.List;
44

5-
import de.bwaldvogel.mongo.backend.Cursor;
5+
import de.bwaldvogel.mongo.backend.TailableCursor;
66
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
77
import de.bwaldvogel.mongo.bson.Document;
88

@@ -16,5 +16,7 @@ public interface Oplog {
1616

1717
void handleDropCollection(String namespace);
1818

19-
Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation);
19+
TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition);
20+
21+
TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation);
2022
}

0 commit comments

Comments
 (0)