Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CXF-8545 - support POST requests from SseEventSource #813

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface ClientProperties {
Boolean.parseBoolean(SystemPropertyAction.getPropertyOrNull(THREAD_SAFE_CLIENT_PROP));
Integer THREAD_SAFE_CLIENT_STATE_CLEANUP_PERIOD =
getIntValue(SystemPropertyAction.getPropertyOrNull(THREAD_SAFE_CLIENT_STATE_CLEANUP_PROP));
String SSE_REQUEST_ENTITY = "sse.request.entity";

static Integer getIntValue(Object o) {
return o instanceof Integer ? (Integer)o : o instanceof String ? Integer.valueOf(o.toString()) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.function.Consumer;
import java.util.logging.Logger;

import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configuration;
Expand All @@ -41,6 +42,7 @@

import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.jaxrs.client.ClientProperties;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.jaxrs.impl.RetryAfterHeaderProvider;
import org.apache.cxf.jaxrs.utils.ExceptionUtils;
Expand Down Expand Up @@ -195,7 +197,19 @@ private void connect(String lastEventId) {
if (lastEventId != null) {
builder.header(HttpHeaders.LAST_EVENT_ID_HEADER, lastEventId);
}
response = builder.get();

Object o = target.getConfiguration().getProperty(ClientProperties.SSE_REQUEST_ENTITY);

if (o == null) {
response = builder.get();
} else if (o instanceof Entity) {
LOG.fine("Using POST for SSE endpoint " + target.getUri() + " with entity " + o);
response = builder.post((Entity<?>) o);
} else {
throw new IllegalArgumentException("The " + ClientProperties.SSE_REQUEST_ENTITY
+ " property is not an entity " + o.getClass());
}


// A client can be told to stop reconnecting using the HTTP 204 No Content
// response code. In this case, we should give up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,6 @@ public void handleMessage(Message message) {
servletResponse = (HttpServletResponse)response;
builder = Response.status(servletResponse.getStatus());

@SuppressWarnings("unchecked")
final Map<String, List<Object>> userHeaders = (Map<String, List<Object>>)message
.get(Message.PROTOCOL_HEADERS);

if (userHeaders != null) {
for (Map.Entry<String, List<Object>> entry: userHeaders.entrySet()) {
addHeader(builder, entry);
}
}

for (final String header: servletResponse.getHeaderNames()) {
final Collection<String> headers = servletResponse.getHeaders(header);
addHeader(builder, header, headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected WebClient createWebClient(final String url) {
protected WebTarget createWebTarget(final String url) {
return ClientBuilder
.newClient()
.property("http.receive.timeout", 8000)
.property("http.receive.timeout", 8000000)
.register(JacksonJsonProvider.class)
.target("http://localhost:" + getPort() + url);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.cxf.systest.jaxrs.sse;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -26,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
Expand All @@ -37,6 +39,8 @@

import com.fasterxml.jackson.core.JsonProcessingException;

import org.apache.cxf.jaxrs.client.ClientProperties;

import org.junit.Before;
import org.junit.Test;

Expand All @@ -47,6 +51,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;


public abstract class AbstractSseTest extends AbstractSseBaseTest {
@Before
public void setUp() {
Expand Down Expand Up @@ -103,6 +108,30 @@ public void testBooksStreamIsReturnedFromInboundSseEvents() throws InterruptedEx
);
}

@Test
public void testBooksStreamIsReturnedFromInboundSseEventsWithPOST() throws InterruptedException, IOException {
final WebTarget target = createWebTarget("/rest/api/bookstore/sse/0")
.property(ClientProperties.SSE_REQUEST_ENTITY, Entity.text("42"));
final Collection<Book> books = new ArrayList<>();

try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(collect(books), System.out::println);
eventSource.open();
// Give the SSE stream some time to collect all events
awaitEvents(5000, books, 4);
}

// Easing the test verification here, it does not work well for Atm + Jetty
assertThat(books,
hasItems(
new Book("New Book #43", 43),
new Book("New Book #44", 44),
new Book("New Book #45", 45),
new Book("New Book #46", 46)
)
);
}

@Test
public void testBookTitlesStreamIsReturnedFromInboundSseEvents() throws InterruptedException {
final WebTarget target = createWebTarget("/rest/api/bookstore/titles/sse");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
Expand Down Expand Up @@ -94,6 +95,34 @@ public void run() {
}.start();
}

@POST
@Path("sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
@Consumes(MediaType.TEXT_PLAIN)
public void forBookPOST(@Context SseEventSink sink, @PathParam("id") final String id,
final String lastEventId) {
new Thread() {
public void run() {
try {
final Integer id = Integer.valueOf(lastEventId);
final Builder builder = sse.newEventBuilder();

sink.send(createEvent(builder.name("book"), id + 1));
Thread.sleep(200);
sink.send(createEvent(builder.name("book"), id + 2));
Thread.sleep(200);
sink.send(createEvent(builder.name("book"), id + 3));
Thread.sleep(200);
sink.send(createEvent(builder.name("book"), id + 4));
Thread.sleep(200);
sink.close();
} catch (final InterruptedException ex) {
LOG.error("Communication error", ex);
}
}
}.start();
}

@GET
@Path("nodelay/sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
Expand Down Expand Up @@ -92,7 +93,35 @@ public void run() {
}
}.start();
}

@POST
@Path("sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
@Consumes(MediaType.TEXT_PLAIN)
public void forBookPOST(@Context SseEventSink sink, @PathParam("id") final String id,
final String lastEventId) {
new Thread() {
public void run() {
try {
final Integer id = Integer.valueOf(lastEventId);
final Builder builder = sse.newEventBuilder();

sink.send(createEvent(builder.name("book"), id + 1));
Thread.sleep(200);
sink.send(createEvent(builder.name("book"), id + 2));
Thread.sleep(200);
sink.send(createEvent(builder.name("book"), id + 3));
Thread.sleep(200);
sink.send(createEvent(builder.name("book"), id + 4));
Thread.sleep(200);
sink.close();
} catch (final InterruptedException ex) {
LOG.error("Communication error", ex);
}
}
}.start();
}

@GET
@Path("nodelay/sse/{id}")
@Produces(MediaType.SERVER_SENT_EVENTS)
Expand Down