|
1 | 1 | package org.strykeforce.thirdcoast.telemetry |
2 | 2 |
|
3 | | -import com.squareup.moshi.JsonWriter |
4 | 3 | import mu.KotlinLogging |
5 | 4 | import okio.Buffer |
6 | | -import org.nanohttpd.protocols.http.NanoHTTPD |
7 | | -import org.nanohttpd.protocols.http.request.Method |
8 | | -import org.nanohttpd.protocols.http.response.Response |
9 | | -import org.nanohttpd.protocols.http.response.Status |
| 5 | +import org.eclipse.jetty.server.Request |
| 6 | +import org.eclipse.jetty.server.Server |
| 7 | +import org.eclipse.jetty.server.handler.AbstractHandler |
| 8 | +import org.eclipse.jetty.server.handler.DefaultHandler |
| 9 | +import org.eclipse.jetty.server.handler.HandlerList |
10 | 10 | import org.strykeforce.thirdcoast.telemetry.grapher.ClientHandler |
11 | 11 | import org.strykeforce.thirdcoast.telemetry.grapher.Subscription |
12 | | -import java.io.IOException |
13 | 12 | import java.net.Inet4Address |
14 | 13 | import java.net.NetworkInterface |
15 | | -import java.util.* |
| 14 | +import javax.servlet.http.HttpServletRequest |
| 15 | +import javax.servlet.http.HttpServletResponse |
| 16 | + |
16 | 17 |
|
17 | 18 | private const val JSON = "application/json" |
18 | | -private val logger = KotlinLogging.logger {} |
| 19 | +private const val GRAPHER = "/v1/grapher" |
| 20 | +private const val INVENTORY = "$GRAPHER/inventory" |
| 21 | +private const val SUBSCRIPTION = "$GRAPHER/subscription" |
19 | 22 |
|
20 | | -/** Provides a web service to config telemetry. */ |
21 | | -class TelemetryController( |
22 | | - inventory: Inventory, |
23 | | - private val clientHandler: ClientHandler, |
24 | | - private val port: Int |
25 | | -) : NanoHTTPD(port) { |
| 23 | +private val logger = KotlinLogging.logger {} |
26 | 24 |
|
27 | | - private val inventoryEndpoints: List<String> |
28 | | - get() { |
29 | | - val endpoints = ArrayList<String>(2) |
30 | | - val nets = NetworkInterface.getNetworkInterfaces() |
31 | | - for (netint in Collections.list(nets)) { |
32 | | - val inetAddresses = netint.inetAddresses |
33 | | - for (addr in Collections.list(inetAddresses)) { |
34 | | - if (!addr.isLinkLocalAddress && addr.javaClass == Inet4Address::class.java) |
35 | | - endpoints += "http://${addr.hostAddress}:$port/v1/grapher/inventory" |
36 | | - } |
37 | | - } |
38 | | - return endpoints |
| 25 | +internal class TelemetryControllerHandler(private val inventory: Inventory, private val clientHandler: ClientHandler) : |
| 26 | + AbstractHandler() { |
| 27 | + |
| 28 | + override fun handle( |
| 29 | + target: String, |
| 30 | + baseRequest: Request, |
| 31 | + request: HttpServletRequest, |
| 32 | + response: HttpServletResponse |
| 33 | + ) { |
| 34 | + logger.debug { "${request.method} $target" } |
| 35 | + |
| 36 | + baseRequest.isHandled = true |
| 37 | + |
| 38 | + if (target.toLowerCase() == INVENTORY && request.method == "GET") { |
| 39 | + val buffer = Buffer() |
| 40 | + inventory.writeInventory(buffer) |
| 41 | + response.writeJson(buffer) |
| 42 | + logger.info { "inventory requested from ${request.remoteAddr}" } |
| 43 | + return |
39 | 44 | } |
40 | 45 |
|
41 | | - init { |
42 | | - addHTTPInterceptor { session -> |
43 | | - if (session.method == Method.GET && session.uri.equals( |
44 | | - "/v1/grapher/inventory", |
45 | | - ignoreCase = true |
46 | | - ) |
47 | | - ) { |
| 46 | + if (target.toLowerCase() == SUBSCRIPTION) { |
| 47 | + if (request.method == "POST") { |
| 48 | + val sub = Subscription(inventory, request.remoteAddr, request.reader.readText()) |
| 49 | + clientHandler.start(sub) |
48 | 50 | val buffer = Buffer() |
49 | | - inventory.writeInventory(buffer) |
| 51 | + sub.toJson(buffer) |
| 52 | + response.writeJson(buffer) |
| 53 | + logger.info { "subscription started from ${request.remoteAddr}" } |
| 54 | + return |
| 55 | + } |
50 | 56 |
|
51 | | - logger.debug { "inventory requested from ${session.remoteIpAddress}" } |
52 | | - return@addHTTPInterceptor Response.newFixedLengthResponse(Status.OK, JSON, buffer.readByteArray()) |
| 57 | + if (request.method == "DELETE") { |
| 58 | + clientHandler.shutdown() |
| 59 | + logger.info { "subscription stopped from ${request.remoteAddr}" } |
| 60 | + return |
53 | 61 | } |
54 | | - null |
55 | 62 | } |
| 63 | + baseRequest.isHandled = false |
| 64 | + } |
| 65 | +} |
56 | 66 |
|
57 | | - addHTTPInterceptor { session -> |
58 | | - if (session.method == Method.POST && session.uri.equals( |
59 | | - "/v1/grapher/subscription", |
60 | | - ignoreCase = true |
61 | | - ) |
62 | | - ) { |
63 | | - val body = HashMap<String, String>() |
64 | | - try { |
65 | | - session.parseBody(body) |
66 | | - val sub = Subscription(inventory, session.remoteIpAddress, body["postData"]!!) |
67 | | - clientHandler.start(sub) |
68 | | - val buffer = Buffer() |
69 | | - sub.toJson(buffer) |
70 | | - return@addHTTPInterceptor Response.newFixedLengthResponse(Status.OK, JSON, buffer.readByteArray()) |
71 | | - } catch (t: Throwable) { |
72 | | - logger.error("couldn't start grapher", t) |
73 | | - return@addHTTPInterceptor errorResponseFor(t) |
74 | | - } |
| 67 | +private fun HttpServletResponse.writeJson(buffer: Buffer) { |
| 68 | + this.contentType = JSON |
| 69 | + this.status = HttpServletResponse.SC_OK |
| 70 | + this.writer.print(buffer.readUtf8()) |
| 71 | +} |
75 | 72 |
|
76 | | - } |
77 | | - null |
78 | | - } |
| 73 | +/** Provides a web service to config telemetry. */ |
| 74 | +class TelemetryController( |
| 75 | + inventory: Inventory, |
| 76 | + private val clientHandler: ClientHandler, |
| 77 | + private val port: Int |
| 78 | +) { |
79 | 79 |
|
80 | | - addHTTPInterceptor { session -> |
81 | | - if (session.method == Method.DELETE && session.uri.equals( |
82 | | - "/v1/grapher/subscription", |
83 | | - ignoreCase = true |
84 | | - ) |
85 | | - ) { |
86 | | - try { |
87 | | - clientHandler.shutdown() |
88 | | - return@addHTTPInterceptor Response.newFixedLengthResponse(Status.NO_CONTENT, JSON, "") |
89 | | - } catch (t: Throwable) { |
90 | | - logger.error("couldn't stop grapher", t) |
91 | | - return@addHTTPInterceptor errorResponseFor(t) |
92 | | - } |
93 | 80 |
|
94 | | - } |
95 | | - null |
| 81 | + private val server = Server(port).apply { |
| 82 | + handler = HandlerList().apply { |
| 83 | + handlers = arrayOf(TelemetryControllerHandler(inventory, clientHandler), DefaultHandler()) |
96 | 84 | } |
97 | 85 | } |
98 | 86 |
|
99 | | - /** Start web service to listen for HTTP commands that control telemetry service. */ |
100 | | - override fun start() { |
101 | | - try { |
102 | | - start(NanoHTTPD.SOCKET_READ_TIMEOUT, true) |
103 | | - } catch (e: IOException) { |
104 | | - logger.error("couldn't start web service", e) |
105 | | - } |
106 | | - |
107 | | - if (logger.isInfoEnabled) { |
108 | | - logger.info("started web service") |
109 | | - for (end in inventoryEndpoints) { |
110 | | - logger.info(end) |
| 87 | + private val inventoryEndpoints: List<String> |
| 88 | + get() { |
| 89 | + val endpoints = mutableListOf<String>() |
| 90 | + NetworkInterface.getNetworkInterfaces().iterator().forEach { ni -> |
| 91 | + ni.inetAddresses.iterator().forEach { addr -> |
| 92 | + if (addr is Inet4Address && !addr.isLinkLocalAddress) |
| 93 | + endpoints += "http://${addr.hostAddress}:$port$INVENTORY" |
| 94 | + } |
111 | 95 | } |
| 96 | + return endpoints |
112 | 97 | } |
| 98 | + |
| 99 | + /** Start web service to listen for HTTP commands that control telemetry service. */ |
| 100 | + fun start() { |
| 101 | + server.start() |
| 102 | + logger.info("started web service") |
| 103 | + inventoryEndpoints.forEach(logger::info) |
113 | 104 | } |
114 | 105 |
|
115 | 106 | /** Stop streaming to client and shut down web service. */ |
116 | 107 | fun shutdown() { |
117 | 108 | clientHandler.shutdown() |
118 | | - super.stop() |
| 109 | + server.stop() |
119 | 110 | logger.info("stopped web service") |
120 | 111 | } |
121 | | - |
122 | | - private fun errorResponseFor(e: Throwable): Response { |
123 | | - val buffer = Buffer() |
124 | | - val writer = JsonWriter.of(buffer) |
125 | | - try { |
126 | | - writer.beginObject() |
127 | | - writer.name("error").value(e.message) |
128 | | - writer.endObject() |
129 | | - } catch (ignored: IOException) { |
130 | | - } |
131 | | - |
132 | | - return Response.newFixedLengthResponse(Status.INTERNAL_ERROR, JSON, buffer.readByteArray()) |
133 | | - } |
134 | | - |
135 | 112 | } |
0 commit comments