Skip to content

Commit 0503fbd

Browse files
Zen YuiAntonio Yuen
andauthored
Fix persisted headers, quiet noisy logs (#124)
Co-authored-by: Antonio Yuen <antonio.yuen@namely.com>
1 parent a49a631 commit 0503fbd

File tree

6 files changed

+89
-24
lines changed

6 files changed

+89
-24
lines changed

code/service/src/main/resources/logback.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
<logger name="org.apache.kafka" level="ERROR"/>
3939
<!-- Turn down Slick noise-->
4040
<logger name="scala.slick" level="ERROR"/>
41+
<!-- Turn down kamon -->
42+
<logger name="kamon.jaeger" level="WARN"/>
4143

4244
<root level="${LOG_LEVEL}">
4345
<appender-ref ref="ASYNCJSON"/>

code/service/src/main/scala/com/namely/chiefofstate/GrpcServiceImpl.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,9 @@ class GrpcServiceImpl(sys: ActorSystem,
4949
)
5050
} else {
5151

52-
val meta: Try[Map[String, Any]] = pluginManager.run(in, MetadataUtil.makeMeta(metadata))
53-
54-
meta match {
55-
case Success(m) =>
56-
// get the headers to forward
52+
Future
53+
.fromTry(pluginManager.run(in, MetadataUtil.makeMeta(metadata)))
54+
.flatMap(meta => {
5755
val propagatedHeaders: Seq[RemoteCommand.Header] = metadata.asList
5856
// filter to relevant headers
5957
.filter({ case (k, _) => sendCommandSettings.propagatedHeaders.contains(k) })
@@ -75,15 +73,14 @@ class GrpcServiceImpl(sys: ActorSystem,
7573
.withCommand(in.getCommand)
7674
.withHeaders(propagatedHeaders)
7775

78-
sendCommand(in.entityId, remoteCommand, m)
76+
sendCommand(in.entityId, remoteCommand, meta)
7977
.map((stateWrapper: StateWrapper) => {
8078
ProcessCommandResponse(
8179
state = stateWrapper.state,
8280
meta = stateWrapper.meta.map(Util.toCosMetaData)
8381
)
8482
})
85-
case Failure(e) => Future.fromTry(Failure(e))
86-
}
83+
})
8784
}
8885
}
8986

code/service/src/main/scala/com/namely/chiefofstate/plugin/PersistHeaders.scala

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,59 @@ import com.namely.protobuf.chiefofstate.plugins.persistedheaders.v1.headers.{Hea
55
import com.namely.protobuf.chiefofstate.v1.service.ProcessCommandRequest
66
import io.grpc.Metadata
77
import scala.jdk.CollectionConverters._
8+
import scala.collection.mutable
9+
import org.slf4j.{Logger, LoggerFactory}
810

911
private[this] class PersistHeaders(persistedHeaders: Seq[String]) extends PluginBase {
1012

13+
import PersistHeaders.logger
14+
1115
override val pluginId: String = "persisted_headers.v1"
1216

1317
override def run(processCommandRequest: ProcessCommandRequest, metadata: Metadata): Option[com.google.protobuf.any.Any] = {
14-
val headers: Seq[Header] = persistedHeaders.flatMap(header => {
15-
if(header.endsWith("-bin")) {
16-
val bytesKey: Metadata.Key[Array[Byte]] = Metadata.Key.of(header, Metadata.BINARY_BYTE_MARSHALLER)
17-
metadata.getAll[Array[Byte]](bytesKey).asScala
18-
.map(b => Header().withKey(header).withBytesValue(ByteString.copyFrom(b)))
18+
19+
val capturedHeaders: mutable.ListBuffer[Header] = mutable.ListBuffer.empty[Header]
20+
21+
persistedHeaders.foreach(key => {
22+
if(key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
23+
val bytesKey: Metadata.Key[Array[Byte]] = Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER)
24+
val byteValues = metadata.getAll[Array[Byte]](bytesKey)
25+
26+
if(byteValues != null) {
27+
byteValues.forEach(byteArray => {
28+
val byteString = ByteString.copyFrom(byteArray)
29+
logger.debug(s"persisting header=${key}, type=bytes")
30+
capturedHeaders.addOne(Header(key=key).withBytesValue(byteString))
31+
})
32+
}
1933
} else {
20-
val stringKey: Metadata.Key[String] = Metadata.Key.of(header, Metadata.ASCII_STRING_MARSHALLER)
21-
metadata.getAll[String](stringKey).asScala
22-
.map(s => Header().withKey(header).withStringValue(s))
34+
val stringKey: Metadata.Key[String] = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
35+
val stringValues = metadata.getAll[String](stringKey)
36+
37+
if(stringValues != null) {
38+
stringValues.forEach(stringValue => {
39+
logger.debug(s"persisting header=$key, type=string, value=$stringValue")
40+
capturedHeaders.addOne(Header(key=key).withStringValue(stringValue))
41+
})
42+
}
2343
}
2444
})
2545

26-
Some(com.google.protobuf.any.Any.pack(Headers().withHeaders(headers)))
46+
if(capturedHeaders.nonEmpty) {
47+
Some(com.google.protobuf.any.Any.pack(Headers().withHeaders(capturedHeaders.toSeq)))
48+
} else {
49+
None
50+
}
2751
}
2852
}
2953

3054
object PersistHeaders extends PluginFactory {
3155

3256
val envName: String = "COS_WRITE_PERSISTED_HEADERS"
3357

34-
lazy val persistedHeaders: Seq[String] = sys.env.get(envName)
58+
val logger: Logger = LoggerFactory.getLogger(getClass)
59+
60+
def persistedHeaders: Seq[String] = sys.env.get(envName)
3561
.map(_.split(",").map(_.trim).toSeq)
3662
.getOrElse(Seq.empty[String])
3763

code/service/src/main/scala/com/namely/chiefofstate/plugin/PluginManager.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import com.google.protobuf.any.Any
55
import com.namely.protobuf.chiefofstate.v1.service.ProcessCommandRequest
66
import com.typesafe.config.Config
77
import io.grpc.{Metadata, Status}
8-
8+
import org.slf4j.{Logger, LoggerFactory}
99
import scala.reflect.runtime.universe
1010
import scala.util.{Failure, Success, Try}
1111

@@ -15,6 +15,8 @@ import scala.util.{Failure, Success, Try}
1515
* @param plugins Sequence of PluginBase
1616
*/
1717
case class PluginManager(plugins: Seq[PluginBase]) {
18+
import PluginManager.logger
19+
1820
/**
1921
* Given a PluginManager instance, a ProcessCommandRequest instance and io.grpc.Metadata instance, folds left
2022
* throughout all the plugins and runs them one at a time. The results are mapped into a Map of pluginId -> protoAny.
@@ -34,7 +36,15 @@ case class PluginManager(plugins: Seq[PluginBase]) {
3436
}
3537
pluginRun match {
3638
case Success(m) => Try(metaMap.get ++ m)
37-
case Failure(e) => Failure(new GrpcServiceException(status = Status.ABORTED.withDescription(e.getMessage)))
39+
case Failure(e: GrpcServiceException) =>
40+
logger.error(s"plugin '${plugin.pluginId}' failed with ${e.getClass.getName}: ${e.getStatus.toString}")
41+
Failure(e)
42+
case Failure(e: Throwable) =>
43+
val errMsg = s"plugin ${plugin.pluginId} failed due to ${e.getClass.getName}: ${e.getMessage}"
44+
logger.error(errMsg)
45+
val status = Status.INTERNAL.withDescription(e.getMessage)
46+
val err = new GrpcServiceException(status=status)
47+
Failure(err)
3848
}
3949
})
4050
}
@@ -47,6 +57,8 @@ object PluginManager {
4757

4858
final val HOCON_PATH: String = "chief-of-state.plugin-settings.enable-plugins"
4959

60+
lazy val logger: Logger = LoggerFactory.getLogger(getClass)
61+
5062
/**
5163
* Default COS Plugins
5264
*/

code/service/src/test/scala/com/namely/chiefofstate/plugin/PersistedHeadersSpec.scala renamed to code/service/src/test/scala/com/namely/chiefofstate/plugin/PersistHeadersSpec.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import com.namely.protobuf.chiefofstate.plugins.persistedheaders.v1.headers.Head
88
import com.namely.protobuf.chiefofstate.plugins.persistedheaders.v1.headers.{Header, Headers}
99
import com.namely.protobuf.chiefofstate.v1.service.ProcessCommandRequest
1010

11-
class PersistedHeadersSpec extends TestSpec with MockFactory {
11+
class PersistHeadersSpec extends TestSpec with MockFactory {
12+
13+
override def beforeEach(): Unit = {
14+
super.beforeEach()
15+
EnvironmentHelper.clearEnv()
16+
}
1217

1318
"Persistedheaders" should {
1419
val fooKeyName: String = "foo"
@@ -40,7 +45,7 @@ class PersistedHeadersSpec extends TestSpec with MockFactory {
4045
metadata.put(bazKey, baz)
4146

4247
"return the a string and byte header" in {
43-
EnvironmentHelper.setEnv(PersistHeaders.envName, "foo,bar-bin")
48+
EnvironmentHelper.setEnv(PersistHeaders.envName, "foo,bar-bin,not-a-key")
4449

4550
val actual: Headers = PersistHeaders
4651
.apply()
@@ -52,5 +57,15 @@ class PersistedHeadersSpec extends TestSpec with MockFactory {
5257

5358
actual should be (expected)
5459
}
60+
61+
"return an empty header" in {
62+
EnvironmentHelper.setEnv(PersistHeaders.envName, "not-a-key,not-a-key-bin")
63+
64+
val actual: Option[com.google.protobuf.any.Any] = PersistHeaders
65+
.apply()
66+
.run(processCommandRequest, metadata)
67+
68+
actual should be (None)
69+
}
5570
}
5671
}

code/service/src/test/scala/com/namely/chiefofstate/plugin/PluginManagerSpec.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import org.scalatest.Assertion
1111
import org.scalatest.matchers.should.Matchers
1212

1313
import scala.util.Try
14+
import io.grpc.Status
1415

1516
private[this] class MockPlugin1() extends PluginBase {
1617
override val pluginId: String = "MockPluginBase"
@@ -124,10 +125,22 @@ class PluginManagerSpec extends TestSpec {
124125

125126
"return a failure" in {
126127
val mockPluginBase: PluginBase = mock[PluginBase]
127-
(mockPluginBase.run _).expects(processCommandRequest, metadata).throws(new RuntimeException("test"))
128+
129+
(mockPluginBase.run _)
130+
.expects(processCommandRequest, metadata)
131+
.throws(new RuntimeException("test"))
132+
133+
(() => mockPluginBase.pluginId)
134+
.expects()
135+
.returning("some-plugin-id")
128136

129137
val pluginManager: PluginManager = new PluginManager(Seq(mockPluginBase))
130-
intercept[GrpcServiceException](pluginManager.run(processCommandRequest, metadata).get)
138+
139+
val actual = pluginManager.run(processCommandRequest, metadata)
140+
val actualError = intercept[GrpcServiceException](actual.get)
141+
142+
actualError.getStatus.getCode shouldBe(Status.Code.INTERNAL)
143+
actualError.getStatus.getDescription() shouldBe("test")
131144
}
132145
}
133146
}

0 commit comments

Comments
 (0)