1+ package io .opencmw .client ;
2+
3+ import static org .junit .jupiter .api .Assertions .*;
4+
5+ import java .io .File ;
6+ import java .io .IOException ;
7+ import java .net .URI ;
8+ import java .nio .file .Files ;
9+ import java .time .Duration ;
10+ import java .util .*;
11+ import java .util .concurrent .*;
12+ import java .util .concurrent .locks .LockSupport ;
13+
14+ import org .awaitility .Awaitility ;
15+ import org .junit .jupiter .api .Assumptions ;
16+ import org .junit .jupiter .api .BeforeEach ;
17+ import org .junit .jupiter .api .Test ;
18+ import org .junit .jupiter .api .Timeout ;
19+ import org .slf4j .Logger ;
20+ import org .slf4j .LoggerFactory ;
21+ import org .zeromq .*;
22+
23+ import io .opencmw .EventStore ;
24+ import io .opencmw .Filter ;
25+ import io .opencmw .MimeType ;
26+ import io .opencmw .client .DataSourcePublisher .NotificationListener ;
27+ import io .opencmw .filter .EvtTypeFilter ;
28+ import io .opencmw .serialiser .IoClassSerialiser ;
29+ import io .opencmw .serialiser .spi .BinarySerialiser ;
30+ import io .opencmw .serialiser .spi .FastByteBuffer ;
31+
32+ /**
33+ * @author Alexander Krimm
34+ */
35+ @ Timeout (20 )
36+ class OpenCmwCppInteropTest {
37+ private static final Logger LOGGER = LoggerFactory .getLogger (OpenCmwCppInteropTest .class );
38+ private EventStore eventStore ;
39+ private DataSourcePublisher dataSourcePublisher ;
40+ private final URI brokerAddress = URI .create ("mdp://localhost:12345" );
41+ private final URI brokerPubAddress = URI .create ("mds://localhost:12346" );
42+
43+ @ BeforeEach
44+ void setupBrokerAndEventStore () throws IOException {
45+ eventStore = EventStore .getFactory ().setFilterConfig (EvtTypeFilter .class ).build ();
46+ dataSourcePublisher = new DataSourcePublisher (null , eventStore , null , null , "testOpenCmwPublisher" );
47+ }
48+
49+ @ Test
50+ void testSerialiserCompatibility () throws Exception {
51+ Assumptions .assumeTrue (new File ("../CompatibilityTest" ).exists ()); // skip if the test binary is not available
52+ int value = 42 ;
53+ int arraySize = 3 ;
54+ Process cppOpencmw = new ProcessBuilder ("../CompatibilityTest" , "serialise" , "--value" , value + "" , "--array-size" , arraySize + "" ).start ();
55+
56+ final TestData testData = new TestData ();
57+ testData .booleanValue = value % 2 == 0 ;
58+ testData .int8Value = (byte ) value ;
59+ testData .int16Value = (short ) value ;
60+ testData .int32Value = value ;
61+ testData .int64Value = value ;
62+ testData .floatValue = value ;
63+ testData .doubleValue = value ;
64+ testData .charValue = (char ) value ;
65+ testData .stringValue = "test 42" ;
66+ testData .boolArray = new boolean [] { true , false , true };
67+ testData .int8Array = new byte [] { 42 , 43 , 44 };
68+ testData .int16Array = new short [] { 42 , 43 , 44 };
69+ testData .int32Array = new int [] { 42 , 43 , 44 };
70+ testData .int64Array = new long [] { 42L , 43L , 44L };
71+ testData .floatArray = new float [] { 42.0f , 43.0f , 44.0f };
72+ testData .doubleArray = new double [] { 42.0 , 43.0 , 44.0 };
73+ testData .charArray = new char [] { (char ) 42 , (char ) 43 , (char ) 44 };
74+ testData .stringArray = new String [] { "test42" , "test43" , "test44" };
75+
76+ final FastByteBuffer buffer = new FastByteBuffer (10000 );
77+ // buffer.setAutoResize(true);
78+ IoClassSerialiser serialiser = new IoClassSerialiser (buffer , BinarySerialiser .class );
79+ buffer .reset (); // '0' writing at start of buffer
80+ serialiser .serialiseObject (testData );
81+ Files .write (new File ("java.hex" ).toPath (), Arrays .copyOfRange (buffer .elements (), 0 , buffer .position ()));
82+
83+ cppOpencmw .waitFor ();
84+ byte [] opencmwCppString = cppOpencmw .getInputStream ().readAllBytes ();
85+ Files .write (new File ("cpp.hex" ).toPath (), opencmwCppString );
86+
87+ // this assertion does not work, because the name of the Object is different between java and cpp, so all indices are shifted by the difference in object name length
88+ // assertEquals(opencmwCppString, opencmwJavaString);
89+
90+ final FastByteBuffer cppSerialisedBuffer = FastByteBuffer .wrap (opencmwCppString );
91+ cppSerialisedBuffer .reset ();
92+ IoClassSerialiser cppSerialisedDeserialiser = new IoClassSerialiser (cppSerialisedBuffer , BinarySerialiser .class );
93+ TestData cppDeserialisedObject = cppSerialisedDeserialiser .deserialiseObject (TestData .class );
94+ assertEquals (testData , cppDeserialisedObject );
95+ }
96+
97+ @ Test
98+ void testSubscriptionWithListener () throws Exception {
99+ Assumptions .assumeTrue (new File ("../CompatibilityTest" ).exists ()); // skip if the test binary is not available
100+ int port = brokerAddress .getPort ();
101+ int portMds = brokerPubAddress .getPort ();
102+ Process cppOpencmw = new ProcessBuilder ("../CompatibilityTest" , "serve" , "--port" , port + "" , "--port-mds" , portMds + "" ).inheritIO ().start ();
103+ assertNotNull (dataSourcePublisher .getRawDataEventStore ());
104+ final Queue <TestData > updates = new ArrayBlockingQueue <>(20 );
105+ eventStore .register ((event , seq , last ) -> {
106+ if (!event .throwables .isEmpty ()) {
107+ System .err .println ("errors" );
108+ event .throwables .forEach ((Throwable e ) -> System .err .println (e .getMessage ()));
109+ } else {
110+ if (event .payload .getType () == TestData .class ) {
111+ System .out .println ("got update with payload: doubleValue = " + event .payload .get (TestData .class ).doubleValue );
112+ } else {
113+ System .err .println ("unexpected payload type" + event .payload .getType ().getSimpleName ());
114+ }
115+ }
116+ });
117+
118+ eventStore .start ();
119+ dataSourcePublisher .start ();
120+ LockSupport .parkNanos (Duration .ofMillis (200 ).toNanos ());
121+
122+ try (final DataSourcePublisher .Client client = dataSourcePublisher .getClient ()) {
123+ final URI requestURI = URI .create (brokerPubAddress + "/testProperty" );
124+ final TestContext requestContext = new TestContext ();
125+ // requestContext.contextA = "P1";
126+ // requestContext.contextB = "T2";
127+ requestContext .contentType = MimeType .BINARY ;
128+ final NotificationListener <TestData , TestContext > listener = new NotificationListener <>() {
129+ @ Override
130+ public void dataUpdate (final TestData updatedObject , final TestContext contextObject ) {
131+ // assertEquals(new TestContext(), contextObject);
132+ updates .add (updatedObject );
133+ System .err .println ("update" );
134+ }
135+
136+ @ Override
137+ public void updateException (final Throwable exception ) {
138+ fail ("Unexpected exception notification" , exception );
139+ }
140+ };
141+ LOGGER .atInfo ().addArgument (requestURI ).log ("subscribing to endpoint: {}" );
142+ final String reqId = client .subscribe (requestURI , TestData .class , requestContext , TestContext .class , listener );
143+
144+ LockSupport .parkNanos (Duration .ofMillis (10 ).toNanos ());
145+
146+ // check if all notifications were received
147+ LOGGER .atDebug ().log ("Waiting for subscription updates to be received" );
148+ Awaitility .await ().atMost (Duration .ofSeconds (15 )).until (() -> updates .size () >= 4 );
149+ LOGGER .atDebug ().log ("Subscription updates complete" );
150+
151+ LOGGER .atDebug ().addArgument (reqId ).log ("Unsubscribing: {}" );
152+ client .unsubscribe (reqId );
153+ }
154+
155+ // stop event store
156+ eventStore .stop ();
157+ dataSourcePublisher .stop ();
158+ cppOpencmw .destroy ();
159+ }
160+
161+ @ Test
162+ void testGetRequest () throws InterruptedException , ExecutionException , TimeoutException , IOException {
163+ Assumptions .assumeTrue (new File ("../CompatibilityTest" ).exists ()); // skip if the test binary is not available
164+ int port = brokerAddress .getPort ();
165+ int portMds = brokerPubAddress .getPort ();
166+ Process cppOpencmw = new ProcessBuilder ("../CompatibilityTest" , "serve" , "--port" , port + "" , "--port-mds" , portMds + "" ).inheritIO ().start ();
167+ eventStore .start ();
168+ dataSourcePublisher .start ();
169+ LockSupport .parkNanos (Duration .ofMillis (200 ).toNanos ());
170+
171+ // get request
172+ final URI requestURI = URI .create (brokerAddress + "/testProperty?contentType=application/octet-stream&contextA=test&contextB=asdf" );
173+ LOGGER .atInfo ().addArgument (requestURI ).log ("requesting GET from endpoint: {}" );
174+ final Future <TestData > future ;
175+ try (final DataSourcePublisher .Client client = dataSourcePublisher .getClient ()) {
176+ future = client .get (requestURI , null , TestData .class ); // uri_without_query oder serviceName + resolver, requestContext, type
177+ } catch (Exception e ) {
178+ System .err .println (e .getMessage ());
179+ throw e ;
180+ }
181+
182+ // assert result
183+ final TestData result = future .get (1000 , TimeUnit .MILLISECONDS );
184+
185+ System .out .println (result .doubleValue );
186+
187+ eventStore .stop ();
188+ dataSourcePublisher .stop ();
189+ cppOpencmw .destroy ();
190+ }
191+
192+ @ Test
193+ void testGetRequestWithAnnotations () throws InterruptedException , ExecutionException , TimeoutException , IOException {
194+ Assumptions .assumeTrue (new File ("../CompatibilityTest" ).exists ()); // skip if the test binary is not available
195+ int port = brokerAddress .getPort ();
196+ int portMds = brokerPubAddress .getPort ();
197+ Process cppOpencmw = new ProcessBuilder ("../CompatibilityTest" , "serve" , "--port" , port + "" , "--port-mds" , portMds + "" ).inheritIO ().start ();
198+ eventStore .start ();
199+ dataSourcePublisher .start ();
200+ LockSupport .parkNanos (Duration .ofMillis (200 ).toNanos ());
201+
202+ // get request
203+ final URI requestURI = URI .create (brokerAddress + "/annotatedProperty?contentType=application/octet-stream&contextA=test&contextB=asdf" );
204+ LOGGER .atInfo ().addArgument (requestURI ).log ("requesting GET from endpoint: {}" );
205+ final Future <TestData > future ;
206+ try (final DataSourcePublisher .Client client = dataSourcePublisher .getClient ()) {
207+ future = client .get (requestURI , null , TestData .class ); // uri_without_query oder serviceName + resolver, requestContext, type
208+ } catch (Exception e ) {
209+ System .err .println (e .getMessage ());
210+ throw e ;
211+ }
212+
213+ // assert result
214+ final TestData result = future .get (1000 , TimeUnit .MILLISECONDS );
215+
216+ System .out .println (result .doubleValue );
217+
218+ eventStore .stop ();
219+ dataSourcePublisher .stop ();
220+ cppOpencmw .destroy ();
221+ }
222+
223+ public static class TestData {
224+ boolean booleanValue ;
225+ byte int8Value ;
226+ short int16Value ;
227+ int int32Value ;
228+ long int64Value ;
229+ float floatValue ;
230+ double doubleValue ;
231+ char charValue ;
232+ String stringValue ;
233+ boolean [] boolArray ;
234+ byte [] int8Array ;
235+ short [] int16Array ;
236+ int [] int32Array ;
237+ long [] int64Array ;
238+ float [] floatArray ;
239+ double [] doubleArray ;
240+ char [] charArray ;
241+ String [] stringArray ;
242+ // TestData nestedData;
243+
244+ @ Override
245+ public boolean equals (Object obj ) {
246+ if (!(obj instanceof TestData td2 ))
247+ return false ;
248+ return booleanValue == td2 .booleanValue && int8Value == td2 .int8Value && int16Value == td2 .int16Value && int32Value == td2 .int32Value && int64Value == td2 .int64Value && floatValue == td2 .floatValue && doubleValue == td2 .doubleValue && charValue == td2 .charValue && stringValue .equals (td2 .stringValue ) && Arrays .equals (boolArray , td2 .boolArray ) && Arrays .equals (int8Array , td2 .int8Array ) && Arrays .equals (int16Array , td2 .int16Array ) && Arrays .equals (int32Array , td2 .int32Array ) && Arrays .equals (int64Array , td2 .int64Array ) && Arrays .equals (floatArray , td2 .floatArray ) && Arrays .equals (doubleArray , td2 .doubleArray ) && Arrays .equals (charArray , td2 .charArray ) && Arrays .equals (stringArray , td2 .stringArray );
249+ }
250+ }
251+
252+ public static class TestContext implements Filter {
253+ public MimeType contentType = MimeType .BINARY ;
254+
255+ @ Override
256+ public void clear () {
257+ contentType = MimeType .BINARY ;
258+ }
259+
260+ @ Override
261+ public void copyTo (Filter other ) {
262+ if (!(other instanceof TestContext otherContext )) {
263+ throw new RuntimeException ("Trying to copy TestContext into a different filter type" );
264+ }
265+ otherContext .contentType = contentType ;
266+ }
267+
268+ @ Override
269+ public String getKey () {
270+ return "" ;
271+ }
272+
273+ @ Override
274+ public String getValue () {
275+ return "" ;
276+ }
277+
278+ @ Override
279+ public Filter get (String ctxString ) {
280+ return null ;
281+ }
282+
283+ @ Override
284+ public boolean matches (Filter other ) {
285+ return false ;
286+ }
287+ }
288+ }
0 commit comments