23
23
import org .opensearch .client .transport .TransportOptions ;
24
24
import org .opensearch .common .unit .ByteSizeUnit ;
25
25
import org .opensearch .dataprepper .aws .api .AwsCredentialsSupplier ;
26
+ import org .opensearch .dataprepper .common .concurrent .BackgroundThreadFactory ;
26
27
import org .opensearch .dataprepper .expression .ExpressionEvaluationException ;
27
28
import org .opensearch .dataprepper .expression .ExpressionEvaluator ;
28
29
import org .opensearch .dataprepper .metrics .MetricNames ;
61
62
import org .opensearch .dataprepper .plugins .sink .opensearch .dlq .FailedDlqData ;
62
63
import org .opensearch .dataprepper .plugins .sink .opensearch .index .ClusterSettingsParser ;
63
64
import org .opensearch .dataprepper .plugins .sink .opensearch .index .DocumentBuilder ;
65
+ import org .opensearch .dataprepper .plugins .sink .opensearch .index .ExistingDocumentQueryManager ;
64
66
import org .opensearch .dataprepper .plugins .sink .opensearch .index .IndexManager ;
65
67
import org .opensearch .dataprepper .plugins .sink .opensearch .index .IndexManagerFactory ;
66
68
import org .opensearch .dataprepper .plugins .sink .opensearch .index .IndexTemplateAPIWrapper ;
78
80
import java .nio .file .StandardOpenOption ;
79
81
import java .util .Collection ;
80
82
import java .util .Collections ;
83
+ import java .util .HashSet ;
81
84
import java .util .List ;
82
85
import java .util .Objects ;
83
86
import java .util .Optional ;
87
+ import java .util .Set ;
84
88
import java .util .StringJoiner ;
85
89
import java .util .concurrent .ConcurrentHashMap ;
90
+ import java .util .concurrent .ExecutorService ;
91
+ import java .util .concurrent .Executors ;
86
92
import java .util .concurrent .locks .ReentrantLock ;
87
93
import java .util .function .Function ;
88
94
import java .util .function .Supplier ;
@@ -149,6 +155,12 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
149
155
private final ConcurrentHashMap <Long , Long > lastFlushTimeMap ;
150
156
private final PluginConfigObservable pluginConfigObservable ;
151
157
158
+ private ExistingDocumentQueryManager existingDocumentQueryManager ;
159
+
160
+ private final ExecutorService queryExecutorService ;
161
+
162
+ private final int processWorkerThreads ;
163
+
152
164
@ DataPrepperPluginConstructor
153
165
public OpenSearchSink (final PluginSetting pluginSetting ,
154
166
final SinkContext sinkContext ,
@@ -158,6 +170,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
158
170
final PluginConfigObservable pluginConfigObservable ,
159
171
final OpenSearchSinkConfig openSearchSinkConfiguration ) {
160
172
super (pluginSetting , Integer .MAX_VALUE , INITIALIZE_RETRY_WAIT_TIME_MS );
173
+ this .processWorkerThreads = pipelineDescription .getNumberOfProcessWorkers ();
161
174
this .awsCredentialsSupplier = awsCredentialsSupplier ;
162
175
this .sinkContext = sinkContext != null ? sinkContext : new SinkContext (null , Collections .emptyList (), Collections .emptyList (), Collections .emptyList ());
163
176
this .expressionEvaluator = expressionEvaluator ;
@@ -190,6 +203,8 @@ public OpenSearchSink(final PluginSetting pluginSetting,
190
203
this .lastFlushTimeMap = new ConcurrentHashMap <>();
191
204
this .pluginConfigObservable = pluginConfigObservable ;
192
205
this .objectMapper = new ObjectMapper ();
206
+ this .queryExecutorService = openSearchSinkConfig .getIndexConfiguration ().getQueryTerm () != null ?
207
+ Executors .newSingleThreadExecutor (BackgroundThreadFactory .defaultExecutorThreadFactory ("existing-document-query-manager" )) : null ;
193
208
194
209
final Optional <DlqConfiguration > dlqConfig = openSearchSinkConfig .getRetryConfiguration ().getDlq ();
195
210
if (dlqConfig .isPresent ()) {
@@ -233,6 +248,12 @@ private void doInitializeInternal() throws IOException {
233
248
};
234
249
openSearchClientRefresher = new OpenSearchClientRefresher (
235
250
pluginMetrics , connectionConfiguration , clientFunction );
251
+
252
+ if (queryExecutorService != null ) {
253
+ existingDocumentQueryManager = new ExistingDocumentQueryManager (openSearchSinkConfig .getIndexConfiguration (), pluginMetrics , openSearchClient );
254
+ queryExecutorService .submit (existingDocumentQueryManager );
255
+ }
256
+
236
257
pluginConfigObservable .addPluginConfigObserver (
237
258
newOpenSearchSinkConfig -> openSearchClientRefresher .update ((OpenSearchSinkConfig ) newOpenSearchSinkConfig ));
238
259
configuredIndexAlias = openSearchSinkConfig .getIndexConfiguration ().getIndexAlias ();
@@ -280,7 +301,8 @@ private void doInitializeInternal() throws IOException {
280
301
maxRetries ,
281
302
bulkRequestSupplier ,
282
303
pipeline ,
283
- PLUGIN_NAME );
304
+ PLUGIN_NAME ,
305
+ openSearchSinkConfig .getIndexConfiguration ().getQueryOnBulkFailures () ? existingDocumentQueryManager : null );
284
306
285
307
this .initialized = true ;
286
308
LOG .info ("Initialized OpenSearch sink" );
@@ -394,6 +416,22 @@ public void doOutput(final Collection<Record<Event>> records) {
394
416
AccumulatingBulkRequest <BulkOperationWrapper , BulkRequest > bulkRequest = bulkRequestMap .get (threadId );
395
417
long lastFlushTime = lastFlushTimeMap .get (threadId );
396
418
419
+ Set <BulkOperationWrapper > documentsReadyForIndexing = new HashSet <>();
420
+ if (openSearchSinkConfig .getIndexConfiguration ().getQueryTerm () != null ) {
421
+ documentsReadyForIndexing = existingDocumentQueryManager .getAndClearBulkOperationsReadyToIndex ();
422
+ }
423
+
424
+
425
+ if (!documentsReadyForIndexing .isEmpty ()) {
426
+ LOG .info ("Found {} documents ready for indexing from query manager" , documentsReadyForIndexing .size ());
427
+ }
428
+
429
+ for (final BulkOperationWrapper bulkOperationWrapper : documentsReadyForIndexing ) {
430
+ bulkRequest = flushBatch (bulkRequest , bulkOperationWrapper , lastFlushTime );
431
+ bulkRequest .addOperation (bulkOperationWrapper );
432
+ }
433
+
434
+
397
435
for (final Record <Event > record : records ) {
398
436
final Event event = record .getData ();
399
437
final SerializedJson document = getDocument (event );
@@ -465,13 +503,18 @@ public void doOutput(final Collection<Record<Event>> records) {
465
503
continue ;
466
504
}
467
505
468
- BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper (bulkOperation , event .getEventHandle (), serializedJsonNode );
469
- final long estimatedBytesBeforeAdd = bulkRequest .estimateSizeInBytesWithDocument (bulkOperationWrapper );
470
- if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest .getOperationsCount () > 0 ) {
471
- flushBatch (bulkRequest );
472
- lastFlushTime = System .currentTimeMillis ();
473
- bulkRequest = bulkRequestSupplier .get ();
506
+ final String queryTermKey = openSearchSinkConfig .getIndexConfiguration ().getQueryTerm ();
507
+ final String termValue = queryTermKey != null ?
508
+ event .get (queryTermKey , String .class ) : null ;
509
+ BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper (bulkOperation , event .getEventHandle (), serializedJsonNode , termValue );
510
+
511
+ if (openSearchSinkConfig .getIndexConfiguration ().getQueryWhen () != null &&
512
+ expressionEvaluator .evaluateConditional (openSearchSinkConfig .getIndexConfiguration ().getQueryWhen (), event )) {
513
+ existingDocumentQueryManager .addBulkOperation (bulkOperationWrapper );
514
+ continue ;
474
515
}
516
+
517
+ bulkRequest = flushBatch (bulkRequest , bulkOperationWrapper , lastFlushTime );
475
518
bulkRequest .addOperation (bulkOperationWrapper );
476
519
}
477
520
@@ -606,6 +649,10 @@ public void shutdown() {
606
649
super .shutdown ();
607
650
closeFiles ();
608
651
openSearchClient .shutdown ();
652
+ if (queryExecutorService != null ) {
653
+ existingDocumentQueryManager .stop ();
654
+ queryExecutorService .shutdown ();
655
+ }
609
656
}
610
657
611
658
private void maybeUpdateServerlessNetworkPolicy () {
@@ -652,4 +699,18 @@ private boolean isUsingDocumentFilters() {
652
699
(sinkContext .getExcludeKeys () != null && !sinkContext .getExcludeKeys ().isEmpty ()) ||
653
700
sinkContext .getTagsTargetKey () != null ;
654
701
}
702
+
703
+ private AccumulatingBulkRequest <BulkOperationWrapper , BulkRequest > flushBatch (
704
+ final AccumulatingBulkRequest <BulkOperationWrapper , BulkRequest > bulkRequest ,
705
+ final BulkOperationWrapper bulkOperationWrapper ,
706
+ long lastFlushTime
707
+ ) {
708
+ final long estimatedBytesBeforeAdd = bulkRequest .estimateSizeInBytesWithDocument (bulkOperationWrapper );
709
+ if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest .getOperationsCount () > 0 ) {
710
+ flushBatch (bulkRequest );
711
+ lastFlushTime = System .currentTimeMillis ();
712
+ return bulkRequestSupplier .get ();
713
+ }
714
+ return bulkRequest ;
715
+ }
655
716
}
0 commit comments