Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gzip compress gRPC responses #3387

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions misk/src/main/kotlin/misk/grpc/GrpcFeatureBinding.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ internal class GrpcFeatureBinding(
private val streamingRequest: Boolean,
private val streamingResponse: Boolean,
private val isSuspend: Boolean,
private val grpcMessageSourceChannelContext: CoroutineContext
private val grpcMessageSourceChannelContext: CoroutineContext,
private val grpcEncoding: String,
private val minMessageToCompress: Long,
) : FeatureBinding {

override fun beforeCall(subject: Subject) {
Expand All @@ -35,6 +37,7 @@ internal class GrpcFeatureBinding(
requestBody, requestAdapter,
subject.httpCall.requestHeaders["grpc-encoding"]
)
// TODO: support the "grpc-accept-encoding" header

if (streamingRequest) {
val param: Any = if (isSuspend) {
Expand All @@ -57,7 +60,12 @@ internal class GrpcFeatureBinding(

if (streamingResponse) {
val responseBody = subject.takeResponseBody()
val messageSink = GrpcMessageSink(responseBody, responseAdapter, grpcEncoding = "identity")
val messageSink = GrpcMessageSink(
sink = responseBody,
minMessageToCompress = minMessageToCompress,
messageAdapter = responseAdapter,
grpcEncoding = grpcEncoding,
)
val param: Any = if (isSuspend) {
GrpcMessageSinkChannel(
channel = Channel(
Expand All @@ -82,7 +90,12 @@ internal class GrpcFeatureBinding(
setResponseHeaders(subject)

val responseBody = subject.takeResponseBody()
val messageSink = GrpcMessageSink(responseBody, responseAdapter, grpcEncoding = "identity")
val messageSink = GrpcMessageSink(
sink = responseBody,
minMessageToCompress = minMessageToCompress,
messageAdapter = responseAdapter,
grpcEncoding = grpcEncoding,
)

// It's a single response, write the return value out.
val returnValue = subject.takeReturnValue()!!
Expand All @@ -93,8 +106,7 @@ internal class GrpcFeatureBinding(
private fun setResponseHeaders(subject: Subject) {
subject.httpCall.requireTrailers()

// TODO(jwilson): permit non-identity GRPC encoding.
subject.httpCall.setResponseHeader("grpc-encoding", "identity")
subject.httpCall.setResponseHeader("grpc-encoding", grpcEncoding)
subject.httpCall.setResponseHeader("grpc-accept-encoding", "gzip")
subject.httpCall.setResponseHeader("Content-Type", MediaTypes.APPLICATION_GRPC)

Expand All @@ -118,6 +130,10 @@ internal class GrpcFeatureBinding(
name = "GrpcMessageSourceChannel.bridgeFromSource"
)

private val grpcEncoding = if (webConfig.grpcGzip) "gzip" else "identity"

private val minMessageToCompress = webConfig.minGzipSize.toLong()

override fun create(
action: Action,
pathPattern: PathPattern,
Expand Down Expand Up @@ -160,6 +176,8 @@ internal class GrpcFeatureBinding(
streamingResponse = streamingResponse,
isSuspend = isSuspend,
grpcMessageSourceChannelContext = grpcMessageSourceChannelDispatcher,
grpcEncoding = grpcEncoding,
minMessageToCompress = minMessageToCompress,
)
} else {
@Suppress("UNCHECKED_CAST") // Assume it's a proto type.
Expand All @@ -170,6 +188,8 @@ internal class GrpcFeatureBinding(
streamingResponse = streamingResponse,
isSuspend = isSuspend,
grpcMessageSourceChannelContext = grpcMessageSourceChannelDispatcher,
grpcEncoding = grpcEncoding,
minMessageToCompress = minMessageToCompress,
)
}
}
Expand Down
25 changes: 18 additions & 7 deletions misk/src/main/kotlin/misk/grpc/GrpcMessageSink.kt
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply copies the changes from square/wire@17a3d9e

Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import okio.BufferedSink
* https://github.com/square/wire/search?q=GrpcMessageSink&type=Code
*
* @param sink the HTTP/2 stream body.
* @param minMessageToCompress the minimum message size for compression
* when [grpcEncoding] is not "identity".
* @param messageAdapter a proto adapter for each message.
* @param grpcEncoding the content coding for the stream body.
*/
internal class GrpcMessageSink<T : Any> constructor(
internal class GrpcMessageSink<T : Any>(
private val sink: BufferedSink,
private val minMessageToCompress: Long,
private val messageAdapter: ProtoAdapter<T>,
private val grpcEncoding: String
) : MessageSink<T> {
Expand All @@ -25,15 +28,23 @@ internal class GrpcMessageSink<T : Any> constructor(
check(!closed) { "closed" }

val encodedMessage = Buffer()
grpcEncoding.toGrpcEncoder().encode(encodedMessage).use { encodingSink ->
messageAdapter.encode(encodingSink, message)
messageAdapter.encode(encodedMessage, message)

if (grpcEncoding == "identity" || encodedMessage.size < minMessageToCompress) {
sink.writeByte(0) // 0 = Not encoded.
sink.writeInt(encodedMessage.size.toInt())
sink.writeAll(encodedMessage)
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we explicitly check that gzip type is set instead of this being the else clause? Maybe else should throw unsupported operation exception or something like that.

Copy link
Contributor Author

@zpingcai zpingcai Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's needed. The grpcEncoding.toGrpcEncoder() call will throw if the encoding is not supported:

internal fun String.toGrpcEncoder(): GrpcEncoder {

val compressedMessage = Buffer()
grpcEncoding.toGrpcEncoder().encode(compressedMessage).use { sink ->
sink.writeAll(encodedMessage)
}
sink.writeByte(1) // 1 = Compressed.
sink.writeInt(compressedMessage.size.toInt())
sink.writeAll(compressedMessage)
}

val compressedFlag = if (grpcEncoding == "identity") 0 else 1
sink.writeByte(compressedFlag)
// TODO: fail if the message size is more than MAX_INT
sink.writeInt(encodedMessage.size.toInt())
sink.writeAll(encodedMessage)
sink.flush()
}

Expand Down
7 changes: 6 additions & 1 deletion misk/src/main/kotlin/misk/web/WebConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,15 @@ data class WebConfig @JvmOverloads constructor(
val close_connection_percent: Double = 0.0,

/**
* If true responses which are larger than the minGzipSize will be compressed.
* If true non-gRPC responses which are larger than the minGzipSize will be compressed.
*/
val gzip: Boolean = true,

/**
* If true gRPC responses which are larger than the minGzipSize will be compressed.
*/
val grpcGzip: Boolean = false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this instead be an enum to allow for the other wire-supported encoding?

Suggested change
val grpcGzip: Boolean = false,
val grpcEncoding: GrpcEncoding = GrpcEncoding.IDENTITY,

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'd lean towards the encoding enum, not just a boolean

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's actually worth it:

  1. The client only supports identity and gzip: https://github.com/square/wire/blob/master/wire-grpc-client/src/commonMain/kotlin/com/squareup/wire/internal/GrpcDecoder.kt
  2. The http response compression only supports gzip:
    val gzip: Boolean = true,
  3. We also need to introduce WebConfig.minGrpcMessageToCompress instead of reusing WebConfig.minGzipSize.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current implementation makes more sense to the user:

  1. Why do you configure grpcEncoding to anything that's not supported?
  2. Why do you want different configs for min compression size for http vs grpc over http?

cc @Hexcles

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'd say @zpingcai 's current impl is better in terms of consistency.

grpcGzip makes a lot of sense along with gzip. If we want grpcEncoding, I'd expect to have an enum for plain old HTTP, too. And in fact, not just an enum but a config object as different compression algos have different knobs (minGzipSize obviously doesn't make sense for non-gzip compression algos).


/** The minimum size in bytes before the response body will be compressed. */
val minGzipSize: Int = 1024,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,14 @@ class ExceptionHandlingInterceptor(
httpCall.setResponseTrailer("grpc-status-details-bin", response.toEncodedStatusProto)
httpCall.setResponseTrailer("grpc-message", response.message ?: response.status.name)
httpCall.takeResponseBody()?.use { responseBody: BufferedSink ->
GrpcMessageSink(responseBody, ProtoAdapter.BYTES, grpcEncoding = "identity")
.use { messageSink ->
messageSink.write(ByteString.EMPTY)
}
GrpcMessageSink(
sink = responseBody,
minMessageToCompress = 0,
messageAdapter = ProtoAdapter.BYTES,
grpcEncoding = "identity"
).use { messageSink ->
messageSink.write(ByteString.EMPTY)
}
}
}

Expand All @@ -124,7 +128,11 @@ class ExceptionHandlingInterceptor(
val mapper = mapperResolver.mapperFor(th)
if (mapper != null) {
if (!suppressLog) {
log.log(mapper.loggingLevel(th), th, *mdcTags.toTypedArray()) { "exception dispatching to $actionName" }
log.log(
level = mapper.loggingLevel(th),
th = th,
tags = *mdcTags.toTypedArray(),
) { "exception dispatching to $actionName" }
Comment on lines +131 to +135
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDE auto formatted this because the line is too long.

}
return mapper.toResponse(th)
}
Expand All @@ -139,7 +147,11 @@ class ExceptionHandlingInterceptor(
is InvocationTargetException -> toGrpcResponse(th.targetException, mdcTags)
is UncheckedExecutionException -> toGrpcResponse(th.cause!!, mdcTags)
else -> mapperResolver.mapperFor(th)?.let {
log.log(it.loggingLevel(th), th, *mdcTags.toTypedArray()) { "exception dispatching to $actionName" }
log.log(
level = it.loggingLevel(th),
th = th,
tags = *mdcTags.toTypedArray(),
) { "exception dispatching to $actionName" }
val grpcResponse = it.toGrpcResponse(th)
if (grpcResponse == null) {
val httpResponse = toResponse(th, suppressLog = true, mdcTags)
Expand Down
29 changes: 22 additions & 7 deletions misk/src/test/kotlin/misk/grpc/GrpcConnectivityTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ class GrpcConnectivityTest {
}

override fun writeTo(sink: BufferedSink) {
val writer = GrpcMessageSink(sink, HelloRequest.ADAPTER, "gzip")
val writer = GrpcMessageSink(
sink = sink,
minMessageToCompress = 0,
messageAdapter = HelloRequest.ADAPTER,
grpcEncoding = "gzip"
)
writer.write(HelloRequest("jesse!"))
}
})
Expand All @@ -82,7 +87,7 @@ class GrpcConnectivityTest {
response.use {
assertThat(response.code).isEqualTo(200)
assertThat(response.headers["grpc-status"]).isNull() // Sent in the trailers!
assertThat(response.headers["grpc-encoding"]).isEqualTo("identity")
assertThat(response.headers["grpc-encoding"]).isEqualTo("gzip")
assertThat(response.body!!.contentType()).isEqualTo("application/grpc".toMediaType())

val reader = GrpcMessageSource(
Expand Down Expand Up @@ -110,7 +115,12 @@ class GrpcConnectivityTest {
}

override fun writeTo(sink: BufferedSink) {
val writer = GrpcMessageSink(sink, HelloRequest.ADAPTER, "gzip")
val writer = GrpcMessageSink(
sink = sink,
minMessageToCompress = 0,
messageAdapter = HelloRequest.ADAPTER,
grpcEncoding = "gzip"
)
writer.write(HelloRequest("jesse!"))
}
})
Expand Down Expand Up @@ -142,7 +152,12 @@ class GrpcConnectivityTest {
}

override fun writeTo(sink: BufferedSink) {
val writer = GrpcMessageSink(sink, HelloRequest.ADAPTER, "gzip")
val writer = GrpcMessageSink(
sink = sink,
minMessageToCompress = 0,
messageAdapter = HelloRequest.ADAPTER,
grpcEncoding = "gzip"
)
writer.write(HelloRequest("jp!"))
}
})
Expand All @@ -169,8 +184,8 @@ class GrpcConnectivityTest {
interface GreeterSayHello : Service {
@WireRpc(
path = "/helloworld.Greeter/SayHello",
requestAdapter = "com.squareup.protos.test.grpc.HelloRequest.ADAPTER",
responseAdapter = "com.squareup.protos.test.grpc.HelloReply.ADAPTER"
requestAdapter = "com.squareup.protos.test.grpc.HelloRequest#ADAPTER",
responseAdapter = "com.squareup.protos.test.grpc.HelloReply#ADAPTER"
Comment on lines +187 to +188
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by: fix the test setup

)
fun sayHello(request: HelloRequest): HelloReply
}
Expand All @@ -179,7 +194,7 @@ class GrpcConnectivityTest {
override fun configure() {
install(
WebServerTestingModule(
webConfig = WebServerTestingModule.TESTING_WEB_CONFIG
webConfig = WebServerTestingModule.TESTING_WEB_CONFIG.copy(grpcGzip = true)
)
)
install(MiskTestingServiceModule())
Expand Down
7 changes: 6 additions & 1 deletion misk/src/test/kotlin/misk/grpc/GrpcMessageSinkChannelTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ import kotlin.time.Duration.Companion.milliseconds

class GrpcMessageSinkChannelTest {
private val buffer = Buffer()
private val writer = GrpcMessageSink(buffer, HelloRequest.ADAPTER, "identity")
private val writer = GrpcMessageSink(
sink = buffer,
minMessageToCompress = 0,
messageAdapter = HelloRequest.ADAPTER,
grpcEncoding = "identity"
)

@AfterEach
fun tearDown() {
Expand Down
94 changes: 92 additions & 2 deletions misk/src/test/kotlin/misk/grpc/GrpcSourceSinkTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ class GrpcSourceSinkTest {
@Test
fun grpcMessageSinkHelloRequest() {
val buffer = Buffer()
val writer = GrpcMessageSink(buffer, HelloRequest.ADAPTER, "identity")
val writer = GrpcMessageSink(
sink = buffer,
minMessageToCompress = 0,
messageAdapter = HelloRequest.ADAPTER,
grpcEncoding = "identity"
)
writer.write(HelloRequest("localhost"))
writer.close()

Expand All @@ -39,10 +44,95 @@ class GrpcSourceSinkTest {
@Test
fun grpcMessageSinkHelloReply() {
val buffer = Buffer()
val writer = GrpcMessageSink(buffer, HelloReply.ADAPTER, "identity")
val writer = GrpcMessageSink(
sink = buffer,
minMessageToCompress = 0,
messageAdapter = HelloReply.ADAPTER,
grpcEncoding = "identity"
)
writer.write(HelloReply("Hello localhost"))
writer.close()

assertEquals("00000000110a0f48656c6c6f206c6f63616c686f7374".decodeHex(), buffer.readByteString())
}

@Test
fun grpcMessageSourceCompressedHelloRequest() {
val buffer = Buffer()
buffer.write(
"010000001f1f8b0800000000000000e3e2ccc94f4eccc9c82f2e01002fdef60d0b000000".decodeHex()
)
val reader = GrpcMessageSource(buffer, HelloRequest.ADAPTER, "gzip")

assertEquals(HelloRequest("localhost"), reader.read())
}

@Test
fun messageLargerThanMinimumSizeIsCompressed() {
val message = HelloRequest("localhost")
val encodedMessage = HelloRequest.ADAPTER.encode(message)
assertEquals(encodedMessage.size, 11)

val buffer = Buffer()
val writer = GrpcMessageSink(
sink = buffer,
minMessageToCompress = 10,
messageAdapter = HelloRequest.ADAPTER,
grpcEncoding = "gzip"
)

writer.write(message)
writer.close()

assertEquals(
"010000001f1f8b0800000000000000e3e2ccc94f4eccc9c82f2e01002fdef60d0b000000".decodeHex(),
buffer.readByteString()
)
}

@Test
fun messageEqualToMinimumSizeIsCompressed() {
val message = HelloRequest("localhost")
val encodedMessage = HelloRequest.ADAPTER.encode(message)
assertEquals(encodedMessage.size, 11)

val buffer = Buffer()
val writer = GrpcMessageSink(
sink = buffer,
minMessageToCompress = 11,
messageAdapter = HelloRequest.ADAPTER,
grpcEncoding = "gzip"
)

writer.write(message)
writer.close()

assertEquals(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an assertion on the min size and the actual size? Not just a snapshot assertion on expected output (still keep that but add the extra assertion).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual size is asserted in helloRequestSize, but I can move it here if that makes the test more readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. I simply repeated the same assertion multiple times:

val message = HelloRequest("localhost")
val encodedMessage = HelloRequest.ADAPTER.encode(message)
assertEquals(encodedMessage.size, 11)

Can definitely create a helper function if needed.

"010000001f1f8b0800000000000000e3e2ccc94f4eccc9c82f2e01002fdef60d0b000000".decodeHex(),
buffer.readByteString()
)
}

@Test
fun messageSmallerThanMinimumSizeIsNotCompressed() {
val message = HelloRequest("localhost")
val encodedMessage = HelloRequest.ADAPTER.encode(message)
assertEquals(encodedMessage.size, 11)

val buffer = Buffer()
val writer = GrpcMessageSink(
sink = buffer,
minMessageToCompress = 12,
messageAdapter = HelloRequest.ADAPTER,
grpcEncoding = "gzip"
)

writer.write(message)
writer.close()

assertEquals(
"000000000b0a096c6f63616c686f7374".decodeHex(),
buffer.readByteString()
)
}
}
Loading