diff --git a/.github/workflows/certificatelogverify-collector-image.yml b/.github/workflows/certificatelogverify-collector-image.yml new file mode 100644 index 0000000000000..e595dee06fde4 --- /dev/null +++ b/.github/workflows/certificatelogverify-collector-image.yml @@ -0,0 +1,60 @@ +name: certificatelogverify-collector-image + +on: + push: + branches: [veryfySignProcessor] + pull_request: + workflow_dispatch: + +permissions: read-all + +jobs: + build-image: + runs-on: ubuntu-24.04 + if: ${{ github.actor != 'dependabot[bot]' }} + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + - uses: ./.github/actions/setup-go-tools + with: + go-version: oldstable + - name: Build collector and docker image + run: | + make genotelcontribcol + make docker-otelcontribcol + - name: Tag image for certificatelogverify build + run: | + docker tag otelcontribcol:latest ghcr.io/${{ github.repository_owner }}/otelcol-certificatelogverify:${{ github.sha }} + - name: Validate image + run: | + docker run --rm ghcr.io/${{ github.repository_owner }}/otelcol-certificatelogverify:${{ github.sha }} --version + + publish-image: + runs-on: ubuntu-24.04 + needs: [build-image] + if: github.ref == 'refs/heads/veryfySignProcessor' && github.event_name != 'pull_request' + permissions: + packages: write + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + - uses: ./.github/actions/setup-go-tools + with: + go-version: oldstable + - name: Build collector and docker image + run: | + make genotelcontribcol + make docker-otelcontribcol + - name: Login to GitHub Container Registry + uses: docker/login-action@c94ce9fb468520275223c153574b00df6fe4bcc9 # v3 + with: + registry: ghcr.io + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Tag and push sha image + run: | + docker tag otelcontribcol:latest ghcr.io/${{ github.repository_owner }}/otelcol-certificatelogverify:${{ github.sha }} + docker push ghcr.io/${{ github.repository_owner }}/otelcol-certificatelogverify:${{ github.sha }} + - name: Tag and push latest image (main only) + if: github.ref == 'refs/heads/veryfySignProcessor' + run: | + docker tag otelcontribcol:latest ghcr.io/${{ github.repository_owner }}/otelcol-certificatelogverify:latest + docker push ghcr.io/${{ github.repository_owner }}/otelcol-certificatelogverify:latest diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index a21110e7bab0c..a3d3c30e19654 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -142,6 +142,7 @@ processors: - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/unrollprocessor v0.147.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/remotetapprocessor v0.147.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogsemanticsprocessor v0.147.0 + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/certificatelogverifyprocessor v0.147.0 receivers: - gomod: go.opentelemetry.io/collector/receiver/nopreceiver v0.147.0 diff --git a/processor/certificatelogverifyprocessor/Makefile b/processor/certificatelogverifyprocessor/Makefile new file mode 100644 index 0000000000000..ded7a36092dc3 --- /dev/null +++ b/processor/certificatelogverifyprocessor/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/processor/certificatelogverifyprocessor/README.md b/processor/certificatelogverifyprocessor/README.md new file mode 100644 index 0000000000000..b8d278e34946b --- /dev/null +++ b/processor/certificatelogverifyprocessor/README.md @@ -0,0 +1,293 @@ +# Certificate Log Verify Processor + +The Certificate Log Verify Processor verifies the integrity and authenticity of log records that have been signed with a certificate. It reads certificate-signed logs, recomputes their hash, and verifies the signature using a certificate stored in Kubernetes secrets. + +## Overview + +This processor: +- Fetches certificates from Kubernetes secrets +- Verifies log signatures by recomputing hashes based on the signed content (body/meta/attr) +- Validates signatures using RSA public keys from certificates +- Ensures log integrity and authenticity + +## Prerequisites + +- Go 1.24+ (for building) +- Kubernetes cluster with kubectl configured +- OpenTelemetry Collector Contrib repository +- Certificate and private key files (for creating K8s secrets) + +## Building + +### Build the Collector with the Processor + +From the repository root, build the entire collector with this processor included: + +```bash +make otelcontribcol +``` + +This will create the binary at `bin/otelcontribcol_linux_amd64`. + +### Build Docker Image + +A Dockerfile is provided in the `k8s/` directory. Build the Docker image from the repository root: + +```bash +cd processor/certificatelogverifyprocessor/k8s +docker build -f Dockerfile -t certificatelogverify-collector:latest ../../.. +``` + +Or from the repository root: + +```bash +docker build -f processor/certificatelogverifyprocessor/k8s/Dockerfile -t certificatelogverify-collector:latest . +``` + +The Dockerfile uses a multi-stage build to create a minimal, secure image with the collector binary. + +## Kubernetes Deployment + +All Kubernetes manifests are provided in the `k8s/` directory. You can deploy everything using the provided scripts or manually with kubectl. + +### Deploy to kind (After Building) + +If you already built the Docker image locally (for example `certificatelogverify-collector:latest`), load it into your kind cluster before deploying: + +```bash +# 1. Make sure your kind cluster exists +kind get clusters + +# 2. Load the local image into kind +kind load docker-image certificatelogverify-collector:latest --name kind + +# 3. Deploy collector resources +cd processor/certificatelogverifyprocessor/k8s +kubectl apply -k . + +# 4. Wait for rollout and inspect logs +kubectl rollout status deployment/otelcol-certificatelogverify -n otel-demo +kubectl logs -n otel-demo -l app=otelcol-certificatelogverify +``` + +If your kind cluster has a different name, replace `--name kind` with your cluster name. + +### Quick Deployment + +The easiest way to deploy is using the provided deployment scripts: + +**Linux/Mac:** +```bash +cd processor/certificatelogverifyprocessor/k8s +./deploy.sh +``` + +**Windows (PowerShell):** +```powershell +cd processor\certificatelogverifyprocessor\k8s +.\deploy.ps1 +``` + +### Manual Deployment + +#### Step 1: Create Kubernetes Secret + +First, create a Kubernetes secret containing your certificate: + +```bash +kubectl create namespace otel-demo + +kubectl create secret generic otelcol-test-certs \ + --from-file=cert.pem=./cert.pem \ + --from-file=key.pem=./key.pem \ + --from-file=ca.pem=./ca.pem \ + -n otel-demo +``` + +Or use the provided PowerShell script: + +```powershell +.\k8s\create-k8s-secret.ps1 +``` + +#### Step 2: Deploy Kubernetes Resources + +All YAML files are provided in the `k8s/` directory: + +- `namespace.yaml` - Creates the `otel-demo` namespace +- `rbac.yaml` - ServiceAccount, Role, and RoleBinding for secret access +- `configmap.yaml` - Collector configuration with the certificatelogverify processor +- `deployment.yaml` - Deployment with health checks and resource limits +- `service.yaml` - ClusterIP service exposing OTLP endpoints + +Deploy all resources: + +```bash +cd processor/certificatelogverifyprocessor/k8s + +# Deploy individually +kubectl apply -f namespace.yaml +kubectl apply -f rbac.yaml +kubectl apply -f configmap.yaml +kubectl apply -f deployment.yaml +kubectl apply -f service.yaml + +# Or deploy all at once +kubectl apply -f namespace.yaml -f rbac.yaml -f configmap.yaml -f deployment.yaml -f service.yaml + +# Or using kustomize +kubectl apply -k . +``` + +#### Step 3: Verify Deployment + +Check if the pod is running: + +```bash +kubectl get pods -n otel-demo -l app=otelcol-certificatelogverify +``` + +View logs: + +```bash +kubectl logs -n otel-demo -l app=otelcol-certificatelogverify +``` + +Check service: + +```bash +kubectl get svc -n otel-demo otelcol-certificatelogverify +``` + +### Configuration Options + +The processor configuration in `configmap.yaml` can be customized. Available options: + +- `hash_algorithm`: Hash algorithm used for verification (`SHA256` or `SHA512`). Default: `SHA256` +- `sign_content`: What content was signed (`body`, `meta`, or `attr`). Default: `body` + - `body`: Only log body + - `meta`: Body + metadata (timestamp, severity, trace_id, span_id) + - `attr`: Body + metadata + attributes (excluding `otel.log.*` attributes) +- `k8s_secret`: Kubernetes secret configuration + - `name`: Secret name (required) + - `namespace`: Secret namespace (default: `default`) + - `cert_key`: Key name in secret containing the certificate (required) + +To update the configuration: + +```bash +# Edit configmap.yaml, then apply +kubectl apply -f configmap.yaml + +# Restart the deployment to pick up changes +kubectl rollout restart deployment/otelcol-certificatelogverify -n otel-demo +``` + +## How It Works + +1. **Certificate Loading**: On startup, the processor fetches the certificate from the specified Kubernetes secret +2. **Log Processing**: For each log record, it: + - Reads `otel.log.hash`, `otel.log.signature`, and `otel.log.sign_content` attributes + - Uses `sign_content` from the log attribute (set by the signing processor) or falls back to config + - Recomputes the hash using the same serialization logic as the signing processor + - Compares the recomputed hash with the received hash + - Verifies the signature using the certificate's RSA public key +3. **Error Handling**: If verification fails, an error is logged and the log record continues processing + +## Expected Log Attributes + +The processor expects log records to have these attributes (set by the signing processor): + +- `otel.log.hash`: Base64-encoded hash of the serialized log content +- `otel.log.signature`: Base64-encoded RSA signature of the hash +- `otel.log.sign_content`: Indicates what content was signed (`body`, `meta`, or `attr`) + +## Troubleshooting + +### Secret Not Found + +If you see errors about secrets not being found: + +```bash +kubectl get secret otelcol-test-certs -n otel-demo +kubectl describe secret otelcol-test-certs -n otel-demo +``` + +### Permission Denied + +Verify RBAC is correctly configured: + +```bash +kubectl get role otelcol-secret-reader -n otel-demo +kubectl get rolebinding otelcol-secret-reader -n otel-demo +``` + +### Verification Failures + +Check collector logs for detailed error messages. Common issues: +- Hash mismatch: Signing and verification processors using different `sign_content` values +- Signature verification failed: Certificate mismatch or corrupted signature +- Missing attributes: Logs not processed by the signing processor first + +## Example: Complete Deployment + +### Using Deployment Scripts (Recommended) + +```bash +# 1. Build Docker image +cd processor/certificatelogverifyprocessor/k8s +docker build -f Dockerfile -t certificatelogverify-collector:latest ../.. + +# 2. Create secret (if not already created) +kubectl create secret generic otelcol-test-certs \ + --from-file=cert.pem=./cert.pem \ + --from-file=key.pem=./key.pem \ + -n otel-demo + +# 3. Deploy everything using the script +./deploy.sh # or .\deploy.ps1 on Windows +``` + +### Manual Deployment + +```bash +cd processor/certificatelogverifyprocessor/k8s + +# 1. Create namespace +kubectl apply -f namespace.yaml + +# 2. Create secret +kubectl create secret generic otelcol-test-certs \ + --from-file=cert.pem=./cert.pem \ + --from-file=key.pem=./key.pem \ + -n otel-demo + +# 3. Deploy all resources +kubectl apply -f rbac.yaml +kubectl apply -f configmap.yaml +kubectl apply -f deployment.yaml +kubectl apply -f service.yaml + +# 4. Check status +kubectl get pods -n otel-demo +kubectl logs -n otel-demo -l app=otelcol-certificatelogverify +``` + +### Using Kustomize + +```bash +cd processor/certificatelogverifyprocessor/k8s + +# Create secret first +kubectl create secret generic otelcol-test-certs \ + --from-file=cert.pem=./cert.pem \ + --from-file=key.pem=./key.pem \ + -n otel-demo + +# Deploy everything with kustomize +kubectl apply -k . +``` + +## Related Components + +This processor is designed to work with the certificate signing processor (on branch `signLogsInsideProcesor`) that adds the signature attributes to log records. diff --git a/processor/certificatelogverifyprocessor/certificate_reader.go b/processor/certificatelogverifyprocessor/certificate_reader.go new file mode 100644 index 0000000000000..b572c1856b6b2 --- /dev/null +++ b/processor/certificatelogverifyprocessor/certificate_reader.go @@ -0,0 +1,188 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package certificatelogverifyprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/certificatelogverifyprocessor" + +import ( + "context" + "crypto/rsa" + "crypto/x509" + "encoding/base64" + "encoding/pem" + "fmt" + "strings" + + "go.uber.org/zap" +) + +type CertificateReader struct { + cert *x509.Certificate + key *rsa.PrivateKey +} + +func NewCertificateReaderFromK8sSecret(ctx context.Context, config *K8sSecretConfig, logger interface{}) (*CertificateReader, error) { + var zapLogger *zap.Logger + if l, ok := logger.(*zap.Logger); ok { + zapLogger = l + } + + certPEM, err := fetchSecretData(ctx, config.Name, config.Namespace, config.CertKey, zapLogger) + if err != nil { + return nil, fmt.Errorf("failed to fetch certificate from k8s secret: %w", err) + } + + keyPEM, err := fetchSecretData(ctx, config.Name, config.Namespace, config.KeyKey, zapLogger) + if err != nil { + return nil, fmt.Errorf("failed to fetch private key from k8s secret: %w", err) + } + + certPEM = decodeIfBase64(certPEM) + keyPEM = decodeIfBase64(keyPEM) + + certPEM = normalizeLineEndings(certPEM) + keyPEM = normalizeLineEndings(keyPEM) + + return parseCertificateData(certPEM, keyPEM) +} + +func NewCertificateReaderFromK8sSecretForVerification(ctx context.Context, config *K8sSecretConfig, logger interface{}) (*CertificateReader, error) { + var zapLogger *zap.Logger + if l, ok := logger.(*zap.Logger); ok { + zapLogger = l + } + + certPEM, err := fetchSecretData(ctx, config.Name, config.Namespace, config.CertKey, zapLogger) + if err != nil { + return nil, fmt.Errorf("failed to fetch certificate from k8s secret: %w", err) + } + + certPEM = decodeIfBase64(certPEM) + certPEM = normalizeLineEndings(certPEM) + + return parseCertificateForVerification(certPEM) +} + +func decodeIfBase64(data []byte) []byte { + if len(data) == 0 { + return data + } + + dataStr := strings.TrimSpace(string(data)) + if !strings.HasPrefix(dataStr, "-----BEGIN") { + decoded, err := base64.StdEncoding.DecodeString(dataStr) + if err == nil && len(decoded) > 0 { + decodedStr := string(decoded) + if strings.HasPrefix(decodedStr, "-----BEGIN") { + return decoded + } + } + } + return data +} + +func normalizeLineEndings(data []byte) []byte { + dataStr := string(data) + dataStr = strings.ReplaceAll(dataStr, "\r\n", "\n") + dataStr = strings.ReplaceAll(dataStr, "\r", "\n") + return []byte(dataStr) +} + +func parseCertificateData(certPEM, keyPEM []byte) (*CertificateReader, error) { + if len(certPEM) == 0 { + return nil, fmt.Errorf("certificate data is empty") + } + if len(keyPEM) == 0 { + return nil, fmt.Errorf("private key data is empty") + } + + certStr := string(certPEM) + if !strings.Contains(certStr, "-----BEGIN") { + return nil, fmt.Errorf("certificate data does not appear to be PEM format (data length: %d, first 100 bytes: %q)", len(certPEM), string(certPEM[:min(100, len(certPEM))])) + } + + certBlock, _ := pem.Decode(certPEM) + if certBlock == nil { + return nil, fmt.Errorf("failed to decode PEM certificate (data length: %d, first 100 bytes: %q)", len(certPEM), string(certPEM[:min(100, len(certPEM))])) + } + + cert, err := x509.ParseCertificate(certBlock.Bytes) + if err != nil { + return nil, fmt.Errorf("failed to parse certificate: %w", err) + } + + keyStr := string(keyPEM) + if !strings.Contains(keyStr, "-----BEGIN") { + return nil, fmt.Errorf("private key data does not appear to be PEM format (data length: %d, first 100 bytes: %q)", len(keyPEM), string(keyPEM[:min(100, len(keyPEM))])) + } + + keyBlock, _ := pem.Decode(keyPEM) + if keyBlock == nil { + return nil, fmt.Errorf("failed to decode PEM private key (data length: %d, first 100 bytes: %q)", len(keyPEM), string(keyPEM[:min(100, len(keyPEM))])) + } + + var key *rsa.PrivateKey + if keyBlock.Type == "RSA PRIVATE KEY" { + key, err = x509.ParsePKCS1PrivateKey(keyBlock.Bytes) + if err != nil { + return nil, fmt.Errorf("failed to parse PKCS1 private key: %w", err) + } + } else if keyBlock.Type == "PRIVATE KEY" { + parsedKey, err := x509.ParsePKCS8PrivateKey(keyBlock.Bytes) + if err != nil { + return nil, fmt.Errorf("failed to parse PKCS8 private key: %w", err) + } + var ok bool + key, ok = parsedKey.(*rsa.PrivateKey) + if !ok { + return nil, fmt.Errorf("private key is not RSA") + } + } else { + return nil, fmt.Errorf("unsupported private key type: %s", keyBlock.Type) + } + + return &CertificateReader{ + cert: cert, + key: key, + }, nil +} + +func parseCertificateForVerification(certPEM []byte) (*CertificateReader, error) { + if len(certPEM) == 0 { + return nil, fmt.Errorf("certificate data is empty") + } + + certStr := string(certPEM) + if !strings.Contains(certStr, "-----BEGIN") { + return nil, fmt.Errorf("certificate data does not appear to be PEM format (data length: %d, first 100 bytes: %q)", len(certPEM), string(certPEM[:min(100, len(certPEM))])) + } + + certBlock, _ := pem.Decode(certPEM) + if certBlock == nil { + return nil, fmt.Errorf("failed to decode PEM certificate (data length: %d, first 100 bytes: %q)", len(certPEM), string(certPEM[:min(100, len(certPEM))])) + } + + cert, err := x509.ParseCertificate(certBlock.Bytes) + if err != nil { + return nil, fmt.Errorf("failed to parse certificate: %w", err) + } + + return &CertificateReader{ + cert: cert, + key: nil, + }, nil +} + +func (cr *CertificateReader) GetPrivateKey() *rsa.PrivateKey { + return cr.key +} + +func (cr *CertificateReader) GetCertificate() *x509.Certificate { + return cr.cert +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/processor/certificatelogverifyprocessor/config.go b/processor/certificatelogverifyprocessor/config.go new file mode 100644 index 0000000000000..630f167f16164 --- /dev/null +++ b/processor/certificatelogverifyprocessor/config.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package certificatelogverifyprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/certificatelogverifyprocessor" + +import ( + "crypto" + "errors" + + "go.opentelemetry.io/collector/component" +) + +const ( + defaultHashAlgorithm = "SHA256" + defaultSignContent = "body" +) + +const ( + SignContentBody = "body" + SignContentMeta = "meta" + SignContentAttr = "attr" +) + +var ( + errInvalidHashAlgorithm = errors.New("hash_algorithm must be SHA256 or SHA512") + errInvalidSignContent = errors.New("sign_content must be body, meta, or attr") +) + +type Config struct { + HashAlgorithm string `mapstructure:"hash_algorithm"` + SignContent string `mapstructure:"sign_content"` + K8sSecret *K8sSecretConfig `mapstructure:"k8s_secret"` +} + +type K8sSecretConfig struct { + Name string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + CertKey string `mapstructure:"cert_key"` + KeyKey string `mapstructure:"key_key"` + CAKey string `mapstructure:"ca_key"` +} + +func createDefaultConfig() component.Config { + return &Config{ + HashAlgorithm: defaultHashAlgorithm, + SignContent: defaultSignContent, + } +} + +func (c *Config) Validate() error { + if c.HashAlgorithm != "SHA256" && c.HashAlgorithm != "SHA512" { + return errInvalidHashAlgorithm + } + + if c.SignContent == "" { + c.SignContent = defaultSignContent + } else if c.SignContent != SignContentBody && c.SignContent != SignContentMeta && c.SignContent != SignContentAttr { + return errInvalidSignContent + } + + if c.K8sSecret == nil { + return errors.New("k8s_secret is required") + } + + if c.K8sSecret.Name == "" { + return errors.New("k8s_secret.name is required") + } + if c.K8sSecret.CertKey == "" { + return errors.New("k8s_secret.cert_key is required") + } + if c.K8sSecret.Namespace == "" { + c.K8sSecret.Namespace = "default" + } + + return nil +} + +func (c *Config) GetHash() crypto.Hash { + if c.HashAlgorithm == "SHA512" { + return crypto.SHA512 + } + return crypto.SHA256 +} + +var _ component.Config = (*Config)(nil) diff --git a/processor/certificatelogverifyprocessor/factory.go b/processor/certificatelogverifyprocessor/factory.go new file mode 100644 index 0000000000000..205bd8e1101df --- /dev/null +++ b/processor/certificatelogverifyprocessor/factory.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package certificatelogverifyprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/certificatelogverifyprocessor" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/certificatelogverifyprocessor/internal/metadata" +) + +func NewFactory() processor.Factory { + return processor.NewFactory( + metadata.Type, + createDefaultConfig, + processor.WithLogs(createLogsProcessor, metadata.LogsStability), + ) +} + +func createLogsProcessor( + ctx context.Context, + settings processor.Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (processor.Logs, error) { + processorCfg, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("invalid config type: %+v", cfg) + } + + if err := processorCfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } + + proc, err := newProcessor(processorCfg, nextConsumer, settings) + if err != nil { + return nil, fmt.Errorf("error creating processor: %w", err) + } + + return proc, nil +} diff --git a/processor/certificatelogverifyprocessor/go.mod b/processor/certificatelogverifyprocessor/go.mod new file mode 100644 index 0000000000000..05e87ee257ad3 --- /dev/null +++ b/processor/certificatelogverifyprocessor/go.mod @@ -0,0 +1,62 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/processor/certificatelogverifyprocessor + +go 1.25.0 + +require ( + go.opentelemetry.io/collector/component v1.53.0 + go.opentelemetry.io/collector/consumer v1.53.0 + go.opentelemetry.io/collector/pdata v1.53.0 + go.opentelemetry.io/collector/processor v1.53.0 + go.uber.org/zap v1.27.1 + k8s.io/apimachinery v0.35.2 + k8s.io/client-go v0.35.2 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emicklei/go-restful/v3 v3.12.2 // indirect + github.com/fxamacker/cbor/v2 v2.9.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-openapi/jsonpointer v0.21.0 // indirect + github.com/go-openapi/jsonreference v0.20.2 // indirect + github.com/go-openapi/swag v0.23.0 // indirect + github.com/google/gnostic-models v0.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.8.0 // indirect + github.com/josharian/intern v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/mailru/easyjson v0.7.7 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/spf13/pflag v1.0.9 // indirect + github.com/x448/float16 v0.8.4 // indirect + go.opentelemetry.io/collector/featuregate v1.53.0 // indirect + go.opentelemetry.io/collector/internal/componentalias v0.147.0 // indirect + go.opentelemetry.io/collector/pipeline v1.53.0 // indirect + go.opentelemetry.io/otel v1.40.0 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.40.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/net v0.51.0 // indirect + golang.org/x/oauth2 v0.30.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/term v0.40.0 // indirect + golang.org/x/text v0.34.0 // indirect + golang.org/x/time v0.9.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/api v0.35.2 // indirect + k8s.io/klog/v2 v2.130.1 // indirect + k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect + k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect + sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect + sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect + sigs.k8s.io/yaml v1.6.0 // indirect +) diff --git a/processor/certificatelogverifyprocessor/internal/metadata/metadata.go b/processor/certificatelogverifyprocessor/internal/metadata/metadata.go new file mode 100644 index 0000000000000..1dc2431e25a5f --- /dev/null +++ b/processor/certificatelogverifyprocessor/internal/metadata/metadata.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metadata + +import ( + "go.opentelemetry.io/collector/component" +) + +var ( + Type = component.MustNewType("certificatelogverify") +) + +const ( + LogsStability = component.StabilityLevelAlpha + TracesStability = component.StabilityLevelUndefined + MetricsStability = component.StabilityLevelUndefined +) diff --git a/processor/certificatelogverifyprocessor/k8s/Dockerfile b/processor/certificatelogverifyprocessor/k8s/Dockerfile new file mode 100644 index 0000000000000..8268261e184e2 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.26@sha256:fb612b7831d53a89cbc0aaa7855b69ad7b0caf603715860cf538df854d047b84 AS build + +WORKDIR /src +ADD . /src + +RUN make otelcontribcol + +FROM alpine:latest@sha256:25109184c71bdad752c8312a8623239686a9a2071e8825f20acb8f2198c3f659 AS certs +RUN apk --update add ca-certificates + +FROM scratch + +ARG USER_UID=10001 +USER ${USER_UID} + +COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=build /src/bin/otelcontribcol_linux_amd64 /otelcontribcol +ENTRYPOINT ["/otelcontribcol"] +EXPOSE 4317 4318 55680 55679 +CMD ["--config", "/etc/otelcol/config.yaml"] diff --git a/processor/certificatelogverifyprocessor/k8s/configmap.yaml b/processor/certificatelogverifyprocessor/k8s/configmap.yaml new file mode 100644 index 0000000000000..b5d0def623166 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/configmap.yaml @@ -0,0 +1,38 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: otelcol-config + namespace: otel-demo +data: + config.yaml: | + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + + processors: + certificatelogverify: + hash_algorithm: SHA256 + sign_content: body + k8s_secret: + name: otelcol-test-certs + namespace: otel-demo + cert_key: cert.pem + + exporters: + debug: + verbosity: detailed + otlp: + endpoint: otel-collector:4317 + tls: + insecure: true + + service: + pipelines: + logs: + receivers: [otlp] + processors: [certificatelogverify] + exporters: [debug] diff --git a/processor/certificatelogverifyprocessor/k8s/create-k8s-secret.ps1 b/processor/certificatelogverifyprocessor/k8s/create-k8s-secret.ps1 new file mode 100644 index 0000000000000..281876f70fca9 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/create-k8s-secret.ps1 @@ -0,0 +1,48 @@ +$ErrorActionPreference = "Stop" + +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "Create K8s Secret for Certificates" -ForegroundColor Cyan +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "" + +$namespace = "otel-demo" +$secretName = "otelcol-test-certs" + +Write-Host "Step 1: Generate certificates and create Kubernetes secret..." -ForegroundColor Yellow +Push-Location $PSScriptRoot\..\..\.. +if (Test-Path "create-cert.ps1") { + .\create-cert.ps1 -namespace $namespace -secretName $secretName + if ($LASTEXITCODE -ne 0) { + Write-Host "Error: Failed to create certificate in Kubernetes" -ForegroundColor Red + Pop-Location + exit 1 + } +} else { + Write-Host "Error: create-cert.ps1 not found in repo root" -ForegroundColor Red + Pop-Location + exit 1 +} + +Write-Host "" +Write-Host "Step 2: Verify secret exists..." -ForegroundColor Yellow +kubectl get secret $secretName -n $namespace + +Write-Host "" +Write-Host "Step 3: Verify secret keys..." -ForegroundColor Yellow +$secret = kubectl get secret $secretName -n $namespace -o jsonpath='{.data}' | ConvertFrom-Json +Write-Host "Secret contains keys:" -ForegroundColor Cyan +$secret.PSObject.Properties.Name | ForEach-Object { + Write-Host " - $_" -ForegroundColor White +} + +Write-Host "" +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "SUCCESS! Secret is ready" -ForegroundColor Green +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "" +Write-Host "Secret Name: $secretName" -ForegroundColor White +Write-Host "Namespace: $namespace" -ForegroundColor White +Write-Host "" +Write-Host "You can now deploy the collector with K8s secret configuration." -ForegroundColor Yellow + +Pop-Location diff --git a/processor/certificatelogverifyprocessor/k8s/debug-secret-data.ps1 b/processor/certificatelogverifyprocessor/k8s/debug-secret-data.ps1 new file mode 100644 index 0000000000000..ba633c55c7662 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/debug-secret-data.ps1 @@ -0,0 +1,95 @@ +$ErrorActionPreference = "Stop" + +Write-Host "Debugging K8s Secret Data Format" -ForegroundColor Cyan +Write-Host "=================================" -ForegroundColor Cyan +Write-Host "" + +$namespace = "otel-demo" +$secretName = "otelcol-test-certs" + +$secret = kubectl get secret $secretName -n $namespace -o json | ConvertFrom-Json + +Write-Host "Secret keys:" -ForegroundColor Yellow +$secret.data.PSObject.Properties.Name | ForEach-Object { + Write-Host " - $_" -ForegroundColor White +} + +Write-Host "" +Write-Host "Certificate data (first 200 chars):" -ForegroundColor Yellow +$certBase64 = $secret.data.'cert.pem' +if ($certBase64) { + try { + $certBytes = [System.Convert]::FromBase64String($certBase64) + $certText = [System.Text.Encoding]::UTF8.GetString($certBytes) + if ($certText.Length -gt 0) { + Write-Host $certText.Substring(0, [Math]::Min(200, $certText.Length)) -ForegroundColor Gray + } else { + Write-Host " Certificate text is empty after decoding" -ForegroundColor Red + } + } catch { + Write-Host " Error decoding base64: $_" -ForegroundColor Red + Write-Host " Base64 length: $($certBase64.Length)" -ForegroundColor Yellow + Write-Host " First 50 chars: $($certBase64.Substring(0, [Math]::Min(50, $certBase64.Length)))" -ForegroundColor Yellow + } +} else { + Write-Host " Certificate data is null or empty" -ForegroundColor Red +} + +Write-Host "" +Write-Host "Certificate line endings:" -ForegroundColor Yellow +if ($certText -match "`r`n") { + Write-Host " Windows line endings (CRLF) detected" -ForegroundColor Yellow +} elseif ($certText -match "`n") { + Write-Host " Unix line endings (LF) detected" -ForegroundColor Green +} else { + Write-Host " No line endings detected" -ForegroundColor Red +} + +Write-Host "" +Write-Host "Private key data (first 200 chars):" -ForegroundColor Yellow +$keyBase64 = $secret.data.'key.pem' +if ($keyBase64) { + try { + $keyBytes = [System.Convert]::FromBase64String($keyBase64) + $keyText = [System.Text.Encoding]::UTF8.GetString($keyBytes) + if ($keyText.Length -gt 0) { + Write-Host $keyText.Substring(0, [Math]::Min(200, $keyText.Length)) -ForegroundColor Gray + } else { + Write-Host " Private key text is empty after decoding" -ForegroundColor Red + } + } catch { + Write-Host " Error decoding base64: $_" -ForegroundColor Red + Write-Host " Base64 length: $($keyBase64.Length)" -ForegroundColor Yellow + Write-Host " First 50 chars: $($keyBase64.Substring(0, [Math]::Min(50, $keyBase64.Length)))" -ForegroundColor Yellow + } +} else { + Write-Host " Private key data is null or empty" -ForegroundColor Red +} + +Write-Host "" +Write-Host "Private key line endings:" -ForegroundColor Yellow +if ($keyText -match "`r`n") { + Write-Host " Windows line endings (CRLF) detected" -ForegroundColor Yellow +} elseif ($keyText -match "`n") { + Write-Host " Unix line endings (LF) detected" -ForegroundColor Green +} else { + Write-Host " No line endings detected" -ForegroundColor Red +} + +Write-Host "" +Write-Host "Certificate PEM header check:" -ForegroundColor Yellow +if ($certText -match "-----BEGIN CERTIFICATE-----") { + Write-Host " ✓ Valid PEM header found" -ForegroundColor Green +} else { + Write-Host " ✗ Invalid PEM header" -ForegroundColor Red +} + +Write-Host "" +Write-Host "Private key PEM header check:" -ForegroundColor Yellow +if ($keyText -match "-----BEGIN.*PRIVATE KEY-----") { + Write-Host " ✓ Valid PEM header found" -ForegroundColor Green + $keyType = if ($keyText -match "-----BEGIN RSA PRIVATE KEY-----") { "RSA PRIVATE KEY" } else { "PRIVATE KEY" } + Write-Host " Key type: $keyType" -ForegroundColor Cyan +} else { + Write-Host " ✗ Invalid PEM header" -ForegroundColor Red +} diff --git a/processor/certificatelogverifyprocessor/k8s/deploy.ps1 b/processor/certificatelogverifyprocessor/k8s/deploy.ps1 new file mode 100644 index 0000000000000..3adfcef39b177 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/deploy.ps1 @@ -0,0 +1,54 @@ +$ErrorActionPreference = "Stop" + +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "Deploy Certificate Log Verify Processor" -ForegroundColor Cyan +Write-Host "========================================" -ForegroundColor Cyan +Write-Host "" + +$namespace = "otel-demo" +$secretName = "otelcol-test-certs" + +Write-Host "Step 1: Creating namespace..." -ForegroundColor Yellow +kubectl apply -f namespace.yaml + +Write-Host "" +Write-Host "Step 2: Creating RBAC resources..." -ForegroundColor Yellow +kubectl apply -f rbac.yaml + +Write-Host "" +Write-Host "Step 3: Creating ConfigMap..." -ForegroundColor Yellow +kubectl apply -f configmap.yaml + +Write-Host "" +Write-Host "Step 4: Creating Deployment..." -ForegroundColor Yellow +kubectl apply -f deployment.yaml + +Write-Host "" +Write-Host "Step 5: Creating Service..." -ForegroundColor Yellow +kubectl apply -f service.yaml + +Write-Host "" +Write-Host "Step 6: Waiting for deployment to be ready..." -ForegroundColor Yellow +kubectl wait --for=condition=available --timeout=300s deployment/otelcol-certificatelogverify -n $namespace + +if ($LASTEXITCODE -eq 0) { + Write-Host "" + Write-Host "========================================" -ForegroundColor Green + Write-Host "SUCCESS! Deployment is ready" -ForegroundColor Green + Write-Host "========================================" -ForegroundColor Green + Write-Host "" + Write-Host "Pods:" -ForegroundColor Cyan + kubectl get pods -n $namespace -l app=otelcol-certificatelogverify + Write-Host "" + Write-Host "Service:" -ForegroundColor Cyan + kubectl get svc -n $namespace otelcol-certificatelogverify + Write-Host "" + Write-Host "To view logs:" -ForegroundColor Yellow + Write-Host " kubectl logs -n $namespace -l app=otelcol-certificatelogverify" -ForegroundColor White +} else { + Write-Host "" + Write-Host "WARNING: Deployment not ready within timeout" -ForegroundColor Yellow + Write-Host "Checking pod status..." -ForegroundColor Yellow + kubectl get pods -n $namespace -l app=otelcol-certificatelogverify + kubectl describe pod -n $namespace -l app=otelcol-certificatelogverify | Select-String -Pattern "Error|Warning" -Context 2 +} diff --git a/processor/certificatelogverifyprocessor/k8s/deploy.sh b/processor/certificatelogverifyprocessor/k8s/deploy.sh new file mode 100644 index 0000000000000..033fd074ea8f5 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/deploy.sh @@ -0,0 +1,54 @@ +#!/bin/bash +set -e + +echo "========================================" +echo "Deploy Certificate Log Verify Processor" +echo "========================================" +echo "" + +NAMESPACE="otel-demo" + +echo "Step 1: Creating namespace..." +kubectl apply -f namespace.yaml + +echo "" +echo "Step 2: Creating RBAC resources..." +kubectl apply -f rbac.yaml + +echo "" +echo "Step 3: Creating ConfigMap..." +kubectl apply -f configmap.yaml + +echo "" +echo "Step 4: Creating Deployment..." +kubectl apply -f deployment.yaml + +echo "" +echo "Step 5: Creating Service..." +kubectl apply -f service.yaml + +echo "" +echo "Step 6: Waiting for deployment to be ready..." +kubectl wait --for=condition=available --timeout=300s deployment/otelcol-certificatelogverify -n $NAMESPACE + +if [ $? -eq 0 ]; then + echo "" + echo "========================================" + echo "SUCCESS! Deployment is ready" + echo "========================================" + echo "" + echo "Pods:" + kubectl get pods -n $NAMESPACE -l app=otelcol-certificatelogverify + echo "" + echo "Service:" + kubectl get svc -n $NAMESPACE otelcol-certificatelogverify + echo "" + echo "To view logs:" + echo " kubectl logs -n $NAMESPACE -l app=otelcol-certificatelogverify" +else + echo "" + echo "WARNING: Deployment not ready within timeout" + echo "Checking pod status..." + kubectl get pods -n $NAMESPACE -l app=otelcol-certificatelogverify + kubectl describe pod -n $NAMESPACE -l app=otelcol-certificatelogverify | grep -E "Error|Warning" -A 2 +fi diff --git a/processor/certificatelogverifyprocessor/k8s/deployment.yaml b/processor/certificatelogverifyprocessor/k8s/deployment.yaml new file mode 100644 index 0000000000000..d51c4323d5090 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/deployment.yaml @@ -0,0 +1,59 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: otelcol-certificatelogverify + namespace: otel-demo + labels: + app: otelcol-certificatelogverify +spec: + replicas: 1 + selector: + matchLabels: + app: otelcol-certificatelogverify + template: + metadata: + labels: + app: otelcol-certificatelogverify + spec: + serviceAccountName: otelcol-certificatelogverify + containers: + - name: otelcol + image: certificatelogverify-collector:latest + imagePullPolicy: IfNotPresent + ports: + - containerPort: 4317 + name: otlp-grpc + protocol: TCP + - containerPort: 4318 + name: otlp-http + protocol: TCP + - containerPort: 8888 + name: metrics + protocol: TCP + volumeMounts: + - name: config + mountPath: /etc/otelcol + readOnly: true + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 128Mi + livenessProbe: + httpGet: + path: / + port: 8888 + initialDelaySeconds: 10 + periodSeconds: 10 + readinessProbe: + httpGet: + path: / + port: 8888 + initialDelaySeconds: 5 + periodSeconds: 5 + volumes: + - name: config + configMap: + name: otelcol-config diff --git a/processor/certificatelogverifyprocessor/k8s/kustomization.yaml b/processor/certificatelogverifyprocessor/k8s/kustomization.yaml new file mode 100644 index 0000000000000..cec01ea572f48 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/kustomization.yaml @@ -0,0 +1,11 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: otel-demo + +resources: + - namespace.yaml + - rbac.yaml + - configmap.yaml + - deployment.yaml + - service.yaml diff --git a/processor/certificatelogverifyprocessor/k8s/namespace.yaml b/processor/certificatelogverifyprocessor/k8s/namespace.yaml new file mode 100644 index 0000000000000..eff30a7c45472 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: otel-demo diff --git a/processor/certificatelogverifyprocessor/k8s/rbac.yaml b/processor/certificatelogverifyprocessor/k8s/rbac.yaml new file mode 100644 index 0000000000000..7a1804b3d7134 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/rbac.yaml @@ -0,0 +1,30 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: otelcol-certificatelogverify + namespace: otel-demo +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: otelcol-secret-reader + namespace: otel-demo +rules: +- apiGroups: [""] + resources: ["secrets"] + resourceNames: ["otelcol-test-certs"] + verbs: ["get"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: otelcol-secret-reader + namespace: otel-demo +subjects: +- kind: ServiceAccount + name: otelcol-certificatelogverify + namespace: otel-demo +roleRef: + kind: Role + name: otelcol-secret-reader + apiGroup: rbac.authorization.k8s.io diff --git a/processor/certificatelogverifyprocessor/k8s/service.yaml b/processor/certificatelogverifyprocessor/k8s/service.yaml new file mode 100644 index 0000000000000..a5bbfbaf2a310 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/service.yaml @@ -0,0 +1,24 @@ +apiVersion: v1 +kind: Service +metadata: + name: otelcol-certificatelogverify + namespace: otel-demo + labels: + app: otelcol-certificatelogverify +spec: + type: ClusterIP + selector: + app: otelcol-certificatelogverify + ports: + - name: otlp-grpc + port: 4317 + targetPort: 4317 + protocol: TCP + - name: otlp-http + port: 4318 + targetPort: 4318 + protocol: TCP + - name: metrics + port: 8888 + targetPort: 8888 + protocol: TCP diff --git a/processor/certificatelogverifyprocessor/k8s/verify/log-from-collector.json b/processor/certificatelogverifyprocessor/k8s/verify/log-from-collector.json new file mode 100644 index 0000000000000..e9bfd83ea0c85 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s/verify/log-from-collector.json @@ -0,0 +1,11 @@ +{ + "severity_number": 9, + "timestamp": 1772709152367000000, + "body": "Test log message from PowerShell", + "attributes": { + "test.attribute": "test-value", + "otel.log.hash": "k5+tWJ2OAQyUfck+iSY6Lt7wqmxRA4j9jqMOBy3Ysos=", + "otel.log.signature": "L/GdpgM9Dz5JOVANj3MTiTzVgYN8KbU8jSWlIxa1BJ0MYXbipmTzGHxU2fhFD8wgtX25Lgt8NKw0yiEU1hcOEpXFF3nyB16A1ZPb/eD0skrDSceI5XOEpA8eJt6VPnhMARBFbekBofhBFsZaQGZjtnUofVUYEkUeXFiVasIWO3TIZb9+pNyh61vZA4t8n8o0bZ/HpO1MvnQ4bRpLQT1J6daud9DH3k+8G8v8vw9xzUy2VT81ZBCZaLix02U+B7g2Wl8POObT3W7+Na9E0PNGpSnA40L+1Ox1e1qY3Otn2cCBLiOdnjnVUFlTYpR9w6d0SQNgnwJ8IoxqTFVaXcTDtw==" + }, + "severity_text": "INFO" +} \ No newline at end of file diff --git a/processor/certificatelogverifyprocessor/k8s_secret_reader.go b/processor/certificatelogverifyprocessor/k8s_secret_reader.go new file mode 100644 index 0000000000000..80fb06c4b99f3 --- /dev/null +++ b/processor/certificatelogverifyprocessor/k8s_secret_reader.go @@ -0,0 +1,145 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package certificatelogverifyprocessor + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "go.uber.org/zap" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func getK8sClient() (*kubernetes.Clientset, error) { + var config *rest.Config + var err error + + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + home := os.Getenv("HOME") + if home == "" { + home = os.Getenv("USERPROFILE") + } + if home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + } + + if kubeconfig != "" { + if _, err := os.Stat(kubeconfig); err == nil { + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to build config from kubeconfig: %w", err) + } + } + } + + if config == nil { + config, err = rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("failed to get in-cluster config (trying to use service account token): %w", err) + } + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes client: %w", err) + } + + return clientset, nil +} + +func fetchSecretData(ctx context.Context, secretName, namespace, key string, logger *zap.Logger) ([]byte, error) { + if logger != nil { + logger.Info("Fetching secret data", + zap.String("secret", fmt.Sprintf("%s/%s", namespace, secretName)), + zap.String("key", key), + ) + } + clientset, err := getK8sClient() + if err != nil { + if logger != nil { + logger.Error("Failed to create k8s client", zap.Error(err)) + } + return nil, fmt.Errorf("failed to create k8s client: %w", err) + } + + maxRetries := 30 + retryDelay := 2 * time.Second + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + secret, err := clientset.CoreV1().Secrets(namespace).Get(ctx, secretName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + if attempt < maxRetries-1 { + if logger != nil { + logger.Info("Secret not found, retrying...", + zap.String("secret", fmt.Sprintf("%s/%s", namespace, secretName)), + zap.Int("attempt", attempt+1), + zap.Int("max_attempts", maxRetries), + zap.Duration("retry_delay", retryDelay), + ) + } + time.Sleep(retryDelay) + retryDelay = time.Duration(float64(retryDelay) * 1.5) + if retryDelay > 10*time.Second { + retryDelay = 10 * time.Second + } + continue + } + lastErr = fmt.Errorf("secret %s/%s not found after %d attempts", namespace, secretName, maxRetries) + } else { + if logger != nil { + logger.Error("Failed to get secret", + zap.Error(err), + zap.String("secret", fmt.Sprintf("%s/%s", namespace, secretName)), + zap.Int("attempt", attempt+1), + ) + } + return nil, fmt.Errorf("failed to get secret %s/%s: %w", namespace, secretName, err) + } + } else { + if logger != nil && attempt > 0 { + logger.Info("Secret found successfully", + zap.String("secret", fmt.Sprintf("%s/%s", namespace, secretName)), + zap.Int("attempts", attempt+1), + ) + } + data, exists := secret.Data[key] + if !exists { + availableKeys := make([]string, 0, len(secret.Data)) + for k := range secret.Data { + availableKeys = append(availableKeys, k) + } + err := fmt.Errorf("key %s not found in secret %s/%s (available keys: %v)", key, namespace, secretName, availableKeys) + if logger != nil { + logger.Error("Key not found in secret", + zap.String("key", key), + zap.String("secret", fmt.Sprintf("%s/%s", namespace, secretName)), + zap.Strings("available_keys", availableKeys), + ) + } + return nil, err + } + if logger != nil { + logger.Info("Successfully fetched secret data", + zap.String("secret", fmt.Sprintf("%s/%s", namespace, secretName)), + zap.String("key", key), + zap.Int("data_length", len(data)), + ) + } + return data, nil + } + } + + return nil, lastErr +} diff --git a/processor/certificatelogverifyprocessor/metadata.yaml b/processor/certificatelogverifyprocessor/metadata.yaml new file mode 100644 index 0000000000000..5be80f5b6205c --- /dev/null +++ b/processor/certificatelogverifyprocessor/metadata.yaml @@ -0,0 +1,24 @@ +type: certificatelogverify + +status: + class: processor + stability: + alpha: [logs] + distributions: [contrib] + warnings: [] + codeowners: + active: [] + +tests: + config: + +telemetry: + metrics: + certificatelogverify_processor_processed_logs: + description: Number of log records verified with certificate signature. + stability: + level: development + unit: "{records}" + enabled: true + histogram: + value_type: int diff --git a/processor/certificatelogverifyprocessor/processor.go b/processor/certificatelogverifyprocessor/processor.go new file mode 100644 index 0000000000000..eeab2a64ea286 --- /dev/null +++ b/processor/certificatelogverifyprocessor/processor.go @@ -0,0 +1,314 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package certificatelogverifyprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/certificatelogverifyprocessor" + +import ( + "context" + "crypto" + "crypto/rsa" + "crypto/subtle" + "encoding/base64" + "encoding/json" + "fmt" + "sort" + "strings" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/processor" + "go.uber.org/zap" +) + +const ( + hashAttributeKey = "otel.log.hash" + signatureAttributeKey = "otel.log.signature" + signContentAttributeKey = "otel.log.sign_content" +) + +type certificateHashProcessor struct { + config *Config + logger *zap.Logger + nextLogs consumer.Logs + reader *CertificateReader + hashAlgo crypto.Hash +} + +func newProcessor(cfg *Config, nextLogs consumer.Logs, settings processor.Settings) (*certificateHashProcessor, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + logger := settings.Logger.WithOptions(zap.AddStacktrace(zap.PanicLevel)) + + settings.Logger.Info("Initializing certificate reader from Kubernetes secret for verification", + zap.String("secret", cfg.K8sSecret.Name), + zap.String("namespace", cfg.K8sSecret.Namespace), + zap.String("cert_key", cfg.K8sSecret.CertKey), + ) + reader, err := NewCertificateReaderFromK8sSecretForVerification(ctx, cfg.K8sSecret, logger) + if err != nil { + logger.Error("Failed to initialize certificate reader from k8s secret", + zap.Error(err), + zap.String("secret", cfg.K8sSecret.Name), + zap.String("namespace", cfg.K8sSecret.Namespace), + ) + return nil, fmt.Errorf("failed to initialize certificate reader from k8s secret: %w", err) + } + logger.Info("Successfully initialized certificate reader from Kubernetes secret", + zap.String("secret", cfg.K8sSecret.Name), + zap.String("namespace", cfg.K8sSecret.Namespace), + ) + + hashAlgo := cfg.GetHash() + if hashAlgo != crypto.SHA256 && hashAlgo != crypto.SHA512 { + return nil, fmt.Errorf("unsupported hash algorithm") + } + + return &certificateHashProcessor{ + config: cfg, + logger: logger, + nextLogs: nextLogs, + reader: reader, + hashAlgo: hashAlgo, + }, nil +} + +func (p *certificateHashProcessor) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (p *certificateHashProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + resourceLogs := ld.ResourceLogs() + resourceLogs.RemoveIf(func(rl plog.ResourceLogs) bool { + rl.ScopeLogs().RemoveIf(func(sl plog.ScopeLogs) bool { + sl.LogRecords().RemoveIf(func(lr plog.LogRecord) bool { + if err := p.verifyLogRecord(lr); err != nil { + p.logger.Error("Failed to verify log record", zap.Error(err)) + return true + } + return false + }) + return sl.LogRecords().Len() == 0 + }) + return rl.ScopeLogs().Len() == 0 + }) + + if ld.LogRecordCount() == 0 { + return nil + } + + return p.nextLogs.ConsumeLogs(ctx, ld) +} + +func (p *certificateHashProcessor) verifyLogRecord(lr plog.LogRecord) error { + hashAttr, hashExists := lr.Attributes().Get(hashAttributeKey) + if !hashExists { + return fmt.Errorf("missing required attribute: %s", hashAttributeKey) + } + + signatureAttr, sigExists := lr.Attributes().Get(signatureAttributeKey) + if !sigExists { + return fmt.Errorf("missing required attribute: %s", signatureAttributeKey) + } + + signContentAttr, signContentExists := lr.Attributes().Get(signContentAttributeKey) + var signContent string + if signContentExists && signContentAttr.Str() != "" { + signContent = signContentAttr.Str() + } else { + signContent = p.config.SignContent + if signContent == "" { + signContent = defaultSignContent + } + } + + receivedHashBase64 := hashAttr.Str() + receivedSignatureBase64 := signatureAttr.Str() + + if receivedHashBase64 == "" { + return fmt.Errorf("hash attribute is empty") + } + if receivedSignatureBase64 == "" { + return fmt.Errorf("signature attribute is empty") + } + + receivedHash, err := base64.StdEncoding.DecodeString(receivedHashBase64) + if err != nil { + return fmt.Errorf("failed to decode hash: %w", err) + } + + receivedSignature, err := base64.StdEncoding.DecodeString(receivedSignatureBase64) + if err != nil { + return fmt.Errorf("failed to decode signature: %w", err) + } + + logData, err := p.serializeLogRecord(lr, signContent) + if err != nil { + return fmt.Errorf("failed to serialize log record: %w", err) + } + + if len(logData) == 0 { + return fmt.Errorf("serialized log data is empty (sign_content: %s)", signContent) + } + + h := p.hashAlgo.New() + if _, err := h.Write(logData); err != nil { + return fmt.Errorf("failed to compute hash: %w", err) + } + computedHash := h.Sum(nil) + + if !equalHashes(computedHash, receivedHash) { + p.logger.Info("Hash mismatch detected", + zap.String("algorithm", p.config.HashAlgorithm), + zap.String("sign_content", signContent), + zap.String("computed_hash", fmt.Sprintf("%x", computedHash)), + zap.String("received_hash", fmt.Sprintf("%x", receivedHash)), + ) + return fmt.Errorf("hash mismatch (algorithm: %s, sign_content: %s): computed %x, received %x", + p.config.HashAlgorithm, signContent, computedHash, receivedHash) + } + + if p.reader == nil { + return fmt.Errorf("certificate reader is nil") + } + + cert := p.reader.GetCertificate() + if cert == nil { + return fmt.Errorf("certificate is nil") + } + + publicKey, ok := cert.PublicKey.(*rsa.PublicKey) + if !ok { + return fmt.Errorf("certificate public key is not RSA") + } + + err = rsa.VerifyPKCS1v15(publicKey, p.hashAlgo, computedHash, receivedSignature) + if err != nil { + return fmt.Errorf("signature verification failed (algorithm: %s): %w", p.config.HashAlgorithm, err) + } + + p.logger.Info("Log record verification successful", + zap.String("sign_content", signContent), + ) + + return nil +} + +func (p *certificateHashProcessor) serializeLogRecord(lr plog.LogRecord, signContent string) ([]byte, error) { + data := make(map[string]interface{}) + + if signContent == SignContentBody || signContent == SignContentMeta || signContent == SignContentAttr { + if lr.Body().Type() == pcommon.ValueTypeStr { + data["body"] = lr.Body().Str() + } + } + + if signContent == SignContentMeta || signContent == SignContentAttr { + if lr.Timestamp() != 0 { + data["timestamp"] = lr.Timestamp().AsTime().UnixNano() + } + + if lr.SeverityNumber() != 0 { + data["severity_number"] = lr.SeverityNumber() + } + + if lr.SeverityText() != "" { + data["severity_text"] = lr.SeverityText() + } + + if !lr.TraceID().IsEmpty() { + data["trace_id"] = lr.TraceID().String() + } + + if !lr.SpanID().IsEmpty() { + data["span_id"] = lr.SpanID().String() + } + } + + if signContent == SignContentAttr { + attrs := make(map[string]interface{}) + lr.Attributes().Range(func(k string, v pcommon.Value) bool { + if !strings.HasPrefix(k, "otel.log.") { + attrs[k] = p.valueToInterface(v) + } + return true + }) + data["attributes"] = attrs + } + + return p.marshalJSONDeterministic(data) +} + +func (p *certificateHashProcessor) marshalJSONDeterministic(v interface{}) ([]byte, error) { + sorted := p.sortMapKeys(v) + return json.Marshal(sorted) +} + +func (p *certificateHashProcessor) sortMapKeys(v interface{}) interface{} { + switch val := v.(type) { + case map[string]interface{}: + sorted := make(map[string]interface{}) + keys := make([]string, 0, len(val)) + for k := range val { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + sorted[k] = p.sortMapKeys(val[k]) + } + return sorted + case []interface{}: + sorted := make([]interface{}, len(val)) + for i, item := range val { + sorted[i] = p.sortMapKeys(item) + } + return sorted + default: + return val + } +} + +func (p *certificateHashProcessor) valueToInterface(v pcommon.Value) interface{} { + switch v.Type() { + case pcommon.ValueTypeStr: + return v.Str() + case pcommon.ValueTypeInt: + return v.Int() + case pcommon.ValueTypeDouble: + return v.Double() + case pcommon.ValueTypeBool: + return v.Bool() + case pcommon.ValueTypeBytes: + return base64.StdEncoding.EncodeToString(v.Bytes().AsRaw()) + case pcommon.ValueTypeSlice: + slice := make([]interface{}, v.Slice().Len()) + for i := 0; i < v.Slice().Len(); i++ { + slice[i] = p.valueToInterface(v.Slice().At(i)) + } + return slice + case pcommon.ValueTypeMap: + m := make(map[string]interface{}) + v.Map().Range(func(k string, val pcommon.Value) bool { + m[k] = p.valueToInterface(val) + return true + }) + return m + default: + return nil + } +} + +func equalHashes(a, b []byte) bool { + return len(a) == len(b) && subtle.ConstantTimeCompare(a, b) == 1 +} + +func (p *certificateHashProcessor) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (p *certificateHashProcessor) Shutdown(_ context.Context) error { + return nil +}