Skip to content

Commit

Permalink
Support POST requests from SseEventSourceImpl
Browse files Browse the repository at this point in the history
It is perfectly valid to return an event stream from a POST method, but currently there is no way to connect to it with the JAX-RS client. This commit adds support for connecting to event streams using POST methods by supplying an entity to be used as the POST body.
  • Loading branch information
timothyjward committed Jun 8, 2021
1 parent a237fa5 commit 2ecf496
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface ClientProperties {
Boolean.parseBoolean(SystemPropertyAction.getPropertyOrNull(THREAD_SAFE_CLIENT_PROP));
Integer THREAD_SAFE_CLIENT_STATE_CLEANUP_PERIOD =
getIntValue(SystemPropertyAction.getPropertyOrNull(THREAD_SAFE_CLIENT_STATE_CLEANUP_PROP));
String SSE_REQUEST_ENTITY = "sse.request.entity";

static Integer getIntValue(Object o) {
return o instanceof Integer ? (Integer)o : o instanceof String ? Integer.valueOf(o.toString()) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.function.Consumer;
import java.util.logging.Logger;

import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configuration;
Expand All @@ -41,6 +42,7 @@

import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.jaxrs.client.ClientProperties;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.impl.RetryAfterHeaderProvider;
import org.apache.cxf.jaxrs.utils.ExceptionUtils;
Expand Down Expand Up @@ -195,7 +197,19 @@ private void connect(String lastEventId) {
if (lastEventId != null) {
builder.header(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId);
}
response = builder.get();

Object o = target.getConfiguration().getProperty(ClientProperties.SSE_REQUEST_ENTITY);

if (o == null) {
response = builder.get();
} else if (o instanceof Entity) {
LOG.fine("Using POST for SSE endpoint " + target.getUri() + " with entity " + o);
response = builder.post((Entity<?>) o);
} else {
throw new IllegalArgumentException("The " + ClientProperties.SSE_REQUEST_ENTITY
+ " property is not an entity " + o.getClass());
}


// A client can be told to stop reconnecting using the HTTP 204 No Content
// response code. In this case, we should give up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@
*/
package org.apache.cxf.systest.jaxrs.sse;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -38,13 +33,13 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;
import javax.ws.rs.sse.SseEventSource.Builder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

import org.apache.cxf.jaxrs.client.ClientProperties;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -54,9 +49,7 @@
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;


public abstract class AbstractSseTest extends AbstractSseBaseTest {
Expand Down Expand Up @@ -115,52 +108,17 @@ public void testBooksStreamIsReturnedFromInboundSseEvents() throws InterruptedEx
);
}

@SuppressWarnings("unchecked")
@Test
public void testBooksStreamIsReturnedFromInboundSseEventsWithPOST() throws InterruptedException, IOException {
final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0");
final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0")
.property(ClientProperties.SSE_REQUEST_ENTITY, Entity.text("42"));
final Collection<Book> books = new ArrayList<>();

@SuppressWarnings("rawtypes")
MessageBodyReader mbr = new JacksonJsonProvider();

Response response = target.request(MediaType.SERVER_SENT_EVENTS)
.post(Entity.entity(42, MediaType.TEXT_PLAIN));

try (BufferedReader br = new BufferedReader(new InputStreamReader(response.readEntity(InputStream.class)))) {
String s;
Integer id = null;
Book book = null;

while ((s = br.readLine()) != null) {
if (s.trim().isEmpty()) {
if (id == null && book == null) {
continue;
} else if (id != null && book != null) {
books.add(book);
id = null;
book = null;
continue;
}
fail("The event did not contain both an id " + id + " and a book " + book);
}
if (s.startsWith("event:")) {
assertEquals("Not a book event", "event: book", s.trim());
continue;
}
if (s.startsWith("id:")) {
assertNull("There was an existing id " + id, id);
id = Integer.parseInt(s.substring(3).trim());
continue;
}
if (s.startsWith("data:")) {
assertNull("There was an existing book " + book, book);
book = (Book) mbr.readFrom(Book.class, Book.class, null, MediaType.APPLICATION_JSON_TYPE, null,
new ByteArrayInputStream(s.substring(5).trim().getBytes(StandardCharsets.UTF_8)));
continue;
}
fail("Unexpected String content returned by SSE POST " + s);
}
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(collect(books), System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
awaitEvents(5000, books, 4);
}

// Easing the test verification here, it does not work well for Atm + Jetty
Expand Down

0 comments on commit 2ecf496

Please sign in to comment.