88 "errors"
99 "os"
1010 "os/signal"
11+ "regexp"
12+ "strings"
1113 "syscall"
1214 "time"
1315
@@ -19,26 +21,101 @@ import (
1921 "go.uber.org/zap"
2022)
2123
// regexPrefix is the marker that switches a filter setting from a
// comma-separated value list to a regular expression: everything after the
// prefix is compiled as the pattern.
const (
	regexPrefix = "regex:"
)
25+
2226// Consumer represents a Kafka consumer for processing API traffic
2327type Consumer struct {
2428 reader * kafka.Reader
2529 validatorService * validator.Service
2630 logger * zap.Logger
2731 config * config.Config
2832 batchSize int
33+ // Pre-parsed filter values for performance
34+ filterHosts []string
35+ filterHostRegex * regexp.Regexp
36+ filterPaths []string
37+ filterPathRegex * regexp.Regexp
2938}
3039
3140// NewConsumer creates a new Kafka consumer
3241func NewConsumer (cfg * config.Config , validatorService * validator.Service , logger * zap.Logger ) (* Consumer , error ) {
3342 reader := createKafkaReader (cfg , logger )
3443
35- return & Consumer {
44+ consumer := & Consumer {
3645 reader : reader ,
3746 validatorService : validatorService ,
3847 logger : logger ,
3948 config : cfg ,
4049 batchSize : cfg .KafkaBatchSize ,
41- }, nil
50+ }
51+
52+ // Parse filter configurations
53+ consumer .parseFilterConfig (logger )
54+
55+ return consumer , nil
56+ }
57+
58+ // parseFilterConfig parses the filter configuration from config
59+ func (c * Consumer ) parseFilterConfig (logger * zap.Logger ) {
60+ // Parse host filter
61+ if c .config .FilterHost != "" {
62+ if strings .HasPrefix (c .config .FilterHost , regexPrefix ) {
63+ pattern := strings .TrimPrefix (c .config .FilterHost , regexPrefix )
64+ compiled , err := regexp .Compile (pattern )
65+ if err != nil {
66+ logger .Error ("Failed to compile host filter regex, filter disabled" ,
67+ zap .String ("pattern" , pattern ),
68+ zap .Error (err ))
69+ } else {
70+ c .filterHostRegex = compiled
71+ logger .Info ("Host filter configured with regex" ,
72+ zap .String ("pattern" , pattern ))
73+ }
74+ } else {
75+ // Comma-separated values
76+ hosts := strings .Split (c .config .FilterHost , "," )
77+ for _ , host := range hosts {
78+ trimmed := strings .TrimSpace (host )
79+ if trimmed != "" {
80+ c .filterHosts = append (c .filterHosts , trimmed )
81+ }
82+ }
83+ if len (c .filterHosts ) > 0 {
84+ logger .Info ("Host filter configured with values" ,
85+ zap .Strings ("hosts" , c .filterHosts ))
86+ }
87+ }
88+ }
89+
90+ // Parse path filter
91+ if c .config .FilterPath != "" {
92+ if strings .HasPrefix (c .config .FilterPath , regexPrefix ) {
93+ pattern := strings .TrimPrefix (c .config .FilterPath , regexPrefix )
94+ compiled , err := regexp .Compile (pattern )
95+ if err != nil {
96+ logger .Error ("Failed to compile path filter regex, filter disabled" ,
97+ zap .String ("pattern" , pattern ),
98+ zap .Error (err ))
99+ } else {
100+ c .filterPathRegex = compiled
101+ logger .Info ("Path filter configured with regex" ,
102+ zap .String ("pattern" , pattern ))
103+ }
104+ } else {
105+ // Comma-separated values
106+ paths := strings .Split (c .config .FilterPath , "," )
107+ for _ , path := range paths {
108+ trimmed := strings .TrimSpace (path )
109+ if trimmed != "" {
110+ c .filterPaths = append (c .filterPaths , trimmed )
111+ }
112+ }
113+ if len (c .filterPaths ) > 0 {
114+ logger .Info ("Path filter configured with values" ,
115+ zap .Strings ("paths" , c .filterPaths ))
116+ }
117+ }
118+ }
42119}
43120
44121// createKafkaReader creates and configures a Kafka reader
@@ -126,6 +203,15 @@ func (c *Consumer) Start(ctx context.Context) error {
126203 zap .Int ("batchSize" , c .batchSize ),
127204 zap .Int ("batchLingerSec" , c .config .KafkaBatchLingerSec ))
128205
206+ // Log filter configuration
207+ if c .filterHostRegex != nil || len (c .filterHosts ) > 0 {
208+ c .logger .Info ("Traffic filter active: filtering by host" )
209+ } else if c .filterPathRegex != nil || len (c .filterPaths ) > 0 {
210+ c .logger .Info ("Traffic filter active: filtering by path" )
211+ } else {
212+ c .logger .Info ("No traffic filters configured, all traffic will be processed" )
213+ }
214+
129215 // Setup signal handling for graceful shutdown
130216 sigChan := make (chan os.Signal , 1 )
131217 signal .Notify (sigChan , syscall .SIGINT , syscall .SIGTERM )
@@ -189,6 +275,11 @@ func (c *Consumer) Start(ctx context.Context) error {
189275 continue
190276 }
191277
278+ // Apply traffic filter
279+ if ! c .filterTraffic (data ) {
280+ continue
281+ }
282+
192283 batch = append (batch , * data )
193284
194285 // Process batch if full
@@ -210,6 +301,122 @@ func (c *Consumer) parseMessage(value []byte) (*models.IngestDataBatch, error) {
210301 return & data , nil
211302}
212303
304+ // filterTraffic checks if the traffic should pass through based on host or path filters.
305+ // Returns true if traffic should be processed, false if it should be filtered out.
306+ // If no filters are configured, all traffic passes through.
307+ // Host and path filters are mutually exclusive - only one is applied at a time.
308+ // Supports comma-separated values (uses contains matching) and regex patterns (prefixed with "regex:").
309+ func (c * Consumer ) filterTraffic (data * models.IngestDataBatch ) bool {
310+ // If no filters configured, allow all traffic
311+ hasHostFilter := c .filterHostRegex != nil || len (c .filterHosts ) > 0
312+ hasPathFilter := c .filterPathRegex != nil || len (c .filterPaths ) > 0
313+
314+ if ! hasHostFilter && ! hasPathFilter {
315+ return true
316+ }
317+
318+ // Host filter takes precedence (mutually exclusive)
319+ if hasHostFilter {
320+ host := c .extractHost (data )
321+ if host == "" {
322+ c .logger .Debug ("No host found in traffic, filtering out" ,
323+ zap .String ("path" , data .Path ))
324+ return false
325+ }
326+ matches := c .matchHost (host )
327+ if ! matches {
328+ c .logger .Debug ("Host does not match filter, filtering out" ,
329+ zap .String ("host" , host ))
330+ }
331+ return matches
332+ }
333+
334+ // Path filter
335+ if hasPathFilter {
336+ matches := c .matchPath (data .Path )
337+ if ! matches {
338+ c .logger .Debug ("Path does not match filter, filtering out" ,
339+ zap .String ("path" , data .Path ))
340+ }
341+ return matches
342+ }
343+
344+ return true
345+ }
346+
347+ // matchHost checks if the host matches the configured filter (regex or comma-separated values)
348+ // Uses case-insensitive contains check for comma-separated values
349+ func (c * Consumer ) matchHost (host string ) bool {
350+ if c .filterHostRegex != nil {
351+ return c .filterHostRegex .MatchString (host )
352+ }
353+ hostLower := strings .ToLower (host )
354+ for _ , filterHost := range c .filterHosts {
355+ if strings .Contains (hostLower , strings .ToLower (filterHost )) {
356+ return true
357+ }
358+ }
359+ return false
360+ }
361+
362+ // matchPath checks if the path matches the configured filter (regex or comma-separated values)
363+ // Uses contains check for comma-separated values
364+ func (c * Consumer ) matchPath (path string ) bool {
365+ if c .filterPathRegex != nil {
366+ return c .filterPathRegex .MatchString (path )
367+ }
368+ for _ , filterPath := range c .filterPaths {
369+ if strings .Contains (path , filterPath ) {
370+ return true
371+ }
372+ }
373+ return false
374+ }
375+
376+ // extractHost extracts the host from request headers or path
377+ func (c * Consumer ) extractHost (data * models.IngestDataBatch ) string {
378+ // Try to get host from request headers
379+ if data .RequestHeaders != "" {
380+ var headers map [string ]interface {}
381+ if err := json .Unmarshal ([]byte (data .RequestHeaders ), & headers ); err == nil {
382+ // Check for Host header (case-insensitive)
383+ for key , value := range headers {
384+ if strings .EqualFold (key , "host" ) {
385+ switch v := value .(type ) {
386+ case string :
387+ return v
388+ case []interface {}:
389+ if len (v ) > 0 {
390+ if s , ok := v [0 ].(string ); ok {
391+ return s
392+ }
393+ }
394+ }
395+ }
396+ }
397+ }
398+ }
399+
400+ // Try to extract host from path if it's a full URL
401+ if strings .HasPrefix (data .Path , "http://" ) || strings .HasPrefix (data .Path , "https://" ) {
402+ // Parse URL to extract host
403+ path := data .Path
404+ // Remove protocol
405+ if strings .HasPrefix (path , "https://" ) {
406+ path = strings .TrimPrefix (path , "https://" )
407+ } else {
408+ path = strings .TrimPrefix (path , "http://" )
409+ }
410+ // Extract host (before first slash)
411+ if idx := strings .Index (path , "/" ); idx > 0 {
412+ return path [:idx ]
413+ }
414+ return path
415+ }
416+
417+ return ""
418+ }
419+
213420// processBatch processes a batch of messages through the validator
214421func (c * Consumer ) processBatch (ctx context.Context , batch []models.IngestDataBatch ) {
215422 if len (batch ) == 0 {
0 commit comments