diff --git a/.github/workflows/prod.yml b/.github/workflows/prod.yml index 33375250aa..f4af47ee14 100644 --- a/.github/workflows/prod.yml +++ b/.github/workflows/prod.yml @@ -47,6 +47,10 @@ on: type: boolean default: false description: Agent Traffic Analyzer + guardrails_service: + type: boolean + default: false + description: Guardrails Service # A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: @@ -275,6 +279,71 @@ jobs: SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK + build-guardrails-service: + runs-on: ubuntu-22.04 + if: ${{ github.event.inputs.guardrails_service == 'true' }} + + steps: + - uses: actions/checkout@v2 + + - name: DockerHub login + env: + DOCKER_USERNAME: ${{secrets.DOCKER_USERNAME}} + DOCKER_PASSWORD: ${{secrets.DOCKER_PASSWORD}} + run: | + docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD + + - name: Build, tag, and push the image to DockerHub + id: build-image-dockerhub + env: + ECR_REGISTRY: aktosecurity + IMAGE_VERSION: ${{ github.event.inputs.release_version }} + run: | + docker buildx create --use + cd apps/guardrails-service/container + docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/guardrails-service:local -t $ECR_REGISTRY/guardrails-service:$IMAGE_VERSION . --push + echo "::set-output name=image::$ECR_REGISTRY/guardrails-service:$IMAGE_VERSION" + + - name: Configure AWS Credentials for ECR + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{secrets.AWS_ACCESS_KEY_ID}} + aws-secret-access-key: ${{secrets.AWS_SECRET_ACCESS_KEY}} + aws-region: us-east-1 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + with: + mask-password: "true" + registry-type: public + + - name: Push git tag + id: tag_version + uses: mathieudutour/github-tag-action@v6.1 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + custom_tag: guardrails-service-${{ github.event.inputs.release_version }} + + - name: Create a GitHub release + uses: ncipollo/release-action@v1.12.0 + with: + tag: ${{ steps.tag_version.outputs.new_tag }} + name: Release ${{ steps.tag_version.outputs.new_tag }} + omitBody: true + + - name: Send Github release notification to Slack + id: slack + uses: slackapi/slack-github-action@v1.23.0 + with: + payload: | + { + "text": "Guardrails Service v${{ github.event.inputs.release_version }} released!" + } + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} + SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK + build-threat: # The type of runner that the job will run on runs-on: ubuntu-22.04 diff --git a/.github/workflows/staging.yml b/.github/workflows/staging.yml index e7f5b04bba..08b9497a67 100644 --- a/.github/workflows/staging.yml +++ b/.github/workflows/staging.yml @@ -103,6 +103,8 @@ jobs: docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-threat-detection-backend:$IMAGE_TAG . --push cd ../account-job-executor docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-account-job-executor:$IMAGE_TAG . --push + cd ../guardrails-service/container + docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/guardrails-service:$IMAGE_TAG . --push - name: Set up JDK 11 uses: actions/setup-java@v1 diff --git a/.gitignore b/.gitignore index 1d0972817b..4a15ef657b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ **/yarn-error.log **/yarn.lock **/.vscode/ +**/.cursor/ **/.project **/.classpath **/.settings diff --git a/apps/dashboard/src/main/java/com/akto/action/GuardrailPoliciesAction.java b/apps/dashboard/src/main/java/com/akto/action/GuardrailPoliciesAction.java index 192848822a..02569a04a5 100644 --- a/apps/dashboard/src/main/java/com/akto/action/GuardrailPoliciesAction.java +++ b/apps/dashboard/src/main/java/com/akto/action/GuardrailPoliciesAction.java @@ -18,7 +18,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; public class GuardrailPoliciesAction extends UserAction { diff --git a/apps/guardrails-service/container/src/go.mod b/apps/guardrails-service/container/src/go.mod index 0203fee3a4..a7fd862793 100644 --- a/apps/guardrails-service/container/src/go.mod +++ b/apps/guardrails-service/container/src/go.mod @@ -3,51 +3,61 @@ module github.com/akto-api-security/guardrails-service go 1.24.2 require ( - github.com/akto-api-security/mcp-endpoint-shield v0.0.0-20251024110204-dc3bd07750e4 - github.com/gin-gonic/gin v1.9.1 - go.uber.org/zap v1.27.0 + github.com/akto-api-security/mcp-endpoint-shield v0.0.0-20251215100814-4e47fe5de707 + github.com/gin-gonic/gin v1.11.0 + github.com/segmentio/kafka-go v0.4.49 + go.uber.org/zap v1.27.1 ) require ( - github.com/bytedance/sonic v1.9.1 // indirect - github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect + github.com/andybalholm/brotli v1.2.0 // indirect + github.com/bytedance/gopkg v0.1.3 // indirect + github.com/bytedance/sonic v1.14.2 // indirect + github.com/bytedance/sonic/loader v0.4.0 // indirect + github.com/cloudwego/base64x v0.1.6 // indirect github.com/denisbrodbeck/machineid v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect - github.com/gabriel-vasile/mimetype v1.4.2 // indirect - github.com/gin-contrib/sse v0.1.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.12 // indirect + github.com/gin-contrib/sse v1.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.14.0 // indirect - github.com/goccy/go-json v0.10.2 // indirect + github.com/go-playground/validator/v10 v10.29.0 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/goccy/go-yaml v1.19.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/cpuid/v2 v2.2.4 // indirect - github.com/leodido/go-urn v1.2.4 // indirect + github.com/klauspost/compress v1.18.2 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/leodido/go-urn v1.4.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/ncruces/go-strftime v0.1.9 // indirect - github.com/pelletier/go-toml/v2 v2.0.8 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/quic-go/qpack v0.6.0 // indirect + github.com/quic-go/quic-go v0.57.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect - github.com/ugorji/go/codec v1.2.11 // indirect - go.uber.org/multierr v1.10.0 // indirect - golang.org/x/arch v0.3.0 // indirect - golang.org/x/crypto v0.43.0 // indirect - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect - golang.org/x/net v0.46.0 // indirect - golang.org/x/sync v0.17.0 // indirect - golang.org/x/sys v0.37.0 // indirect - golang.org/x/text v0.30.0 // indirect - google.golang.org/protobuf v1.30.0 // indirect + github.com/ugorji/go/codec v1.3.1 // indirect + go.uber.org/mock v0.6.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/arch v0.23.0 // indirect + golang.org/x/crypto v0.46.0 // indirect + golang.org/x/exp v0.0.0-20251209150349-8475f28825e9 // indirect + golang.org/x/net v0.48.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.39.0 // indirect + golang.org/x/text v0.32.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - modernc.org/libc v1.66.3 // indirect + modernc.org/libc v1.67.1 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect - modernc.org/sqlite v1.39.0 // indirect + modernc.org/sqlite v1.40.1 // indirect ) // Replace directive to point to the actual location in akto-gateway repository (stdio-header-fix branch) -replace github.com/akto-api-security/mcp-endpoint-shield => github.com/akto-api-security/akto-gateway/mcp-endpoint-shield v0.0.0-20251104150935-84df8d2eb79f +replace github.com/akto-api-security/mcp-endpoint-shield => github.com/akto-api-security/akto-gateway/mcp-endpoint-shield v0.0.0-20251215100814-4e47fe5de707 diff --git a/apps/guardrails-service/container/src/go.sum b/apps/guardrails-service/container/src/go.sum index 1d3841ee82..674b9b4749 100644 --- a/apps/guardrails-service/container/src/go.sum +++ b/apps/guardrails-service/container/src/go.sum @@ -1,11 +1,15 @@ -github.com/akto-api-security/akto-gateway/mcp-endpoint-shield v0.0.0-20251104150935-84df8d2eb79f h1:dA7Czm4WZ8DrcXnGYfjpH3pUSRcJfIHwf9u3vI552Qg= -github.com/akto-api-security/akto-gateway/mcp-endpoint-shield v0.0.0-20251104150935-84df8d2eb79f/go.mod h1:QCJCivwm25wVrV5WNWeVJI2Rr2Gzg6oZWoZ0CitLTcc= -github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= -github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= -github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= -github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= -github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= +github.com/akto-api-security/akto-gateway/mcp-endpoint-shield v0.0.0-20251215100814-4e47fe5de707 h1:7x1t+up0WUPUPSOUcsY4dkPwE2KhVi+jOv048+Stqy4= +github.com/akto-api-security/akto-gateway/mcp-endpoint-shield v0.0.0-20251215100814-4e47fe5de707/go.mod h1:fJ+KYYCLM6nSZZOxT5rQKqwqienBa31w0jcwgPF3xUU= +github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= +github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= +github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= +github.com/bytedance/sonic v1.14.2 h1:k1twIoe97C1DtYUo+fZQy865IuHia4PR5RPiuGPPIIE= +github.com/bytedance/sonic v1.14.2/go.mod h1:T80iDELeHiHKSc0C9tubFygiuXoGzrkjKzX2quAx980= +github.com/bytedance/sonic/loader v0.4.0 h1:olZ7lEqcxtZygCK9EKYKADnpQoYkRQxaeY2NYzevs+o= +github.com/bytedance/sonic/loader v0.4.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= +github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= +github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -15,38 +19,45 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= -github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= -github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= -github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= -github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= -github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= -github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= +github.com/gabriel-vasile/mimetype v1.4.12 h1:e9hWvmLYvtp846tLHam2o++qitpguFiYCKbn0w9jyqw= +github.com/gabriel-vasile/mimetype v1.4.12/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= +github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w= +github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM= +github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk= +github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= -github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js= -github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/go-playground/validator/v10 v10.29.0 h1:lQlF5VNJWNlRbRZNeOIkWElR+1LL/OuHcc0Kp14w1xk= +github.com/go-playground/validator/v10 v10.29.0/go.mod h1:D6QxqeMlgIPuT02L66f2ccrZ7AGgHkzKmmTMZhk/Kc4= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/goccy/go-yaml v1.19.0 h1:EmkZ9RIsX+Uq4DYFowegAuJo8+xdX3T/2dwNPXbxEYE= +github.com/goccy/go-yaml v1.19.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= -github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= -github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= -github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -54,79 +65,100 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= -github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= -github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= -github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/quic-go/qpack v0.6.0 h1:g7W+BMYynC1LbYLSqRt8PBg5Tgwxn214ZZR34VIOjz8= +github.com/quic-go/qpack v0.6.0/go.mod h1:lUpLKChi8njB4ty2bFLX2x4gzDqXwUpaO1DP9qMDZII= +github.com/quic-go/quic-go v0.57.1 h1:25KAAR9QR8KZrCZRThWMKVAwGoiHIrNbT72ULHTuI10= +github.com/quic-go/quic-go v0.57.1/go.mod h1:ly4QBAjHA2VhdnxhojRsCUOeJwKYg+taDlos92xb1+s= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk= +github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= -github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= -github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= -github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +github.com/ugorji/go/codec v1.3.1 h1:waO7eEiFDwidsBN6agj1vJQ4AG7lh2yqXyOXqhgQuyY= +github.com/ugorji/go/codec v1.3.1/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= -golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= -golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= -golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= -golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= -golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U= -golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI= -golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= -golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= +go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/arch v0.23.0 h1:lKF64A2jF6Zd8L0knGltUnegD62JMFBiCPBmQpToHhg= +golang.org/x/arch v0.23.0/go.mod h1:dNHoOeKiyja7GTvF9NJS1l3Z2yntpQNzgrjh1cU103A= +golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU= +golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0= +golang.org/x/exp v0.0.0-20251209150349-8475f28825e9 h1:MDfG8Cvcqlt9XXrmEiD4epKn7VJHZO84hejP9Jmp0MM= +golang.org/x/exp v0.0.0-20251209150349-8475f28825e9/go.mod h1:EPRbTFwzwjXj9NpYyyrvenVh9Y+GFeEvMNh7Xuz7xgU= +golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= +golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= -golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= -golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= -golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE= -golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= +golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= +golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM= -modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= -modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU= -modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE= -modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM= -modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= +modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= +modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= +modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= +modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= -modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ= -modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8= +modernc.org/libc v1.67.1 h1:bFaqOaa5/zbWYJo8aW0tXPX21hXsngG2M7mckCnFSVk= +modernc.org/libc v1.67.1/go.mod h1:QvvnnJ5P7aitu0ReNpVIEyesuhmDLQ8kaEoyMjIFZJA= modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= @@ -135,10 +167,9 @@ modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= -modernc.org/sqlite v1.39.0 h1:6bwu9Ooim0yVYA7IZn9demiQk/Ejp0BtTjBWFLymSeY= -modernc.org/sqlite v1.39.0/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E= +modernc.org/sqlite v1.40.1 h1:VfuXcxcUWWKRBuP8+BR9L7VnmusMgBNNnBYGEe9w/iY= +modernc.org/sqlite v1.40.1/go.mod h1:9fjQZ0mB1LLP0GYrp39oOJXx/I2sxEnZtzCmEQIKvGE= modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= -rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/apps/guardrails-service/container/src/main.go b/apps/guardrails-service/container/src/main.go index 898aeb6a42..7e0bc7a15a 100644 --- a/apps/guardrails-service/container/src/main.go +++ b/apps/guardrails-service/container/src/main.go @@ -1,11 +1,13 @@ package main import ( + "context" "fmt" "os" "github.com/akto-api-security/guardrails-service/handlers" "github.com/akto-api-security/guardrails-service/pkg/config" + "github.com/akto-api-security/guardrails-service/pkg/kafka" "github.com/akto-api-security/guardrails-service/pkg/validator" "github.com/gin-gonic/gin" "go.uber.org/zap" @@ -23,12 +25,16 @@ func main() { logger.Info("Starting guardrails-service", zap.Int("port", cfg.ServerPort), zap.String("database_abstractor_url", cfg.DatabaseAbstractorURL), - zap.String("agent_guard_engine_url", cfg.AgentGuardEngineURL)) + zap.String("agent_guard_engine_url", cfg.AgentGuardEngineURL), + zap.Bool("kafka_enabled", cfg.KafkaEnabled)) - // Set Agent Guard Engine URL environment variable for akto-gateway library + // Set environment variables for akto-gateway library if cfg.AgentGuardEngineURL != "" { os.Setenv("AGENT_GUARD_ENGINE_URL", cfg.AgentGuardEngineURL) } + if cfg.DatabaseAbstractorToken != "" { + os.Setenv("AKTO_API_TOKEN", cfg.DatabaseAbstractorToken) + } // Initialize validator service validatorService, err := validator.NewService(cfg, logger) @@ -36,6 +42,16 @@ func main() { logger.Fatal("Failed to initialize validator service", zap.Error(err)) } + // Run in Kafka consumer mode or HTTP server mode based on configuration + if cfg.KafkaEnabled { + runKafkaConsumer(cfg, validatorService, logger) + } else { + runHTTPServer(cfg, validatorService, logger) + } +} + +// runHTTPServer starts the HTTP server mode +func runHTTPServer(cfg *config.Config, validatorService *validator.Service, logger *zap.Logger) { // Initialize handlers validationHandler := handlers.NewValidationHandler(validatorService, logger) @@ -44,13 +60,34 @@ func main() { // Start server addr := fmt.Sprintf(":%d", cfg.ServerPort) - logger.Info("Server starting", zap.String("address", addr)) + logger.Info("Server starting in HTTP mode", zap.String("address", addr)) if err := router.Run(addr); err != nil { logger.Fatal("Failed to start server", zap.Error(err)) } } +// runKafkaConsumer starts the Kafka consumer mode +func runKafkaConsumer(cfg *config.Config, validatorService *validator.Service, logger *zap.Logger) { + logger.Info("Starting in Kafka consumer mode", + zap.String("broker", cfg.KafkaBrokerURL), + zap.String("topic", cfg.KafkaTopic), + zap.String("groupID", cfg.KafkaGroupID)) + + consumer, err := kafka.NewConsumer(cfg, validatorService, logger) + if err != nil { + logger.Fatal("Failed to create Kafka consumer", zap.Error(err)) + } + defer consumer.Close() + + ctx := context.Background() + if err := consumer.Start(ctx); err != nil && err != context.Canceled { + logger.Fatal("Kafka consumer stopped with error", zap.Error(err)) + } + + logger.Info("Kafka consumer stopped gracefully") +} + func setupRouter(validationHandler *handlers.ValidationHandler, logger *zap.Logger) *gin.Engine { // Set Gin mode based on environment if os.Getenv("GIN_MODE") == "" { @@ -93,10 +130,10 @@ func initLogger(logLevel string) *zap.Logger { level = zapcore.ErrorLevel } - config := zap.NewProductionConfig() + config := zap.NewDevelopmentConfig() config.Level = zap.NewAtomicLevelAt(level) - config.EncoderConfig.TimeKey = "timestamp" - config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + config.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05") + config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder logger, err := config.Build() if err != nil { diff --git a/apps/guardrails-service/container/src/pkg/config/config.go b/apps/guardrails-service/container/src/pkg/config/config.go index 78e03de158..b17772b7a7 100644 --- a/apps/guardrails-service/container/src/pkg/config/config.go +++ b/apps/guardrails-service/container/src/pkg/config/config.go @@ -23,18 +23,53 @@ type Config struct { // Logging LogLevel string + + // Kafka configuration + KafkaEnabled bool + KafkaBrokerURL string + KafkaTopic string + KafkaGroupID string + KafkaUseTLS bool + KafkaUsername string + KafkaPassword string + KafkaBatchSize int + KafkaBatchLingerSec int // How long to wait before processing a partial batch (seconds) + KafkaMaxWaitSec int // Kafka fetch.max.wait - max time broker waits before returning data + + // Policy cache configuration + PolicyRefreshIntervalMin int // How often to refresh policies from database (minutes) + + // Traffic filter configuration (mutually exclusive) + // Supports comma-separated values for multiple matches, e.g., "api.example.com,app.example.com" + // Also supports regex patterns by prefixing with "regex:", e.g., "regex:.*\\.example\\.com" + FilterHost string // Filter traffic by host header (comma-separated or regex) + FilterPath string // Filter traffic by path prefix (comma-separated or regex) } // LoadConfig loads configuration from environment variables func LoadConfig() *Config { + dbAbstractorToken := getEnv("DATABASE_ABSTRACTOR_SERVICE_TOKEN", "") return &Config{ - ServerPort: getEnvAsInt("SERVER_PORT", 8080), - DatabaseAbstractorURL: getEnv("DATABASE_ABSTRACTOR_SERVICE_URL", "https://cyborg.akto.io"), - DatabaseAbstractorToken: getEnv("DATABASE_ABSTRACTOR_SERVICE_TOKEN", ""), - AgentGuardEngineURL: getEnv("AGENT_GUARD_ENGINE_URL", "https://akto-agent-guard-engine.billing-53a.workers.dev"), - ThreatBackendURL: getEnv("THREAT_BACKEND_URL", "https://tbs.akto.io"), - ThreatBackendToken: getEnv("THREAT_BACKEND_TOKEN", ""), - LogLevel: getEnv("LOG_LEVEL", "info"), + ServerPort: getEnvAsInt("SERVER_PORT", 8080), + DatabaseAbstractorURL: getEnv("DATABASE_ABSTRACTOR_SERVICE_URL", "https://cyborg.akto.io"), + DatabaseAbstractorToken: dbAbstractorToken, + AgentGuardEngineURL: getEnv("AGENT_GUARD_ENGINE_URL", "https://akto-agent-guard-engine.billing-53a.workers.dev"), + ThreatBackendURL: getEnv("THREAT_BACKEND_URL", "https://tbs.akto.io"), + ThreatBackendToken: getEnv("THREAT_BACKEND_TOKEN", dbAbstractorToken), + LogLevel: getEnv("LOG_LEVEL", "info"), + KafkaEnabled: getEnvAsBool("KAFKA_ENABLED", false), + KafkaBrokerURL: getEnv("KAFKA_BROKER_URL", "localhost:29092"), + KafkaTopic: getEnv("KAFKA_TOPIC", "akto.api.logs"), + KafkaGroupID: getEnv("KAFKA_GROUP_ID", "guardrails-service"), + KafkaUseTLS: getEnvAsBool("KAFKA_USE_TLS", false), + KafkaUsername: getEnv("KAFKA_USERNAME", ""), + KafkaPassword: getEnv("KAFKA_PASSWORD", ""), + KafkaBatchSize: getEnvAsInt("KAFKA_BATCH_SIZE", 100), + KafkaBatchLingerSec: getEnvAsInt("KAFKA_BATCH_LINGER_SEC", 5), + KafkaMaxWaitSec: getEnvAsInt("KAFKA_MAX_WAIT_SEC", 1), + PolicyRefreshIntervalMin: getEnvAsInt("POLICY_REFRESH_INTERVAL_MIN", 15), + FilterHost: getEnv("FILTER_HOST", ""), + FilterPath: getEnv("FILTER_PATH", ""), } } @@ -58,3 +93,16 @@ func getEnvAsInt(key string, defaultValue int) int { } return value } + +// getEnvAsBool gets an environment variable as a boolean or returns a default value +func getEnvAsBool(key string, defaultValue bool) bool { + valueStr := os.Getenv(key) + if valueStr == "" { + return defaultValue + } + value, err := strconv.ParseBool(valueStr) + if err != nil { + return defaultValue + } + return value +} diff --git a/apps/guardrails-service/container/src/pkg/dbabstractor/client.go b/apps/guardrails-service/container/src/pkg/dbabstractor/client.go index 741728d366..75bbf03b6a 100644 --- a/apps/guardrails-service/container/src/pkg/dbabstractor/client.go +++ b/apps/guardrails-service/container/src/pkg/dbabstractor/client.go @@ -42,9 +42,7 @@ func buildDatabaseAbstractorURL() string { dbAbsHost = "https://cyborg.akto.io" } - if strings.HasSuffix(dbAbsHost, "/") { - dbAbsHost = strings.TrimSuffix(dbAbsHost, "/") - } + dbAbsHost = strings.TrimSuffix(dbAbsHost, "/") return dbAbsHost + "/api" } diff --git a/apps/guardrails-service/container/src/pkg/kafka/consumer.go b/apps/guardrails-service/container/src/pkg/kafka/consumer.go new file mode 100644 index 0000000000..349fb95cb4 --- /dev/null +++ b/apps/guardrails-service/container/src/pkg/kafka/consumer.go @@ -0,0 +1,466 @@ +package kafka + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "io" + "os" + "os/signal" + "regexp" + "strings" + "syscall" + "time" + + "github.com/akto-api-security/guardrails-service/models" + "github.com/akto-api-security/guardrails-service/pkg/config" + "github.com/akto-api-security/guardrails-service/pkg/validator" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" + "go.uber.org/zap" +) + +const regexPrefix = "regex:" + +// Consumer represents a Kafka consumer for processing API traffic +type Consumer struct { + reader *kafka.Reader + validatorService *validator.Service + logger *zap.Logger + config *config.Config + batchSize int + // Pre-parsed filter values for performance + filterHosts []string + filterHostRegex *regexp.Regexp + filterPaths []string + filterPathRegex *regexp.Regexp +} + +// NewConsumer creates a new Kafka consumer +func NewConsumer(cfg *config.Config, validatorService *validator.Service, logger *zap.Logger) (*Consumer, error) { + reader := createKafkaReader(cfg, logger) + + consumer := &Consumer{ + reader: reader, + validatorService: validatorService, + logger: logger, + config: cfg, + batchSize: cfg.KafkaBatchSize, + } + + // Parse filter configurations + consumer.parseFilterConfig(logger) + + return consumer, nil +} + +// parseFilterConfig parses the filter configuration from config +func (c *Consumer) parseFilterConfig(logger *zap.Logger) { + // Parse host filter + if c.config.FilterHost != "" { + if strings.HasPrefix(c.config.FilterHost, regexPrefix) { + pattern := strings.TrimPrefix(c.config.FilterHost, regexPrefix) + compiled, err := regexp.Compile(pattern) + if err != nil { + logger.Error("Failed to compile host filter regex, filter disabled", + zap.String("pattern", pattern), + zap.Error(err)) + } else { + c.filterHostRegex = compiled + logger.Info("Host filter configured with regex", + zap.String("pattern", pattern)) + } + } else { + // Comma-separated values + hosts := strings.Split(c.config.FilterHost, ",") + for _, host := range hosts { + trimmed := strings.TrimSpace(host) + if trimmed != "" { + c.filterHosts = append(c.filterHosts, trimmed) + } + } + if len(c.filterHosts) > 0 { + logger.Info("Host filter configured with values", + zap.Strings("hosts", c.filterHosts)) + } + } + } + + // Parse path filter + if c.config.FilterPath != "" { + if strings.HasPrefix(c.config.FilterPath, regexPrefix) { + pattern := strings.TrimPrefix(c.config.FilterPath, regexPrefix) + compiled, err := regexp.Compile(pattern) + if err != nil { + logger.Error("Failed to compile path filter regex, filter disabled", + zap.String("pattern", pattern), + zap.Error(err)) + } else { + c.filterPathRegex = compiled + logger.Info("Path filter configured with regex", + zap.String("pattern", pattern)) + } + } else { + // Comma-separated values + paths := strings.Split(c.config.FilterPath, ",") + for _, path := range paths { + trimmed := strings.TrimSpace(path) + if trimmed != "" { + c.filterPaths = append(c.filterPaths, trimmed) + } + } + if len(c.filterPaths) > 0 { + logger.Info("Path filter configured with values", + zap.Strings("paths", c.filterPaths)) + } + } + } +} + +// createKafkaReader creates and configures a Kafka reader +func createKafkaReader(cfg *config.Config, logger *zap.Logger) *kafka.Reader { + dialer := &kafka.Dialer{ + Timeout: 10 * time.Second, + DualStack: true, + } + + // Configure TLS if enabled + if cfg.KafkaUseTLS { + tlsConfig, err := newTLSConfig() + if err != nil { + logger.Warn("Failed to create TLS config, continuing without TLS", zap.Error(err)) + } else { + dialer.TLS = tlsConfig + } + } + + // Configure SASL authentication if credentials provided + if cfg.KafkaUsername != "" && cfg.KafkaPassword != "" { + dialer.SASLMechanism = plain.Mechanism{ + Username: cfg.KafkaUsername, + Password: cfg.KafkaPassword, + } + logger.Info("Kafka SASL authentication configured", zap.String("username", cfg.KafkaUsername)) + } + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{cfg.KafkaBrokerURL}, + Topic: cfg.KafkaTopic, + GroupID: cfg.KafkaGroupID, + Dialer: dialer, + MinBytes: 1, + MaxBytes: 10e6, // 10MB + MaxWait: time.Duration(cfg.KafkaMaxWaitSec) * time.Second, + CommitInterval: 1 * time.Second, + StartOffset: kafka.LastOffset, + }) + + logger.Info("Kafka reader created", + zap.String("broker", cfg.KafkaBrokerURL), + zap.String("topic", cfg.KafkaTopic), + zap.String("groupID", cfg.KafkaGroupID), + zap.Int("maxWaitSec", cfg.KafkaMaxWaitSec)) + + return reader +} + +// newTLSConfig creates a TLS configuration for Kafka +func newTLSConfig() (*tls.Config, error) { + tlsCACertPath := os.Getenv("KAFKA_TLS_CA_CERT_PATH") + if tlsCACertPath == "" { + tlsCACertPath = "./ca.crt" + } + + // Check if CA cert file exists + if _, err := os.Stat(tlsCACertPath); os.IsNotExist(err) { + // Return basic TLS config without custom CA + return &tls.Config{ + InsecureSkipVerify: os.Getenv("KAFKA_INSECURE_SKIP_VERIFY") == "true", + MinVersion: tls.VersionTLS12, + }, nil + } + caCert, err := os.ReadFile(tlsCACertPath) + if err != nil { + return nil, err + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + return &tls.Config{ + RootCAs: caCertPool, + InsecureSkipVerify: os.Getenv("KAFKA_INSECURE_SKIP_VERIFY") == "true", + MinVersion: tls.VersionTLS12, + }, nil +} + +// Start starts consuming messages from Kafka +func (c *Consumer) Start(ctx context.Context) error { + c.logger.Info("Starting Kafka consumer", + zap.String("topic", c.config.KafkaTopic), + zap.Int("batchSize", c.batchSize), + zap.Int("batchLingerSec", c.config.KafkaBatchLingerSec)) + + // Log filter configuration + if c.filterHostRegex != nil || len(c.filterHosts) > 0 { + c.logger.Info("Traffic filter active: filtering by host") + } else if c.filterPathRegex != nil || len(c.filterPaths) > 0 { + c.logger.Info("Traffic filter active: filtering by path") + } else { + c.logger.Info("No traffic filters configured, all traffic will be processed") + } + + // Setup signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Create a cancellable context + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Handle shutdown signal + go func() { + sig := <-sigChan + c.logger.Info("Received shutdown signal", zap.String("signal", sig.String())) + cancel() + }() + + batch := make([]models.IngestDataBatch, 0, c.batchSize) + ticker := time.NewTicker(time.Duration(c.config.KafkaBatchLingerSec) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + c.logger.Info("Context cancelled, processing remaining batch") + if len(batch) > 0 { + c.processBatch(ctx, batch) + } + return ctx.Err() + + case <-ticker.C: + // Process batch on timeout if we have messages + if len(batch) > 0 { + c.logger.Debug("Processing batch on timeout", zap.Int("size", len(batch))) + c.processBatch(ctx, batch) + batch = batch[:0] + } + + default: + // Read message with timeout + readCtx, readCancel := context.WithTimeout(ctx, 1*time.Second) + msg, err := c.reader.ReadMessage(readCtx) + readCancel() + + if err != nil { + // Check if parent context was cancelled first + if ctx.Err() != nil { + return nil + } + // Timeout is expected when no messages available, continue silently + if errors.Is(err, context.DeadlineExceeded) { + // Timeout is expected when no messages available, continue silently + continue + } + if errors.Is(err, context.Canceled) { + // Context was cancelled, exit gracefully + return nil + } + // kafka-go returns io.EOF when reader is closed or partition is exhausted + if errors.Is(err, io.EOF) { + return nil + } + c.logger.Error("Failed to read message from Kafka", zap.Error(err)) + continue + } + + // Parse the message + data, err := c.parseMessage(msg.Value) + if err != nil { + c.logger.Error("Failed to parse Kafka message", + zap.Error(err), + zap.String("value", string(msg.Value))) + continue + } + + // Apply traffic filter + if !c.filterTraffic(data) { + continue + } + + batch = append(batch, *data) + + // Process batch if full + if len(batch) >= c.batchSize { + c.logger.Debug("Processing full batch", zap.Int("size", len(batch))) + c.processBatch(ctx, batch) + batch = batch[:0] + } + } + } +} + +// parseMessage parses a Kafka message into IngestDataBatch +func (c *Consumer) parseMessage(value []byte) (*models.IngestDataBatch, error) { + var data models.IngestDataBatch + if err := json.Unmarshal(value, &data); err != nil { + return nil, err + } + return &data, nil +} + +// filterTraffic checks if the traffic should pass through based on host or path filters. +// Returns true if traffic should be processed, false if it should be filtered out. +// If no filters are configured, all traffic passes through. +// Host and path filters are mutually exclusive - only one is applied at a time. +// Supports comma-separated values (uses contains matching) and regex patterns (prefixed with "regex:"). +func (c *Consumer) filterTraffic(data *models.IngestDataBatch) bool { + // If no filters configured, allow all traffic + hasHostFilter := c.filterHostRegex != nil || len(c.filterHosts) > 0 + hasPathFilter := c.filterPathRegex != nil || len(c.filterPaths) > 0 + + if !hasHostFilter && !hasPathFilter { + return true + } + + // Host filter takes precedence (mutually exclusive) + if hasHostFilter { + host := c.extractHost(data) + if host == "" { + c.logger.Debug("No host found in traffic, filtering out", + zap.String("path", data.Path)) + return false + } + matches := c.matchHost(host) + if !matches { + c.logger.Debug("Host does not match filter, filtering out", + zap.String("host", host)) + } + return matches + } + + // Path filter + if hasPathFilter { + matches := c.matchPath(data.Path) + if !matches { + c.logger.Debug("Path does not match filter, filtering out", + zap.String("path", data.Path)) + } + return matches + } + + return true +} + +// matchHost checks if the host matches the configured filter (regex or comma-separated values) +// Uses case-insensitive contains check for comma-separated values +func (c *Consumer) matchHost(host string) bool { + if c.filterHostRegex != nil { + return c.filterHostRegex.MatchString(host) + } + hostLower := strings.ToLower(host) + for _, filterHost := range c.filterHosts { + if strings.Contains(hostLower, strings.ToLower(filterHost)) { + return true + } + } + return false +} + +// matchPath checks if the path matches the configured filter (regex or comma-separated values) +// Uses contains check for comma-separated values +func (c *Consumer) matchPath(path string) bool { + if c.filterPathRegex != nil { + return c.filterPathRegex.MatchString(path) + } + for _, filterPath := range c.filterPaths { + if strings.Contains(path, filterPath) { + return true + } + } + return false +} + +// extractHost extracts the host from request headers or path +func (c *Consumer) extractHost(data *models.IngestDataBatch) string { + // Try to get host from request headers + if data.RequestHeaders != "" { + var headers map[string]interface{} + if err := json.Unmarshal([]byte(data.RequestHeaders), &headers); err == nil { + // Check for Host header (case-insensitive) + for key, value := range headers { + if strings.EqualFold(key, "host") { + switch v := value.(type) { + case string: + return v + case []interface{}: + if len(v) > 0 { + if s, ok := v[0].(string); ok { + return s + } + } + } + } + } + } + } + + // Try to extract host from path if it's a full URL + if strings.HasPrefix(data.Path, "http://") || strings.HasPrefix(data.Path, "https://") { + // Parse URL to extract host + path := data.Path + // Remove protocol + if strings.HasPrefix(path, "https://") { + path = strings.TrimPrefix(path, "https://") + } else { + path = strings.TrimPrefix(path, "http://") + } + // Extract host (before first slash) + if idx := strings.Index(path, "/"); idx > 0 { + return path[:idx] + } + return path + } + + return "" +} + +// processBatch processes a batch of messages through the validator +func (c *Consumer) processBatch(ctx context.Context, batch []models.IngestDataBatch) { + if len(batch) == 0 { + return + } + + c.logger.Info("Processing batch from Kafka", zap.Int("size", len(batch))) + + results, err := c.validatorService.ValidateBatch(ctx, batch) + if err != nil { + c.logger.Error("Failed to validate batch", zap.Error(err)) + return + } + + // Log results summary + blockedRequests := 0 + blockedResponses := 0 + for _, result := range results { + if !result.RequestAllowed { + blockedRequests++ + } + if !result.ResponseAllowed { + blockedResponses++ + } + } + + c.logger.Info("Batch processing completed", + zap.Int("total", len(batch)), + zap.Int("blockedRequests", blockedRequests), + zap.Int("blockedResponses", blockedResponses)) +} + +// Close closes the Kafka consumer +func (c *Consumer) Close() error { + c.logger.Info("Closing Kafka consumer") + return c.reader.Close() +} diff --git a/apps/guardrails-service/container/src/pkg/validator/service.go b/apps/guardrails-service/container/src/pkg/validator/service.go index 71b02ffd66..a6a61bd2d8 100644 --- a/apps/guardrails-service/container/src/pkg/validator/service.go +++ b/apps/guardrails-service/container/src/pkg/validator/service.go @@ -5,21 +5,34 @@ import ( "encoding/json" "fmt" "regexp" + "sync" + "time" - "github.com/akto-api-security/mcp-endpoint-shield/mcp" - "github.com/akto-api-security/mcp-endpoint-shield/mcp/types" "github.com/akto-api-security/guardrails-service/models" "github.com/akto-api-security/guardrails-service/pkg/config" "github.com/akto-api-security/guardrails-service/pkg/dbabstractor" + "github.com/akto-api-security/mcp-endpoint-shield/mcp" + "github.com/akto-api-security/mcp-endpoint-shield/mcp/types" "go.uber.org/zap" ) +// policyCache holds cached policies and their metadata +type policyCache struct { + policies []types.Policy + auditPolicies map[string]*types.AuditPolicy + compiledRules map[string]*regexp.Regexp + hasAuditRules bool + lastFetched time.Time + mu sync.RWMutex +} + // Service handles payload validation using akto-gateway library type Service struct { config *config.Config dbClient *dbabstractor.Client processor mcp.RequestProcessor logger *zap.Logger + cache *policyCache } // NewService creates a new validator service @@ -41,8 +54,8 @@ func NewService(cfg *config.Config, logger *zap.Logger) (*Service, error) { validator, ingestor, sessionMgr, - "", // sessionID - empty for our use case - "", // projectName - empty for our use case + "", // sessionID - empty for our use case + "", // projectName - empty for our use case false, // skipThreat - false to enable threat reporting ) @@ -51,9 +64,68 @@ func NewService(cfg *config.Config, logger *zap.Logger) (*Service, error) { dbClient: dbClient, processor: processor, logger: logger, + cache: &policyCache{}, }, nil } +// getCachedPolicies returns cached policies if still valid, otherwise fetches fresh policies +func (s *Service) getCachedPolicies() ([]types.Policy, map[string]*types.AuditPolicy, map[string]*regexp.Regexp, bool, error) { + refreshInterval := time.Duration(s.config.PolicyRefreshIntervalMin) * time.Minute + + // Check if cache is valid + s.cache.mu.RLock() + if !s.cache.lastFetched.IsZero() && time.Since(s.cache.lastFetched) < refreshInterval { + policies := s.cache.policies + auditPolicies := s.cache.auditPolicies + compiledRules := s.cache.compiledRules + hasAuditRules := s.cache.hasAuditRules + // RUnlock so that multiple goroutines can read the cached policies + s.cache.mu.RUnlock() + s.logger.Debug("Using cached policies", + zap.Time("lastFetched", s.cache.lastFetched), + zap.Int("policiesCount", len(policies))) + return policies, auditPolicies, compiledRules, hasAuditRules, nil + } + s.cache.mu.RUnlock() + + // Cache is stale or empty, fetch fresh policies + return s.refreshPolicies() +} + +// refreshPolicies fetches fresh policies from database and updates the cache +func (s *Service) refreshPolicies() ([]types.Policy, map[string]*types.AuditPolicy, map[string]*regexp.Regexp, bool, error) { + s.cache.mu.Lock() + defer s.cache.mu.Unlock() + + // Double-check: another goroutine might have refreshed while we waited for the lock + refreshInterval := time.Duration(s.config.PolicyRefreshIntervalMin) * time.Minute + if !s.cache.lastFetched.IsZero() && time.Since(s.cache.lastFetched) < refreshInterval { + s.logger.Debug("Cache was refreshed by another goroutine, using cached policies") + return s.cache.policies, s.cache.auditPolicies, s.cache.compiledRules, s.cache.hasAuditRules, nil + } + + s.logger.Info("Refreshing policies cache") + + policies, auditPolicies, compiledRules, hasAuditRules, err := s.fetchAndParsePolicies() + if err != nil { + return nil, nil, nil, false, err + } + + // Update cache + s.cache.policies = policies + s.cache.auditPolicies = auditPolicies + s.cache.compiledRules = compiledRules + s.cache.hasAuditRules = hasAuditRules + s.cache.lastFetched = time.Now() + + s.logger.Info("Policy cache refreshed", + zap.Int("policiesCount", len(policies)), + zap.Int("auditPoliciesCount", len(auditPolicies)), + zap.Time("lastFetched", s.cache.lastFetched)) + + return policies, auditPolicies, compiledRules, hasAuditRules, nil +} + // fetchAndParsePolicies fetches policies from database abstractor and parses them func (s *Service) fetchAndParsePolicies() ([]types.Policy, map[string]*types.AuditPolicy, map[string]*regexp.Regexp, bool, error) { // Fetch guardrail policies from database abstractor @@ -166,8 +238,8 @@ func (s *Service) fetchAndParsePolicies() ([]types.Policy, map[string]*types.Aud func (s *Service) ValidateRequest(ctx context.Context, payload string) (*mcp.ValidationResult, error) { s.logger.Info("Validating request payload") - // Fetch and parse policies from database abstractor - policies, auditPolicies, compiledRules, hasAuditRules, err := s.fetchAndParsePolicies() + // Get cached policies (refreshes if stale) + policies, auditPolicies, compiledRules, hasAuditRules, err := s.getCachedPolicies() if err != nil { return nil, fmt.Errorf("failed to load policies: %w", err) } @@ -219,8 +291,8 @@ func (s *Service) ValidateRequest(ctx context.Context, payload string) (*mcp.Val func (s *Service) ValidateResponse(ctx context.Context, payload string) (*mcp.ValidationResult, error) { s.logger.Info("Validating response payload") - // Fetch and parse policies from database abstractor - policies, _, _, _, err := s.fetchAndParsePolicies() + // Get cached policies (refreshes if stale) + policies, _, _, _, err := s.getCachedPolicies() if err != nil { return nil, fmt.Errorf("failed to load policies: %w", err) } @@ -257,8 +329,8 @@ func (s *Service) ValidateResponse(ctx context.Context, payload string) (*mcp.Va func (s *Service) ValidateBatch(ctx context.Context, batchData []models.IngestDataBatch) ([]ValidationBatchResult, error) { s.logger.Info("Validating batch data", zap.Int("count", len(batchData))) - // Fetch policies once for the entire batch to improve performance - policies, auditPolicies, _, hasAuditRules, err := s.fetchAndParsePolicies() + // Get cached policies (refreshes if stale) + policies, auditPolicies, _, hasAuditRules, err := s.getCachedPolicies() if err != nil { return nil, fmt.Errorf("failed to load policies: %w", err) } @@ -363,6 +435,7 @@ func (s *Service) ValidateBatch(ctx context.Context, batchData []models.IngestDa shouldReport := false if reqResult != nil && (!reqResult.Allowed || reqResult.Modified) { shouldReport = true + s.logger.Warn("Request blocked or modified by guardrails", zap.Int("index", i), zap.String("method", data.Method), @@ -383,11 +456,8 @@ func (s *Service) ValidateBatch(ctx context.Context, batchData []models.IngestDa zap.String("reason", result.ResponseReason)) } - // Report to threat backend if threat detected - // TODO: Implement threat reporting using new API if shouldReport { - // s.reportThreat(ctx, &data, reqResult, respResult) - s.logger.Info("Threat detected but reporting not yet implemented", + s.logger.Info("Threat detected", zap.String("method", data.Method), zap.String("path", data.Path)) } @@ -396,36 +466,6 @@ func (s *Service) ValidateBatch(ctx context.Context, batchData []models.IngestDa return results, nil } -// reportThreat reports a detected threat to the dashboard -// TODO: Reimplement using new API when threat reporting is available -/* -func (s *Service) reportThreat(ctx context.Context, data *models.IngestDataBatch, reqResult, respResult *mcp.ValidationResult) { - // Prepare headers - reqHeaders := make(map[string]string) - respHeaders := make(map[string]string) - - // Parse status code - statusCode := 0 - if data.StatusCode != "" { - // Convert string to int (simplified, add proper error handling in production) - fmt.Sscanf(data.StatusCode, "%d", &statusCode) - } - - // Determine metadata from validation results - metadata := make(map[string]any) - if reqResult != nil && reqResult.Metadata != nil { - metadata = reqResult.Metadata - } else if respResult != nil && respResult.Metadata != nil { - metadata = respResult.Metadata - } - - // TODO: Implement threat reporting with new API - s.logger.Info("Threat reporting not yet implemented", - zap.String("method", data.Method), - zap.String("path", data.Path)) -} -*/ - // FetchPolicies fetches guardrail policies from database-abstractor func (s *Service) FetchPolicies() error { s.logger.Info("Fetching guardrail policies from database-abstractor") @@ -446,17 +486,17 @@ func (s *Service) FetchPolicies() error { // ValidationBatchResult represents the validation result for a single batch item type ValidationBatchResult struct { - Index int `json:"index"` - Method string `json:"method"` - Path string `json:"path"` - RequestAllowed bool `json:"requestAllowed"` - RequestModified bool `json:"requestModified"` - RequestModifiedPayload string `json:"requestModifiedPayload,omitempty"` - RequestReason string `json:"requestReason,omitempty"` - RequestError string `json:"requestError,omitempty"` - ResponseAllowed bool `json:"responseAllowed"` - ResponseModified bool `json:"responseModified"` - ResponseModifiedPayload string `json:"responseModifiedPayload,omitempty"` - ResponseReason string `json:"responseReason,omitempty"` - ResponseError string `json:"responseError,omitempty"` + Index int `json:"index"` + Method string `json:"method"` + Path string `json:"path"` + RequestAllowed bool `json:"requestAllowed"` + RequestModified bool `json:"requestModified"` + RequestModifiedPayload string `json:"requestModifiedPayload,omitempty"` + RequestReason string `json:"requestReason,omitempty"` + RequestError string `json:"requestError,omitempty"` + ResponseAllowed bool `json:"responseAllowed"` + ResponseModified bool `json:"responseModified"` + ResponseModifiedPayload string `json:"responseModifiedPayload,omitempty"` + ResponseReason string `json:"responseReason,omitempty"` + ResponseError string `json:"responseError,omitempty"` }