1+ package io .opencmw .client ;
2+
3+ import static org .junit .jupiter .api .Assertions .*;
4+
5+ import java .io .File ;
6+ import java .io .IOException ;
7+ import java .net .URI ;
8+ import java .nio .charset .Charset ;
9+ import java .nio .file .Files ;
10+ import java .time .Duration ;
11+ import java .util .*;
12+ import java .util .concurrent .*;
13+ import java .util .concurrent .locks .LockSupport ;
14+
15+ import org .awaitility .Awaitility ;
16+ import org .junit .jupiter .api .Assumptions ;
17+ import org .junit .jupiter .api .BeforeEach ;
18+ import org .junit .jupiter .api .Test ;
19+ import org .junit .jupiter .api .Timeout ;
20+ import org .slf4j .Logger ;
21+ import org .slf4j .LoggerFactory ;
22+ import org .zeromq .*;
23+
24+ import io .opencmw .EventStore ;
25+ import io .opencmw .Filter ;
26+ import io .opencmw .MimeType ;
27+ import io .opencmw .client .DataSourcePublisher .NotificationListener ;
28+ import io .opencmw .filter .EvtTypeFilter ;
29+ import io .opencmw .serialiser .IoClassSerialiser ;
30+ import io .opencmw .serialiser .spi .BinarySerialiser ;
31+ import io .opencmw .serialiser .spi .FastByteBuffer ;
32+
33+ /**
34+ * @author Alexander Krimm
35+ */
36+ @ Timeout (20 )
37+ class OpenCmwCppInteropTest {
38+ private static final Logger LOGGER = LoggerFactory .getLogger (OpenCmwCppInteropTest .class );
39+ private static final String TEST_EXCEPTION_MSG = "test exception message" ;
40+ private EventStore eventStore ;
41+ private DataSourcePublisher dataSourcePublisher ;
42+ private final URI brokerAddress = URI .create ("mdp://localhost:12345" );
43+ private final URI brokerPubAddress = URI .create ("mds://localhost:12346" );
44+
45+ @ BeforeEach
46+ void setupBrokerAndEventStore () throws IOException {
47+ OpenCmwDataSource .class .getClass (); // just make sure the static registration inside the class is run
48+ eventStore = EventStore .getFactory ().setFilterConfig (EvtTypeFilter .class ).build ();
49+ dataSourcePublisher = new DataSourcePublisher (null , eventStore , null , null , "testOpenCmwPublisher" );
50+ }
51+
52+ @ Test
53+ void testSerialiserCompatibility () throws Exception {
54+ Assumptions .assumeTrue (new File ("../CompatibilityTest" ).exists ()); // skip if the test binary is not available
55+ int value = 42 ;
56+ int arraySize = 3 ;
57+ Process cppOpencmw = new ProcessBuilder ("../CompatibilityTest" , "serialise" , "--value" , value + "" , "--array-size" , arraySize + "" ).start ();
58+
59+ final TestData testData = new TestData ();
60+ testData .booleanValue = value % 2 == 0 ;
61+ testData .int8Value = value ;
62+ testData .int16Value = value ;
63+ testData .int32Value = value ;
64+ testData .int64Value = value ;
65+ testData .floatValue = value ;
66+ testData .doubleValue = value ;
67+ testData .charValue = (char ) value ;
68+ testData .stringValue = "test 42" ;
69+ testData .boolArray = new boolean [] { true , false , true };
70+ testData .int8Array = new byte [] { 42 , 43 , 44 };
71+ testData .int16Array = new short [] { 42 , 43 , 44 };
72+ testData .int32Array = new int [] { 42 , 43 , 44 };
73+ testData .int64Array = new long [] { 42L , 43L , 44L };
74+ testData .floatArray = new float [] { 42.0f , 43.0f , 44.0f };
75+ testData .doubleArray = new double [] { 42.0 , 43.0 , 44.0 };
76+ testData .charArray = new char [] { (char ) 42 , (char ) 43 , (char ) 44 };
77+ testData .stringArray = new String [] { "test42" , "test43" , "test44" };
78+
79+ final FastByteBuffer buffer = new FastByteBuffer (10000 );
80+ // buffer.setAutoResize(true);
81+ IoClassSerialiser serialiser = new IoClassSerialiser (buffer , BinarySerialiser .class );
82+ buffer .reset (); // '0' writing at start of buffer
83+ serialiser .serialiseObject (testData );
84+ String opencmwJavaString = new String (buffer .elements (), 0 , buffer .position (), Charset .defaultCharset ());
85+ Files .write (new File ("java.hex" ).toPath (), buffer .elements ());
86+
87+ cppOpencmw .waitFor ();
88+ String opencmwCppString = new String (cppOpencmw .getInputStream ().readAllBytes ());
89+ Files .write (new File ("cpp.hex" ).toPath (), opencmwCppString .getBytes ());
90+
91+ // this assertion does not work, because the name of the Object is different between java and cpp, so all indices are shifted by the difference in object name length
92+ // assertEquals(opencmwCppString, opencmwJavaString);
93+
94+ final FastByteBuffer cppSerialisedBuffer = FastByteBuffer .wrap (opencmwCppString .getBytes ());
95+ cppSerialisedBuffer .reset ();
96+ IoClassSerialiser cppSerialisedDeserialiser = new IoClassSerialiser (cppSerialisedBuffer , BinarySerialiser .class );
97+ TestData cppDeserialisedObject = cppSerialisedDeserialiser .deserialiseObject (TestData .class );
98+ assertEquals (testData , cppDeserialisedObject );
99+ }
100+
101+ @ Test
102+ void testSubscriptionWithListener () throws Exception {
103+ Assumptions .assumeTrue (new File ("../CompatibilityTest" ).exists ()); // skip if the test binary is not available
104+ int port = brokerAddress .getPort ();
105+ int portMds = brokerPubAddress .getPort ();
106+ Process cppOpencmw = new ProcessBuilder ("../CompatibilityTest" , "serve" , "--port" , port + "" , "--port-mds" , portMds + "" ).inheritIO ().start ();
107+ assertNotNull (dataSourcePublisher .getRawDataEventStore ());
108+ final Queue <TestData > updates = new ArrayBlockingQueue <>(20 );
109+ eventStore .register ((event , seq , last ) -> {
110+ if (!event .throwables .isEmpty ()) {
111+ System .err .println ("errors" );
112+ event .throwables .forEach ((Throwable e ) -> System .err .println (e .getMessage ()));
113+ } else {
114+ if (event .payload .getType () == TestData .class ) {
115+ System .out .println (event .payload .get (TestData .class ));
116+ } else {
117+ System .err .println ("unexpected payload type" + event .payload .getType ().getSimpleName ());
118+ }
119+ }
120+ });
121+
122+ eventStore .start ();
123+ dataSourcePublisher .start ();
124+ LockSupport .parkNanos (Duration .ofMillis (200 ).toNanos ());
125+
126+ try (final DataSourcePublisher .Client client = dataSourcePublisher .getClient ()) {
127+ final URI requestURI = URI .create (brokerPubAddress + "/testProperty" );
128+ final TestContext requestContext = new TestContext ();
129+ // requestContext.contextA = "P1";
130+ // requestContext.contextB = "T2";
131+ requestContext .contentType = MimeType .BINARY ;
132+ final NotificationListener <TestData , TestContext > listener = new NotificationListener <>() {
133+ @ Override
134+ public void dataUpdate (final TestData updatedObject , final TestContext contextObject ) {
135+ // assertEquals(new TestContext(), contextObject);
136+ updates .add (updatedObject );
137+ System .err .println ("update" );
138+ }
139+
140+ @ Override
141+ public void updateException (final Throwable exception ) {
142+ fail ("Unexpected exception notification" , exception );
143+ }
144+ };
145+ LOGGER .atInfo ().addArgument (requestURI ).log ("subscribing to endpoint: {}" );
146+ final String reqId = client .subscribe (requestURI , TestData .class , requestContext , TestContext .class , listener );
147+
148+ LockSupport .parkNanos (Duration .ofMillis (10 ).toNanos ());
149+
150+ // check if all notifications were received
151+ LOGGER .atDebug ().log ("Waiting for subscription updates to be received" );
152+ Awaitility .await ().atMost (Duration .ofSeconds (15 )).until (() -> updates .size () >= 4 );
153+ LOGGER .atDebug ().log ("Subscription updates complete" );
154+
155+ LOGGER .atDebug ().addArgument (reqId ).log ("Unsubscribing: {}" );
156+ client .unsubscribe (reqId );
157+ }
158+
159+ // stop event store
160+ eventStore .stop ();
161+ dataSourcePublisher .stop ();
162+ cppOpencmw .destroy ();
163+ }
164+
165+ @ Test
166+ void testGetRequest () throws InterruptedException , ExecutionException , TimeoutException , IOException {
167+ Assumptions .assumeTrue (new File ("../CompatibilityTest" ).exists ()); // skip if the test binary is not available
168+ int port = brokerAddress .getPort ();
169+ int portMds = brokerPubAddress .getPort ();
170+ Process cppOpencmw = new ProcessBuilder ("../CompatibilityTest" , "serve" , "--port" , port + "" , "--port-mds" , portMds + "" ).inheritIO ().start ();
171+ eventStore .start ();
172+ dataSourcePublisher .start ();
173+ LockSupport .parkNanos (Duration .ofMillis (200 ).toNanos ());
174+
175+ // get request
176+ final URI requestURI = URI .create (brokerAddress + "/testProperty?contentType=application/octet-stream&contextA=test&contextB=asdf" );
177+ LOGGER .atInfo ().addArgument (requestURI ).log ("requesting GET from endpoint: {}" );
178+ final Future <TestData > future ;
179+ try (final DataSourcePublisher .Client client = dataSourcePublisher .getClient ()) {
180+ future = client .get (requestURI , null , TestData .class ); // uri_without_query oder serviceName + resolver, requestContext, type
181+ } catch (Exception e ) {
182+ System .err .println (e .getMessage ());
183+ throw e ;
184+ }
185+
186+ // assert result
187+ final TestData result = future .get (1000 , TimeUnit .MILLISECONDS );
188+
189+ System .out .println (result .doubleValue );
190+
191+ eventStore .stop ();
192+ dataSourcePublisher .stop ();
193+ }
194+
195+ @ Test
196+ void testGetRequestWithAnnotations () throws InterruptedException , ExecutionException , TimeoutException , IOException {
197+ Assumptions .assumeTrue (new File ("../CompatibilityTest" ).exists ()); // skip if the test binary is not available
198+ int port = brokerAddress .getPort ();
199+ int portMds = brokerPubAddress .getPort ();
200+ Process cppOpencmw = new ProcessBuilder ("../CompatibilityTest" , "serve" , "--port" , port + "" , "--port-mds" , portMds + "" ).inheritIO ().start ();
201+ eventStore .start ();
202+ dataSourcePublisher .start ();
203+ LockSupport .parkNanos (Duration .ofMillis (200 ).toNanos ());
204+
205+ // get request
206+ final URI requestURI = URI .create (brokerAddress + "/annotatedProperty?contentType=application/octet-stream&contextA=test&contextB=asdf" );
207+ LOGGER .atInfo ().addArgument (requestURI ).log ("requesting GET from endpoint: {}" );
208+ final Future <TestData > future ;
209+ try (final DataSourcePublisher .Client client = dataSourcePublisher .getClient ()) {
210+ future = client .get (requestURI , null , TestData .class ); // uri_without_query oder serviceName + resolver, requestContext, type
211+ } catch (Exception e ) {
212+ System .err .println (e .getMessage ());
213+ throw e ;
214+ }
215+
216+ // assert result
217+ final TestData result = future .get (1000 , TimeUnit .MILLISECONDS );
218+
219+ System .out .println (result .doubleValue );
220+
221+ eventStore .stop ();
222+ dataSourcePublisher .stop ();
223+ }
224+
225+ public static class TestData {
226+ boolean booleanValue ;
227+ int int8Value ;
228+ int int16Value ;
229+ int int32Value ;
230+ long int64Value ;
231+ float floatValue ;
232+ double doubleValue ;
233+ char charValue ;
234+ String stringValue ;
235+ boolean [] boolArray ;
236+ byte [] int8Array ;
237+ short [] int16Array ;
238+ int [] int32Array ;
239+ long [] int64Array ;
240+ float [] floatArray ;
241+ double [] doubleArray ;
242+ char [] charArray ;
243+ String [] stringArray ;
244+ // TestData nestedData;
245+ }
246+
247+ public static class TestContext implements Filter {
248+ // public String contextA;
249+ // public String contextB;
250+ public MimeType contentType = MimeType .BINARY ;
251+
252+ @ Override
253+ public void clear () {
254+ // contextA = "";
255+ // contextB = "";
256+ contentType = MimeType .BINARY ;
257+ }
258+
259+ @ Override
260+ public void copyTo (Filter other ) {
261+ if (!(other instanceof TestContext otherContext )) {
262+ throw new RuntimeException ("Trying to copy TestContext into a different filter type" );
263+ }
264+ // otherContext.contextA = contextA;
265+ // otherContext.contextB = contextB;
266+ otherContext .contentType = contentType ;
267+ }
268+
269+ @ Override
270+ public String getKey () {
271+ return "" ;
272+ }
273+
274+ @ Override
275+ public String getValue () {
276+ return "" ;
277+ }
278+
279+ @ Override
280+ public Filter get (String ctxString ) {
281+ return null ;
282+ }
283+
284+ @ Override
285+ public boolean matches (Filter other ) {
286+ return false ;
287+ }
288+ }
289+ }
0 commit comments