@@ -199,69 +199,6 @@ void testMultiThreadWrapper() throws Exception {
199199 }
200200 }
201201
202- @ Test
203- void testOptimizedWithMultiThreadWrapper () throws Exception {
204- PlainPBStoragePlugin storageplugin = storagePluginPB ;
205-
206- Instant end = TimeUtils .now ();
207- Instant start = TimeUtils .minusDays (end , 365 );
208- Optimized optimized_100 = (Optimized ) PostProcessors .findPostProcessor ("optimized_100" );
209- optimized_100 .initialize ("optimized_100" , pvName );
210- PVTypeInfo info = new PVTypeInfo ();
211- info .setComputedStorageRate (40 );
212- optimized_100 .estimateMemoryConsumption (pvName , info , start , end , null );
213- try (BasicContext context = new BasicContext ()) {
214- ExecutorService executors = Executors .newFixedThreadPool (2 );
215- long t0 = System .currentTimeMillis ();
216- assert storageplugin != null ;
217- List <Callable <EventStream >> callables = storageplugin .getDataForPV (context , pvName , start , end , optimized_100 );
218- List <Future <EventStream >> futures = new ArrayList <Future <EventStream >>();
219- for (Callable <EventStream > callable : callables ) {
220- futures .add (executors .submit (callable ));
221- }
222-
223- for (Future <EventStream > future : futures ) {
224- try {
225- future .get ();
226- } catch (Exception ex ) {
227- logger .error ("Exception computing optimized_100" , ex );
228- }
229- }
230-
231- long eventCount = 0 ;
232- EventStream consolidatedEventStream =
233- ((PostProcessorWithConsolidatedEventStream ) optimized_100 ).getConsolidatedEventStream ();
234- // In cases where the data spans year boundaries, we continue with the same stream.
235- boolean continueprocessing = true ;
236- while (continueprocessing ) {
237- try {
238- Optional <Instant > previousEventTimeStamp = Optional .empty ();
239- for (Event e : consolidatedEventStream ) {
240- // Check that the output is ordered:
241- if (previousEventTimeStamp .isPresent ()) {
242- Assertions .assertTrue (previousEventTimeStamp .get ().isBefore (e .getEventTimeStamp ()));
243- }
244- previousEventTimeStamp = Optional .of (e .getEventTimeStamp ());
245-
246- // Check that the mean equals 1.0 for every datapoint in the output:
247- Assertions .assertEquals (
248- 1.0 ,
249- e .getSampleValue ().getValue ().doubleValue (),
250- 0.0 ,
251- "All values are 1 so mean should be 1. Instead we got "
252- + e .getSampleValue ().getValue ().doubleValue () + " at " + eventCount + " for pv "
253- + pvName );
254- eventCount ++;
255- }
256- continueprocessing = false ;
257- } catch (ChangeInYearsException ex ) {
258- logger .debug ("Change in years" );
259- }
260- long t1 = System .currentTimeMillis ();
261- executors .shutdown ();
262- // assertTrue("Expecting 365 values got " + eventCount + " for pv " + pvName, eventCount == 365);
263- logger .info ("Multi threaded wrapper took " + (t1 - t0 ) + "(ms)" );
264- }
265- }
266- }
202+ // Note that not all the post processors are thread safe.
203+ // And they need to be made thread safe before adding to this test set
267204}
0 commit comments