15
15
*/
16
16
package org .commonjava .indy .content .index ;
17
17
18
+ import org .commonjava .cdi .util .weft .ExecutorConfig ;
19
+ import org .commonjava .cdi .util .weft .WeftManaged ;
18
20
import org .commonjava .indy .IndyWorkflowException ;
19
21
import org .commonjava .indy .content .ContentManager ;
20
22
import org .commonjava .indy .content .index .conf .ContentIndexConfig ;
21
23
import org .commonjava .indy .core .content .PathMaskChecker ;
22
24
import org .commonjava .indy .data .IndyDataException ;
23
25
import org .commonjava .indy .data .StoreDataManager ;
24
26
import org .commonjava .indy .measure .annotation .Measure ;
25
- import org .commonjava .indy .measure .annotation .MetricNamed ;
26
27
import org .commonjava .indy .model .core .ArtifactStore ;
27
28
import org .commonjava .indy .model .core .Group ;
28
29
import org .commonjava .indy .model .core .HostedRepository ;
51
52
import java .util .List ;
52
53
import java .util .Objects ;
53
54
import java .util .Set ;
55
+ import java .util .concurrent .Executor ;
54
56
55
57
import static org .commonjava .indy .core .content .group .GroupMergeHelper .GROUP_METADATA_EXISTS ;
56
58
import static org .commonjava .indy .core .content .group .GroupMergeHelper .GROUP_METADATA_GENERATED ;
57
- import static org .commonjava .indy .measure .annotation .MetricNamed .DEFAULT ;
58
59
59
60
/**
60
61
* Decorator for ContentManager which uses Infinispan to index content to avoid having to iterate all members of large
@@ -88,6 +89,11 @@ public abstract class IndexingContentManagerDecorator
88
89
@ Inject
89
90
private ContentIndexConfig indexCfg ;
90
91
92
+ @ Inject
93
+ @ WeftManaged
94
+ @ ExecutorConfig ( named ="content-index-store-deindex" , priority = 4 , threads = 10 )
95
+ private Executor deIndexExecutor ;
96
+
91
97
protected IndexingContentManagerDecorator ()
92
98
{
93
99
}
@@ -112,6 +118,15 @@ protected IndexingContentManagerDecorator( final ContentManager delegate, final
112
118
this .indexCfg = indexCfg ;
113
119
}
114
120
121
+ protected IndexingContentManagerDecorator ( final ContentManager delegate , final StoreDataManager storeDataManager ,
122
+ final SpecialPathManager specialPathManager ,
123
+ final ContentIndexManager indexManager , final NotFoundCache nfc ,
124
+ final ContentIndexConfig indexCfg , final Executor deIndexExecutor )
125
+ {
126
+ this (delegate , storeDataManager , specialPathManager , indexManager , nfc , indexCfg );
127
+ this .deIndexExecutor = deIndexExecutor ;
128
+ }
129
+
115
130
@ Override
116
131
public Transfer retrieveFirst ( final List <? extends ArtifactStore > stores , final String path )
117
132
throws IndyWorkflowException
@@ -163,12 +178,7 @@ public List<Transfer> retrieveAll( final List<? extends ArtifactStore> stores, f
163
178
}
164
179
165
180
return null ;
166
- } ).filter ( Objects ::nonNull ).forEachOrdered ( ( transfer ) -> {
167
- if ( transfer != null )
168
- {
169
- results .add ( transfer );
170
- }
171
- } );
181
+ } ).filter ( Objects ::nonNull ).forEachOrdered ( results ::add );
172
182
173
183
return results ;
174
184
}
@@ -200,7 +210,7 @@ public Transfer retrieve( final ArtifactStore store, final String path, final Ev
200
210
else if ( isAuthoritativelyMissing ( store ) )
201
211
{
202
212
logger .debug (
203
- "Not found indexed transfer: {} and authoritative index switched on. Considering not found and return null." );
213
+ "Not found indexed transfer: {} and authoritative index switched on. Considering not found and return null." , transfer );
204
214
return null ;
205
215
}
206
216
@@ -286,7 +296,7 @@ else if ( isAuthoritativelyMissing( store ) )
286
296
{
287
297
; // metadata generated/exists but missing due to membership change, not add to nfc so next req can retry
288
298
}
289
- else if ( StoreType . hosted != type ) // don't track NFC for hosted repos
299
+ else // don't track NFC for hosted repos
290
300
{
291
301
nfc .addMissing ( resource );
292
302
}
@@ -458,7 +468,7 @@ public Transfer getTransfer( final ArtifactStore store, final String path, final
458
468
else if ( isAuthoritativelyMissing ( store ) )
459
469
{
460
470
logger .info (
461
- "Not found indexed transfer: {} and authoritative index switched on. Considering not found and return null." );
471
+ "Not found indexed transfer: {} and authoritative index switched on. Considering not found and return null." , transfer );
462
472
return null ;
463
473
}
464
474
@@ -510,7 +520,7 @@ else if ( isAuthoritativelyMissing( store ) )
510
520
return transfer ;
511
521
}
512
522
513
- @ Measure ( timers = @ MetricNamed ( DEFAULT ), exceptions = @ MetricNamed ( DEFAULT ) )
523
+ @ Measure
514
524
@ Deprecated
515
525
public Transfer getIndexedMemberTransfer ( final Group group , final String path , TransferOperation op ,
516
526
ContentManagementFunction func , final EventMetadata metadata )
@@ -602,7 +612,7 @@ public Transfer getTransfer( final StoreKey storeKey, final String path, final T
602
612
603
613
if ( isAuthoritativelyMissing ( store ) )
604
614
{
605
- logger .debug ( "Not found indexed transfer: {} and authoritative index switched on. Return null." );
615
+ logger .debug ( "Not found indexed transfer: {} and authoritative index switched on. Return null." , transfer );
606
616
return null ;
607
617
}
608
618
@@ -713,21 +723,25 @@ public Transfer store( final ArtifactStore store, final String path, final Input
713
723
// may change the content index sequence based on the constituents sequence in parent groups
714
724
if ( store .getType () == StoreType .hosted )
715
725
{
716
- try
717
- {
718
- Set <Group > groups = storeDataManager .query ().getGroupsAffectedBy ( store .getKey () );
719
- if ( groups != null && !groups .isEmpty () && indexCfg .isEnabled () )
726
+ //FIXME: One potential problem here: The fixed thread pool is using a blocking queue to
727
+ // cache runnables, which could cause OOM if there are bunch of uploading happened in
728
+ // a short time period. We need to monitor if this could happen.
729
+ deIndexExecutor .execute ( () -> {
730
+ try
720
731
{
721
- groups .forEach ( g -> indexManager .deIndexStorePath ( g .getKey (), path ) );
732
+ Set <Group > groups = storeDataManager .query ().getGroupsAffectedBy ( store .getKey () );
733
+ if ( groups != null && !groups .isEmpty () && indexCfg .isEnabled () )
734
+ {
735
+ groups .forEach ( g -> indexManager .deIndexStorePath ( g .getKey (), path ) );
736
+ }
722
737
}
723
- }
724
- catch ( IndyDataException e )
725
- {
726
- throw new IndyWorkflowException (
727
- "Failed to get groups which contains: %s for NFC handling. Reason: %s" , e , store .getKey (),
728
- e .getMessage () );
729
- }
730
-
738
+ catch ( IndyDataException e )
739
+ {
740
+ logger .error (
741
+ String .format ( "Failed to get groups which contains: %s for NFC handling. Reason: %s" ,
742
+ store .getKey (), e .getMessage () ), e );
743
+ }
744
+ } );
731
745
}
732
746
}
733
747
// nfcClearByContaining( store, path );
0 commit comments