Skip to content

Commit ec5eabd

Browse files
committed
MLE-27078 Incremental write now uses an unsignedLong
Not sure eval will be kept, but all 3 approaches are now properly using an unsignedLong. Also fixed a performance issue in the view filter where "op.in" was being used instead of a "where" + documentQuery.
1 parent fb5f639 commit ec5eabd

File tree

8 files changed

+86
-34
lines changed

8 files changed

+86
-34
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,17 @@
1919
*/
2020
class IncrementalWriteEvalFilter extends IncrementalWriteFilter {
2121

22+
// The hash value is cast to a String based on this analysis from Copilot:
23+
// "Since the field index is unsignedLong, cts.valueTuples returns an
24+
// xs:unsignedLong value, which JavaScript represents as an IEEE 754
25+
// double. Values above 2^53 − 1 (~9 quadrillion) would silently lose precision."
26+
// It is then cast back to an unsignedLong when the value is retrieved
27+
// from the JSON response object.
2228
private static final String EVAL_SCRIPT = """
2329
const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(hashKeyName)], null, cts.documentQuery(uris));
2430
const response = {};
2531
for (var tuple of tuples) {
26-
response[tuple[0]] = tuple[1];
32+
response[tuple[0]] = String(tuple[1]);
2733
}
2834
response
2935
""";
@@ -49,7 +55,7 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
4955

5056
return filterDocuments(context, uri -> {
5157
if (response.has(uri)) {
52-
return response.get(uri).asText();
58+
return Long.parseUnsignedLong(response.get(uri).asText());
5359
}
5460
return null;
5561
});

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public IncrementalWriteConfig getConfig() {
227227
return config;
228228
}
229229

230-
protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
230+
protected final DocumentWriteSet filterDocuments(Context context, Function<String, Long> hashRetriever) {
231231
final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
232232
final List<DocumentWriteOperation> skippedDocuments = new ArrayList<>();
233233
final String timestamp = Instant.now().toString();
@@ -246,8 +246,8 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<Strin
246246
continue;
247247
}
248248

249-
final String contentHash = computeHash(serializedContent);
250-
final String existingHash = hashRetriever.apply(doc.getUri());
249+
final long contentHash = computeHash(serializedContent);
250+
final Long existingHash = hashRetriever.apply(doc.getUri());
251251
if (logger.isTraceEnabled()) {
252252
logger.trace("URI: {}, existing Hash: {}, new Hash: {}", doc.getUri(), existingHash, contentHash);
253253
}
@@ -317,13 +317,12 @@ private boolean isPossiblyJsonContent(String content) {
317317
return trimmed.startsWith("{") || trimmed.startsWith("[");
318318
}
319319

320-
private String computeHash(String content) {
320+
private long computeHash(String content) {
321321
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
322-
long hash = hashFunction.hashBytes(bytes);
323-
return Long.toHexString(hash);
322+
return hashFunction.hashBytes(bytes);
324323
}
325324

326-
protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String hashKeyName, String hash,
325+
protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String hashKeyName, long hash,
327326
String timestampKeyName, String timestamp) {
328327
DocumentMetadataHandle newMetadata = new DocumentMetadataHandle();
329328
if (op.getMetadata() != null) {
@@ -335,7 +334,7 @@ protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation
335334
newMetadata.getMetadataValues().putAll(originalMetadata.getMetadataValues());
336335
}
337336

338-
newMetadata.getMetadataValues().put(hashKeyName, hash);
337+
newMetadata.getMetadataValues().put(hashKeyName, Long.toUnsignedString(hash));
339338
newMetadata.getMetadataValues().put(timestampKeyName, timestamp);
340339

341340
return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public DocumentWriteSet apply(Context context) {
3434
RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
3535

3636
try {
37-
Map<String, String> existingHashes = rowTemplate.query(op ->
37+
Map<String, Long> existingHashes = rowTemplate.query(op ->
3838
op.fromLexicons(Map.of(
3939
"uri", op.cts.uriReference(),
4040
"hash", op.cts.fieldReference(getConfig().getHashKeyName())
@@ -43,10 +43,10 @@ public DocumentWriteSet apply(Context context) {
4343
),
4444

4545
rows -> {
46-
Map<String, String> map = new HashMap<>();
46+
Map<String, Long> map = new HashMap<>();
4747
rows.forEach(row -> {
4848
String uri = row.getString("uri");
49-
String existingHash = row.getString("hash");
49+
long existingHash = Long.parseUnsignedLong(row.getString("hash"));
5050
map.put(uri, existingHash);
5151
});
5252
return map;

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteViewFilter.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313

1414
/**
1515
* Uses an Optic query with fromView to get the existing hash values for a set of URIs from a TDE view.
16-
* This implementation requires a TDE template to be deployed that extracts the URI and hash metadata.
16+
* This implementation requires a TDE template to be deployed that contains at minimum a "uri" column
17+
* and a column matching the configured hash key name, plus any other columns desired.
18+
* The query uses a {@code where} with a {@code cts.documentQuery} to filter rows by URI, which is
19+
* significantly faster than filtering via {@code op.in}.
1720
*
1821
* @since 8.1.0
1922
*/
@@ -33,20 +36,22 @@ public DocumentWriteSet apply(Context context) {
3336
RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
3437

3538
try {
36-
Map<String, String> existingHashes = rowTemplate.query(op ->
37-
op.fromView(getConfig().getSchemaName(), getConfig().getViewName())
38-
.where(op.in(op.col("uri"), op.xs.stringSeq(uris))),
39-
39+
Map<String, Long> existingHashes = rowTemplate.query(op ->
40+
op.fromView(getConfig().getSchemaName(), getConfig().getViewName(), "")
41+
.where(op.cts.documentQuery(op.xs.stringSeq(uris)))
42+
,
4043
rows -> {
41-
Map<String, String> map = new HashMap<>();
44+
Map<String, Long> map = new HashMap<>();
4245
rows.forEach(row -> {
4346
String uri = row.getString("uri");
44-
String existingHash = row.getString("hash");
45-
map.put(uri, existingHash);
47+
String hashString = row.getString(getConfig().getHashKeyName());
48+
if (hashString != null && !hashString.isEmpty()) {
49+
long existingHash = Long.parseUnsignedLong(hashString);
50+
map.put(uri, existingHash);
51+
}
4652
});
4753
return map;
48-
}
49-
);
54+
});
5055

5156
if (logger.isDebugEnabled()) {
5257
logger.debug("Retrieved {} existing hashes for batch of size {}", existingHashes.size(), uris.length);

marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
2+
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33
*/
44
package com.marklogic.client.datamovement.filter;
55

@@ -35,7 +35,7 @@ void addHashToMetadata() {
3535
DocumentWriteOperation doc2 = new DocumentWriteOperationImpl("/2.xml", metadata, new StringHandle("<doc2/>"));
3636

3737
final String timestamp = Instant.now().toString();
38-
doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123", "theTimestamp", timestamp);
38+
doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", 12345, "theTimestamp", timestamp);
3939

4040
assertEquals(metadata, doc1.getMetadata(), "doc1 should still have the original metadata object");
4141

@@ -46,7 +46,7 @@ void addHashToMetadata() {
4646
assertEquals("value1", metadata2.getProperties().get("prop1"), "property should be preserved");
4747

4848
assertEquals("value1", metadata2.getMetadataValues().get("meta1"), "metadata value should be preserved");
49-
assertEquals("abc123", metadata2.getMetadataValues().get("theField"), "hash field should be added");
49+
assertEquals("12345", metadata2.getMetadataValues().get("theField"), "hash field should be added");
5050
assertEquals(timestamp, metadata2.getMetadataValues().get("theTimestamp"), "timestamp should be added");
5151
}
5252
}

marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,50 @@ void fromView() {
251251
verifyIncrementalWriteWorks();
252252
}
253253

254+
@Test
255+
void loadWithEvalFilterThenVerifyOpticFilterSkipsAll() {
256+
filter = IncrementalWriteFilter.newBuilder()
257+
.useEvalQuery(true)
258+
.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
259+
.build();
260+
261+
writeTenDocuments();
262+
assertEquals(10, writtenCount.get());
263+
assertEquals(0, skippedCount.get());
264+
265+
// Switch to the Optic fromLexicons filter.
266+
writtenCount.set(0);
267+
skippedCount.set(0);
268+
filter = IncrementalWriteFilter.newBuilder()
269+
.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
270+
.build();
271+
272+
writeTenDocuments();
273+
assertEquals(0, writtenCount.get(), "No documents should be written since the hashes stored by the eval " +
274+
"filter should be recognized as unchanged by the Optic filter.");
275+
assertEquals(10, skippedCount.get());
276+
}
277+
278+
@Test
279+
void loadWithOpticFilterThenVerifyEvalFilterSkipsAll() {
280+
writeTenDocuments();
281+
assertEquals(10, writtenCount.get());
282+
assertEquals(0, skippedCount.get());
283+
284+
// Switch to the eval filter.
285+
writtenCount.set(0);
286+
skippedCount.set(0);
287+
filter = IncrementalWriteFilter.newBuilder()
288+
.useEvalQuery(true)
289+
.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
290+
.build();
291+
292+
writeTenDocuments();
293+
assertEquals(0, writtenCount.get(), "No documents should be written since the hashes stored by the Optic " +
294+
"filter should be recognized as unchanged by the eval filter.");
295+
assertEquals(10, skippedCount.get());
296+
}
297+
254298
@Test
255299
void emptyValuesForFromView() {
256300
filter = IncrementalWriteFilter.newBuilder()
@@ -332,8 +376,8 @@ private void verifyDocumentsHasHashInMetadataKey() {
332376

333377
String hash = metadata.getMetadataValues().get("incrementalWriteHash");
334378
try {
335-
// Can use Java's support for parsing unsigned longs in base 16 to verify the hash is valid.
336-
Long.parseUnsignedLong(hash, 16);
379+
// Verify the hash is a valid unsigned long in base 10 decimal.
380+
Long.parseUnsignedLong(hash);
337381
} catch (NumberFormatException e) {
338382
fail("Document " + doc.getUri() + " has an invalid incrementalWriteHash value: " + hash);
339383
}

test-app/src/main/ml-config/databases/content-database.json

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,13 @@
224224
"invalid-values": "reject"
225225
},
226226
{
227-
"scalar-type": "string",
228-
"collation": "http://marklogic.com/collation/",
227+
"scalar-type": "unsignedLong",
229228
"field-name": "incrementalWriteHash",
230229
"range-value-positions": false,
231230
"invalid-values": "reject"
232231
},
233232
{
234-
"scalar-type": "string",
235-
"collation": "http://marklogic.com/collation/",
233+
"scalar-type": "unsignedLong",
236234
"field-name": "myWriteHash",
237235
"range-value-positions": false,
238236
"invalid-values": "reject"

test-app/src/main/ml-schemas/tde/incrementalWriteHash.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
"val": "xdmp:node-uri(.)"
1414
},
1515
{
16-
"name": "hash",
17-
"scalarType": "string",
16+
"name": "incrementalWriteHash",
17+
"scalarType": "unsignedLong",
1818
"val": "xdmp:node-metadata-value(., 'incrementalWriteHash')",
1919
"nullable": true
2020
}

0 commit comments

Comments
 (0)