@@ -12,7 +12,9 @@ import (
1212 "io"
1313 "net/http"
1414 "net/url"
15+ "strconv"
1516 "strings"
17+ "sync/atomic"
1618 "text/template"
1719 "time"
1820
@@ -33,19 +35,40 @@ const (
3335 crowdsecLapiHeader = "X-Api-Key"
3436 crowdsecLapiRoute = "v1/decisions"
3537 crowdsecLapiStreamRoute = "v1/decisions/stream"
38+ crowdsecLapiMetricsRoute = "v1/usage-metrics"
3639 crowdsecCapiHost = "api.crowdsec.net"
3740 crowdsecCapiHeader = "Authorization"
3841 crowdsecCapiLoginRoute = "v2/watchers/login"
3942 crowdsecCapiStreamRoute = "v2/decisions/stream"
4043 cacheTimeoutKey = "updated"
4144)
4245
46+ // ##############################################################
47+ // Important: traefik creates an instance of the bouncer per route.
48+ // We rely on globals (both here and in the memory cache) to share info between
49+ // routes. This means that some of the plugins parameters will only work "once"
50+ // and will take the values of the first middleware that was instantiated even
51+ // if you have different middlewares with different parameters. This design
52+ // makes it impossible to have multiple crowdsec implementations per cluster (unless you have multiple traefik deployments in it)
53+ // - updateInterval
54+ // - updateMaxFailure
55+ // - defaultDecisionTimeout
56+ // - redisUnreachableBlock
57+ // - appsecEnabled
58+ // - appsecHost
59+ // - metricsUpdateIntervalSeconds
60+ // - others...
61+ // ###################################
62+
4363//nolint:gochecknoglobals
4464var (
4565 isStartup = true
4666 isCrowdsecStreamHealthy = true
47- updateFailure = 0
48- ticker chan bool
67+ updateFailure int64
68+ streamTicker chan bool
69+ metricsTicker chan bool
70+ lastMetricsPush time.Time
71+ blockedRequests int64
4972)
5073
5174// CreateConfig creates the default plugin configuration.
@@ -75,7 +98,7 @@ type Bouncer struct {
7598 crowdsecPassword string
7699 crowdsecScenarios []string
77100 updateInterval int64
78- updateMaxFailure int
101+ updateMaxFailure int64
79102 defaultDecisionTimeout int64
80103 remediationStatusCode int
81104 remediationCustomHeader string
@@ -93,6 +116,8 @@ type Bouncer struct {
93116}
94117
95118// New creates the crowdsec bouncer plugin.
119+ //
120+ //nolint:gocyclo
96121func New (_ context.Context , next http.Handler , config * configuration.Config , name string ) (http.Handler , error ) {
97122 config .LogLevel = strings .ToUpper (config .LogLevel )
98123 log := logger .New (config .LogLevel , config .LogFilePath )
@@ -225,7 +250,7 @@ func New(_ context.Context, next http.Handler, config *configuration.Config, nam
225250 return nil , err
226251 }
227252
228- if (config .CrowdsecMode == configuration .StreamMode || config .CrowdsecMode == configuration .AloneMode ) && ticker == nil {
253+ if (config .CrowdsecMode == configuration .StreamMode || config .CrowdsecMode == configuration .AloneMode ) && streamTicker == nil {
229254 if config .CrowdsecMode == configuration .AloneMode {
230255 if err := getToken (bouncer ); err != nil {
231256 bouncer .log .Error ("New:getToken " + err .Error ())
@@ -234,10 +259,20 @@ func New(_ context.Context, next http.Handler, config *configuration.Config, nam
234259 }
235260 handleStreamTicker (bouncer )
236261 isStartup = false
237- ticker = startTicker (config , log , func () {
262+ streamTicker = startTicker ("stream" , config . UpdateIntervalSeconds , log , func () {
238263 handleStreamTicker (bouncer )
239264 })
240265 }
266+
267+ // Start metrics ticker if not already running
268+ if metricsTicker == nil && config .MetricsUpdateIntervalSeconds > 0 {
269+ lastMetricsPush = time .Now () // Initialize lastMetricsPush when starting the metrics ticker
270+ handleMetricsTicker (bouncer )
271+ metricsTicker = startTicker ("metrics" , config .MetricsUpdateIntervalSeconds , log , func () {
272+ handleMetricsTicker (bouncer )
273+ })
274+ }
275+
241276 bouncer .log .Debug ("New initialized mode:" + config .CrowdsecMode )
242277
243278 return bouncer , nil
@@ -353,6 +388,8 @@ type Login struct {
353388
354389// To append Headers we need to call rw.WriteHeader after set any header.
355390func handleBanServeHTTP (bouncer * Bouncer , rw http.ResponseWriter ) {
391+ atomic .AddInt64 (& blockedRequests , 1 )
392+
356393 if bouncer .remediationCustomHeader != "" {
357394 rw .Header ().Set (bouncer .remediationCustomHeader , "ban" )
358395 }
@@ -375,6 +412,7 @@ func handleRemediationServeHTTP(bouncer *Bouncer, remoteIP, remediation string,
375412 handleNextServeHTTP (bouncer , remoteIP , rw , req )
376413 return
377414 }
415+ atomic .AddInt64 (& blockedRequests , 1 ) // If we serve a captcha that should count as a dropped request.
378416 bouncer .captchaClient .ServeHTTP (rw , req , remoteIP )
379417 return
380418 }
@@ -406,11 +444,17 @@ func handleStreamTicker(bouncer *Bouncer) {
406444 }
407445}
408446
409- func startTicker (config * configuration.Config , log * logger.Log , work func ()) chan bool {
410- ticker := time .NewTicker (time .Duration (config .UpdateIntervalSeconds ) * time .Second )
447+ func handleMetricsTicker (bouncer * Bouncer ) {
448+ if err := reportMetrics (bouncer ); err != nil {
449+ bouncer .log .Error ("handleMetricsTicker:reportMetrics " + err .Error ())
450+ }
451+ }
452+
453+ func startTicker (name string , updateInterval int64 , log * logger.Log , work func ()) chan bool {
454+ ticker := time .NewTicker (time .Duration (updateInterval ) * time .Second )
411455 stop := make (chan bool , 1 )
412456 go func () {
413- defer log .Debug ("ticker :stopped" )
457+ defer log .Debug (name + "_ticker :stopped" )
414458 for {
415459 select {
416460 case <- ticker .C :
@@ -432,7 +476,7 @@ func handleNoStreamCache(bouncer *Bouncer, remoteIP string) (string, error) {
432476 Path : bouncer .crowdsecPath + crowdsecLapiRoute ,
433477 RawQuery : fmt .Sprintf ("ip=%v" , remoteIP ),
434478 }
435- body , err := crowdsecQuery (bouncer , routeURL .String (), false )
479+ body , err := crowdsecQuery (bouncer , routeURL .String (), nil )
436480 if err != nil {
437481 return cache .BannedValue , err
438482 }
@@ -491,7 +535,16 @@ func getToken(bouncer *Bouncer) error {
491535 Host : bouncer .crowdsecHost ,
492536 Path : crowdsecCapiLoginRoute ,
493537 }
494- body , err := crowdsecQuery (bouncer , loginURL .String (), true )
538+
539+ // Move the login-specific payload here
540+ loginData := []byte (fmt .Sprintf (
541+ `{"machine_id": "%v","password": "%v","scenarios": ["%v"]}` ,
542+ bouncer .crowdsecMachineID ,
543+ bouncer .crowdsecPassword ,
544+ strings .Join (bouncer .crowdsecScenarios , `","` ),
545+ ))
546+
547+ body , err := crowdsecQuery (bouncer , loginURL .String (), loginData )
495548 if err != nil {
496549 return err
497550 }
@@ -528,7 +581,7 @@ func handleStreamCache(bouncer *Bouncer) error {
528581 Path : bouncer .crowdsecPath + bouncer .crowdsecStreamRoute ,
529582 RawQuery : fmt .Sprintf ("startup=%t" , ! isCrowdsecStreamHealthy || isStartup ),
530583 }
531- body , err := crowdsecQuery (bouncer , streamRouteURL .String (), false )
584+ body , err := crowdsecQuery (bouncer , streamRouteURL .String (), nil )
532585 if err != nil {
533586 return err
534587 }
@@ -559,15 +612,9 @@ func handleStreamCache(bouncer *Bouncer) error {
559612 return nil
560613}
561614
562- func crowdsecQuery (bouncer * Bouncer , stringURL string , isPost bool ) ([]byte , error ) {
615+ func crowdsecQuery (bouncer * Bouncer , stringURL string , data [] byte ) ([]byte , error ) {
563616 var req * http.Request
564- if isPost {
565- data := []byte (fmt .Sprintf (
566- `{"machine_id": "%v","password": "%v","scenarios": ["%v"]}` ,
567- bouncer .crowdsecMachineID ,
568- bouncer .crowdsecPassword ,
569- strings .Join (bouncer .crowdsecScenarios , `","` ),
570- ))
617+ if len (data ) > 0 {
571618 req , _ = http .NewRequest (http .MethodPost , stringURL , bytes .NewBuffer (data ))
572619 } else {
573620 req , _ = http .NewRequest (http .MethodGet , stringURL , nil )
@@ -588,13 +635,16 @@ func crowdsecQuery(bouncer *Bouncer, stringURL string, isPost bool) ([]byte, err
588635 if errToken := getToken (bouncer ); errToken != nil {
589636 return nil , fmt .Errorf ("crowdsecQuery:renewToken url:%s %w" , stringURL , errToken )
590637 }
591- return crowdsecQuery (bouncer , stringURL , false )
638+ return crowdsecQuery (bouncer , stringURL , nil )
592639 }
593- if res .StatusCode != http .StatusOK {
594- return nil , fmt .Errorf ("crowdsecQuery url:%s, statusCode:%d" , stringURL , res .StatusCode )
640+
641+ // Check if the status code starts with 2
642+ statusStr := strconv .Itoa (res .StatusCode )
643+ if len (statusStr ) < 1 || statusStr [0 ] != '2' {
644+ return nil , fmt .Errorf ("crowdsecQuery method:%s url:%s, statusCode:%d (expected: 2xx)" , req .Method , stringURL , res .StatusCode )
595645 }
596- body , err := io .ReadAll (res .Body )
597646
647+ body , err := io .ReadAll (res .Body )
598648 if err != nil {
599649 return nil , fmt .Errorf ("crowdsecQuery:readBody %w" , err )
600650 }
@@ -664,3 +714,65 @@ func appsecQuery(bouncer *Bouncer, ip string, httpReq *http.Request) error {
664714 }
665715 return nil
666716}
717+
718+ func reportMetrics (bouncer * Bouncer ) error {
719+ now := time .Now ()
720+ currentCount := atomic .LoadInt64 (& blockedRequests )
721+ windowSizeSeconds := int (now .Sub (lastMetricsPush ).Seconds ())
722+
723+ bouncer .log .Debug (fmt .Sprintf ("reportMetrics: blocked_requests=%d window_size=%ds" , currentCount , windowSizeSeconds ))
724+
725+ metrics := map [string ]interface {}{
726+ "remediation_components" : []map [string ]interface {}{
727+ {
728+ "version" : "1.X.X" ,
729+ "type" : "bouncer" ,
730+ "name" : "traefik_plugin" ,
731+ "metrics" : []map [string ]interface {}{
732+ {
733+ "items" : []map [string ]interface {}{
734+ {
735+ "name" : "dropped" ,
736+ "value" : currentCount ,
737+ "unit" : "request" ,
738+ "labels" : map [string ]string {
739+ "type" : "traefik_plugin" ,
740+ },
741+ },
742+ },
743+ "meta" : map [string ]interface {}{
744+ "window_size_seconds" : windowSizeSeconds ,
745+ "utc_now_timestamp" : now .Unix (),
746+ },
747+ },
748+ },
749+ "utc_startup_timestamp" : time .Now ().Unix (),
750+ "feature_flags" : []string {},
751+ "os" : map [string ]string {
752+ "name" : "unknown" ,
753+ "version" : "unknown" ,
754+ },
755+ },
756+ },
757+ }
758+
759+ data , err := json .Marshal (metrics )
760+ if err != nil {
761+ return fmt .Errorf ("reportMetrics:marshal %w" , err )
762+ }
763+
764+ metricsURL := url.URL {
765+ Scheme : bouncer .crowdsecScheme ,
766+ Host : bouncer .crowdsecHost ,
767+ Path : bouncer .crowdsecPath + crowdsecLapiMetricsRoute ,
768+ }
769+
770+ _ , err = crowdsecQuery (bouncer , metricsURL .String (), data )
771+ if err != nil {
772+ return fmt .Errorf ("reportMetrics:query %w" , err )
773+ }
774+
775+ atomic .StoreInt64 (& blockedRequests , 0 )
776+ lastMetricsPush = now
777+ return nil
778+ }
0 commit comments