|
| 1 | +--- |
| 2 | +nav_title: Customize Kafka Messages |
| 3 | +title: Customize Kafka Messages with the Kafka Upstream Plugin |
| 4 | +minimum_version: 3.10.x |
| 5 | +--- |
| 6 | + |
| 7 | + |
| 8 | +Starting in {{site.base_gateway}} 3.10, the Kafka Upstream plugin supports customizing the Kafka message format. |
| 9 | +Using the configuration field `config.message_by_lua_functions`, you can define a function chain with multiple functions that can manipulate both the Kafka message format and the content of the message. |
| 10 | + |
| 11 | +## Prerequisites |
| 12 | +* You have created a Kafka topic |
| 13 | +* The [`kcat` utility](https://github.com/edenhill/kcat) is installed |
| 14 | + |
| 15 | +## Using the plugin |
| 16 | + |
| 17 | +The configuration field `message_by_lua_functions` is an array of strings, where each string is a customized Lua function. The default message generated by the Kafka Upstream plugin gets reduced through the following functions: |
| 18 | + |
| 19 | +``` |
| 20 | +Message(default) -> f1(Message) -> f2(Message) -> ... -> Producer(New_Message) |
| 21 | +``` |
| 22 | + |
| 23 | +The Lua function must be in the following format, where the function receives one message argument, and returns the new message as a single return value: |
| 24 | + |
| 25 | +```lua |
| 26 | +return function(message) |
| 27 | + ... -- Some manipulation code |
| 28 | + return new_message |
| 29 | +end |
| 30 | +``` |
| 31 | + |
| 32 | +### Example: A function that does nothing |
| 33 | + |
| 34 | +You can just return the message argument as the return value, so that the function makes no modifications to the default message: |
| 35 | + |
| 36 | +```lua |
| 37 | +return function(message) |
| 38 | + return message |
| 39 | +end |
| 40 | +``` |
| 41 | + |
| 42 | +Let's configure this function with an example declarative config: |
| 43 | + |
| 44 | +```yaml |
| 45 | +_format_version: "3.0" |
| 46 | +services: |
| 47 | + - name: kafka-service |
| 48 | + url: http://mock-upstream |
| 49 | + routes: |
| 50 | + - name: kafka-route |
| 51 | + paths: |
| 52 | + - /kafka |
| 53 | + plugins: |
| 54 | + - name: kafka-upstream |
| 55 | + config: |
| 56 | + bootstrap_servers: |
| 57 | + - host: localhost |
| 58 | + port: 9092 |
| 59 | + topic: "my-topic" |
| 60 | + forward_method: true |
| 61 | + forward_uri: true |
| 62 | + forward_headers: true |
| 63 | + forward_body: true |
| 64 | + message_by_lua_functions: |
| 65 | + - 'return function(message) return message end' |
| 66 | +``` |
| 67 | +
|
| 68 | +Make sure the Kafka topic is created, then send a request to the route path and check the message: |
| 69 | +
|
| 70 | +```sh |
| 71 | +curl http://localhost:8000/kafka |
| 72 | +``` |
| 73 | + |
| 74 | +Response: |
| 75 | +``` |
| 76 | +{"message":"message sent"} |
| 77 | +``` |
| 78 | + |
| 79 | +You can use the `kcat` tool to check the message: |
| 80 | +```sh |
| 81 | +kcat -b localhost:9092 -G mygroup1 -o beginning -t my-topic -p 0 -C |
| 82 | +``` |
| 83 | + |
| 84 | +You should see that the default message was sent to the topic: |
| 85 | +```json |
| 86 | +{"body_base64":true,"headers":{"accept":"*\/*","host":"localhost:8000","user-agent":"curl\/8.7.1"},"uri_args":{},"uri":"\/kafka","body_args":{},"body":"","method":"GET"} |
| 87 | +``` |
| 88 | + |
| 89 | +### Example: A function that returns a fixed table |
| 90 | + |
| 91 | +In the custom function, users can return whatever data they want as the return value. |
| 92 | +The returned value must be JSON-serializable, otherwise it can't be sent to Kafka. |
| 93 | + |
| 94 | +```lua |
| 95 | +return function(message) |
| 96 | + return {a="1", b="2"} |
| 97 | +end |
| 98 | +``` |
| 99 | + |
| 100 | +Configure this function with an example declarative config: |
| 101 | + |
| 102 | +```yaml |
| 103 | +_format_version: "3.0" |
| 104 | +services: |
| 105 | + - name: kafka-service |
| 106 | + url: http://mock-upstream |
| 107 | + routes: |
| 108 | + - name: kafka-route |
| 109 | + paths: |
| 110 | + - /kafka |
| 111 | + plugins: |
| 112 | + - name: kafka-upstream |
| 113 | + config: |
| 114 | + bootstrap_servers: |
| 115 | + - host: localhost |
| 116 | + port: 9092 |
| 117 | + topic: "my-topic" |
| 118 | + message_by_lua_functions: |
| 119 | + - 'return function(message) return {a="1", b="2"} end' |
| 120 | +``` |
| 121 | +
|
| 122 | +Make sure the Kafka topic is created, then send a request to the route path and check the message: |
| 123 | +
|
| 124 | +```sh |
| 125 | +curl http://localhost:8000/kafka |
| 126 | +``` |
| 127 | + |
| 128 | +Response: |
| 129 | +```json |
| 130 | +{"message":"message sent"} |
| 131 | +``` |
| 132 | + |
| 133 | +You can use the `kcat` tool to check the message: |
| 134 | +```sh |
| 135 | +kcat -b localhost:9092 -G mygroup1 -o beginning -t my-topic -p 0 -C |
| 136 | +``` |
| 137 | + |
| 138 | +You should see that the new message was sent to the topic: |
| 139 | + |
| 140 | +```json |
| 141 | +.....OLD MESSAGES..... |
| 142 | +% Reached end of topic my-topic [0] at offset 8 |
| 143 | +{"b":"2","a":"1"} |
| 144 | +% Reached end of topic my-topic [0] at offset 9 |
| 145 | +``` |
| 146 | + |
| 147 | +You can also return the encoded message string as a return value: |
| 148 | + |
| 149 | +```lua |
| 150 | +return function(message) |
| 151 | + return [[{"a":"1","b":"2"}]] |
| 152 | +end |
| 153 | +``` |
| 154 | + |
| 155 | +The result should be the same as the one returned by the `{a="1", b="2"}` table. |
| 156 | + |
| 157 | +{:.important} |
| 158 | +> **Note**: The Kafka Upstream plugin only supports JSON encoding on the message. Other serialization methods are not supported. |
| 159 | +
|
| 160 | +### Example: Functions that insert consumer info and redact a sensitive request header |
| 161 | + |
| 162 | +Let's use the following functions, which call the `get_consumer` PDK function and manipulate the request header information: |
| 163 | + |
| 164 | +```lua |
| 165 | +--- FUNCTION 1 that inject the consumer information to the message |
| 166 | +return function(message) |
| 167 | + message.consumer = kong.client.get_consumer() |
| 168 | + |
| 169 | + return message |
| 170 | +end |
| 171 | +``` |
| 172 | + |
| 173 | +```lua |
| 174 | +--- FUNCTION 2 that clean an apikey header |
| 175 | +return function(message) |
| 176 | + local headers = message and message.headers |
| 177 | + local apikey = headers and headers.apikey |
| 178 | + if apikey then |
| 179 | + headers.apikey = "[REDACTED]" |
| 180 | + end |
| 181 | + |
| 182 | + return message |
| 183 | +end |
| 184 | +``` |
| 185 | + |
| 186 | +Create an example declarative config for these two functions: |
| 187 | + |
| 188 | +```yaml |
| 189 | +_format_version: "3.0" |
| 190 | + |
| 191 | +services: |
| 192 | + - name: kafka-service |
| 193 | + url: http://mock-upstream |
| 194 | + routes: |
| 195 | + - name: kafka-route |
| 196 | + paths: |
| 197 | + - /kafka |
| 198 | + plugins: |
| 199 | + - name: key-auth |
| 200 | + - name: kafka-upstream |
| 201 | + config: |
| 202 | + bootstrap_servers: |
| 203 | + - host: localhost |
| 204 | + port: 9092 |
| 205 | + topic: "my-topic" |
| 206 | + forward_method: true |
| 207 | + forward_uri: true |
| 208 | + forward_headers: true |
| 209 | + forward_body: true |
| 210 | + message_by_lua_functions: |
| 211 | + - "return function(message) message.consumer = kong.client.get_consumer() return message end" |
| 212 | + - "return function(message) local headers = message and message.headers local apikey = headers and headers.apikey if apikey then headers.apikey = '[REDACTED]' end return message end" |
| 213 | + |
| 214 | +consumers: |
| 215 | + - username: alice |
| 216 | + keyauth_credentials: |
| 217 | + - key: "example-api-key" |
| 218 | +``` |
| 219 | +
|
| 220 | +Trigger the API call and check the message: |
| 221 | +
|
| 222 | +```sh |
| 223 | +curl http://localhost:8000/kafka -H 'apikey: example-api-key' |
| 224 | +``` |
| 225 | +
|
| 226 | +Response: |
| 227 | +``` |
| 228 | +{"message":"message sent"} |
| 229 | +``` |
| 230 | + |
| 231 | +You can use the `kcat` tool to check the message: |
| 232 | +``` |
| 233 | +kcat -b localhost:9092 -G mygroup1 -o beginning -t my-topic -p 0 -C |
| 234 | +``` |
| 235 | + |
| 236 | +You should see the `REDACTED` API key in the message: |
| 237 | + |
| 238 | +```json |
| 239 | +.....OLD MESSAGES..... |
| 240 | +% Reached end of topic my-topic [0] at offset 20 |
| 241 | +{"body_base64":true,"method":"GET","consumer":{"id":"25f66f8c-b5e1-4bbd-9b17-2cbe733d1895","created_at":1741768764,"username":"alice","username_lower":"alice","type":0,"updated_at":1741768764},"body_args":{},"body":"","headers":{"apikey":"[REDACTED]","accept":"*\/*","host" |
| 242 | +:"localhost:8000","x-consumer-id":"25f66f8c-b5e1-4bbd-9b17-2cbe733d1895","x-credential-identifier":"795fb073-f698-403d-9302-209ba3abdd55","user-agent":"curl\/8.7.1","x-consumer-username":"alice"},"uri_args":{},"uri":"\/kafka"} |
| 243 | +% Reached end of topic my-topic [0] at offset 21 |
| 244 | +``` |
0 commit comments