diff --git a/cmd/build.go b/cmd/build.go index af911470..415827cb 100644 --- a/cmd/build.go +++ b/cmd/build.go @@ -20,13 +20,9 @@ import ( "github.com/gookit/color" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "go.opentelemetry.io/otel" - sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/attribute" ) -// CleanupFunc is a function that performs cleanup operations and must be deferred -type CleanupFunc func() - // buildCmd represents the build command var buildCmd = &cobra.Command{ Use: "build [targetPackage]", @@ -58,13 +54,18 @@ Examples: }, } -func build(cmd *cobra.Command, args []string) error { +func build(cmd *cobra.Command, args []string) (buildErr error) { + _, span := telemetry.StartSpan(cmd.Context(), "leeway.build") + defer telemetry.FinishSpan(span, &buildErr) + _, pkg, _, _ := getTarget(args, false) if pkg == nil { return errors.New("build needs a package") } - opts, localCache, shutdown := getBuildOpts(cmd) - defer shutdown() + + span.SetAttributes(attribute.String("leeway.target.package", pkg.FullName())) + + opts, localCache := getBuildOpts(cmd) var ( watch, _ = cmd.Flags().GetBool("watch") @@ -240,13 +241,9 @@ func addBuildFlags(cmd *cobra.Command) { cmd.Flags().Bool("report-github", os.Getenv("GITHUB_OUTPUT") != "", "Report package build success/failure to GitHub Actions using the GITHUB_OUTPUT environment variable") cmd.Flags().Bool("fixed-build-dir", true, "Use a fixed build directory for each package, instead of based on the package version, to better utilize caches based on absolute paths (defaults to true)") cmd.Flags().Bool("docker-export-to-cache", false, "Export Docker images to cache instead of pushing directly (enables SLSA L3 compliance)") - cmd.Flags().String("otel-endpoint", os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"), "OpenTelemetry OTLP endpoint URL for tracing (defaults to $OTEL_EXPORTER_OTLP_ENDPOINT)") - cmd.Flags().Bool("otel-insecure", os.Getenv("OTEL_EXPORTER_OTLP_INSECURE") == "true", "Disable TLS for OTLP endpoint (for local development only, defaults to $OTEL_EXPORTER_OTLP_INSECURE)") - cmd.Flags().String("trace-parent", os.Getenv("TRACEPARENT"), "W3C Trace Context traceparent header for distributed tracing (defaults to $TRACEPARENT)") - cmd.Flags().String("trace-state", os.Getenv("TRACESTATE"), "W3C Trace Context tracestate header for distributed tracing (defaults to $TRACESTATE)") } -func getBuildOpts(cmd *cobra.Command) ([]leeway.BuildOption, cache.LocalCache, CleanupFunc) { +func getBuildOpts(cmd *cobra.Command) ([]leeway.BuildOption, cache.LocalCache) { // Track if user explicitly set LEEWAY_DOCKER_EXPORT_TO_CACHE before workspace loading. // This allows us to distinguish: // - User set explicitly: High priority (overrides package config) @@ -347,59 +344,9 @@ func getBuildOpts(cmd *cobra.Command) ([]leeway.BuildOption, cache.LocalCache, C reporter = append(reporter, leeway.NewGitHubReporter()) } - // Initialize OpenTelemetry reporter if endpoint is configured - var tracerProvider *sdktrace.TracerProvider - var otelShutdown func() - if otelEndpoint, err := cmd.Flags().GetString("otel-endpoint"); err != nil { - log.Fatal(err) - } else if otelEndpoint != "" { - // Set leeway version for telemetry - telemetry.SetLeewayVersion(leeway.Version) - - // Get insecure flag - otelInsecure, err := cmd.Flags().GetBool("otel-insecure") - if err != nil { - log.Fatal(err) - } - - // Initialize tracer with the provided endpoint and TLS configuration - tp, err := telemetry.InitTracer(context.Background(), otelEndpoint, otelInsecure) - if err != nil { - log.WithError(err).Warn("failed to initialize OpenTelemetry tracer") - } else { - tracerProvider = tp - - // Parse trace context if provided - traceParent, _ := cmd.Flags().GetString("trace-parent") - traceState, _ := cmd.Flags().GetString("trace-state") - - parentCtx := context.Background() - if traceParent != "" { - if err := telemetry.ValidateTraceParent(traceParent); err != nil { - log.WithError(err).Warn("invalid trace-parent format") - } else { - ctx, err := telemetry.ParseTraceContext(traceParent, traceState) - if err != nil { - log.WithError(err).Warn("failed to parse trace context") - } else { - parentCtx = ctx - } - } - } - - // Create OTel reporter - tracer := otel.Tracer("leeway") - reporter = append(reporter, leeway.NewOTelReporter(tracer, parentCtx)) - - // Create shutdown function - otelShutdown = func() { - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := telemetry.Shutdown(shutdownCtx, tracerProvider); err != nil { - log.WithError(err).Warn("failed to shutdown tracer provider") - } - } - } + // Add OpenTelemetry reporter if tracing is enabled + if telemetry.Enabled() { + reporter = append(reporter, leeway.NewOTelReporter(telemetry.Tracer(), cmd.Context())) } dontTest, err := cmd.Flags().GetBool("dont-test") @@ -465,11 +412,6 @@ func getBuildOpts(cmd *cobra.Command) ([]leeway.BuildOption, cache.LocalCache, C dockerExportSet = true } - // Create a no-op shutdown function if otelShutdown is nil - if otelShutdown == nil { - otelShutdown = func() {} - } - return []leeway.BuildOption{ leeway.WithLocalCache(localCache), leeway.WithRemoteCache(remoteCache), @@ -488,7 +430,7 @@ func getBuildOpts(cmd *cobra.Command) ([]leeway.BuildOption, cache.LocalCache, C leeway.WithInFlightChecksums(inFlightChecksums), leeway.WithDockerExportToCache(dockerExportToCache, dockerExportSet), leeway.WithDockerExportEnv(dockerExportEnvValue, dockerExportEnvSet), - }, localCache, otelShutdown + }, localCache } type pushOnlyRemoteCache struct { diff --git a/cmd/build_test.go b/cmd/build_test.go index 7a1c0213..9ea903b3 100644 --- a/cmd/build_test.go +++ b/cmd/build_test.go @@ -242,7 +242,7 @@ func TestGetBuildOptsWithInFlightChecksums(t *testing.T) { } // Test getBuildOpts function - opts, localCache, _ := getBuildOpts(cmd) + opts, localCache := getBuildOpts(cmd) // We can't directly test the WithInFlightChecksums option since it's internal, // but we can verify the function doesn't error and returns options diff --git a/cmd/provenance-assert.go b/cmd/provenance-assert.go index 2a0e7ef4..0be96f36 100644 --- a/cmd/provenance-assert.go +++ b/cmd/provenance-assert.go @@ -125,7 +125,7 @@ func getProvenanceTarget(cmd *cobra.Command, args []string) (bundleFN, pkgFN str log.Fatal("provenance export requires a package") } - _, cache, _ := getBuildOpts(cmd) + _, cache := getBuildOpts(cmd) var ok bool pkgFN, ok = cache.Location(pkg) diff --git a/cmd/root.go b/cmd/root.go index 643fdc01..9d5e6088 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -10,9 +10,12 @@ import ( "github.com/gookit/color" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "go.opentelemetry.io/otel/attribute" + otelTrace "go.opentelemetry.io/otel/trace" "golang.org/x/xerrors" "github.com/gitpod-io/leeway/pkg/leeway" + "github.com/gitpod-io/leeway/pkg/leeway/telemetry" ) const ( @@ -95,6 +98,9 @@ var ( buildArgs []string verbose bool variant string + + // commandSpan is the root span for the current command execution + commandSpan otelTrace.Span ) // rootCmd represents the base command when called without any subcommands @@ -134,6 +140,46 @@ variables have an effect on leeway: if verbose { log.SetLevel(log.DebugLevel) } + + // Initialize OpenTelemetry tracing if endpoint is configured + otelEndpoint, _ := cmd.Flags().GetString("otel-endpoint") + if otelEndpoint != "" { + telemetry.SetLeewayVersion(leeway.Version) + + otelInsecure, _ := cmd.Flags().GetBool("otel-insecure") + if err := telemetry.Initialize(cmd.Context(), otelEndpoint, otelInsecure); err != nil { + log.WithError(err).Warn("failed to initialize OpenTelemetry tracer") + } else { + // Parse trace context if provided + traceParent, _ := cmd.Flags().GetString("trace-parent") + traceState, _ := cmd.Flags().GetString("trace-state") + + parentCtx := cmd.Context() + if traceParent != "" { + if err := telemetry.ValidateTraceParent(traceParent); err != nil { + log.WithError(err).Warn("invalid trace-parent format") + } else if ctx, err := telemetry.ParseTraceContext(traceParent, traceState); err != nil { + log.WithError(err).Warn("failed to parse trace context") + } else { + parentCtx = ctx + } + } + + // Create command span and update command context + var ctx context.Context + ctx, commandSpan = telemetry.StartSpan(parentCtx, "leeway.command", + attribute.String("leeway.version", leeway.Version), + attribute.String("leeway.command", cmd.Name()), + ) + cmd.SetContext(ctx) + } + } + }, + PersistentPostRun: func(cmd *cobra.Command, args []string) { + telemetry.FinishSpan(commandSpan, nil) + if err := telemetry.Shutdown(context.Background()); err != nil { + log.WithError(err).Warn("failed to shutdown tracer provider") + } }, BashCompletionFunction: bashCompletionFunc, } @@ -183,6 +229,12 @@ func init() { rootCmd.PersistentFlags().StringVar(&variant, "variant", "", "selects a package variant") rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "enables verbose logging") rootCmd.PersistentFlags().Bool("dut", false, "used for testing only - doesn't actually do anything") + + // OpenTelemetry tracing flags + rootCmd.PersistentFlags().String("otel-endpoint", os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"), "OpenTelemetry OTLP endpoint URL for tracing (defaults to $OTEL_EXPORTER_OTLP_ENDPOINT)") + rootCmd.PersistentFlags().Bool("otel-insecure", os.Getenv("OTEL_EXPORTER_OTLP_INSECURE") == "true", "Disable TLS for OTLP endpoint (for local development only, defaults to $OTEL_EXPORTER_OTLP_INSECURE)") + rootCmd.PersistentFlags().String("trace-parent", os.Getenv("TRACEPARENT"), "W3C Trace Context traceparent header for distributed tracing (defaults to $TRACEPARENT)") + rootCmd.PersistentFlags().String("trace-state", os.Getenv("TRACESTATE"), "W3C Trace Context tracestate header for distributed tracing (defaults to $TRACESTATE)") } func getWorkspace() (leeway.Workspace, error) { diff --git a/cmd/run.go b/cmd/run.go index d673f1b3..4a08d6c6 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -27,7 +27,7 @@ Should any of the scripts fail Leeway will exit with an exit code of 1 once all if script == nil { return errors.New("run needs a script") } - opts, _, _ := getBuildOpts(cmd) + opts, _ := getBuildOpts(cmd) return script.Run(opts...) }) } diff --git a/cmd/sbom-export.go b/cmd/sbom-export.go index f1a06254..7ba35625 100644 --- a/cmd/sbom-export.go +++ b/cmd/sbom-export.go @@ -32,7 +32,7 @@ If no package is specified, the workspace's default target is used.`, } // Get build options and cache - _, localCache, _ := getBuildOpts(cmd) + _, localCache := getBuildOpts(cmd) // Get output format and file format, _ := cmd.Flags().GetString("format") diff --git a/cmd/sbom-scan.go b/cmd/sbom-scan.go index f2531278..bbb5591c 100644 --- a/cmd/sbom-scan.go +++ b/cmd/sbom-scan.go @@ -30,7 +30,7 @@ If no package is specified, the workspace's default target is used.`, } // Get cache - _, localCache, _ := getBuildOpts(cmd) + _, localCache := getBuildOpts(cmd) // Get output directory outputDir, _ := cmd.Flags().GetString("output-dir") diff --git a/pkg/leeway/cache/remote/s3.go b/pkg/leeway/cache/remote/s3.go index 33ce113b..a2ce4b1e 100644 --- a/pkg/leeway/cache/remote/s3.go +++ b/pkg/leeway/cache/remote/s3.go @@ -20,10 +20,12 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/smithy-go" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" "golang.org/x/time/rate" "github.com/gitpod-io/leeway/pkg/leeway/cache" "github.com/gitpod-io/leeway/pkg/leeway/cache/slsa" + "github.com/gitpod-io/leeway/pkg/leeway/telemetry" ) const ( @@ -243,110 +245,37 @@ func (s *S3Cache) processPackages(ctx context.Context, pkgs []cache.Package, wor } // ExistingPackages implements RemoteCache +// Uses parallel HeadObject calls to check for specific package keys. +// This is more efficient than ListObjects for buckets with many objects. func (s *S3Cache) ExistingPackages(ctx context.Context, pkgs []cache.Package) (map[cache.Package]struct{}, error) { - result := make(map[cache.Package]struct{}) - - // Build a map of version -> package for quick lookup - versionToPackage := make(map[string]cache.Package, len(pkgs)) - for _, p := range pkgs { - version, err := p.Version() - if err != nil { - log.WithError(err).WithField("package", p.FullName()).Debug("Failed to get version for package, skipping") - continue - } - versionToPackage[version] = p - } - - if len(versionToPackage) == 0 { - return result, nil - } - - // Use ListObjectsV2 to batch check all packages in 1-2 API calls - // We list all objects and check which packages exist - // This is much faster than 2N HeadObject calls (2 per package) - if err := s.waitForRateLimit(ctx); err != nil { - log.WithError(err).Debug("Rate limiter error during batch existence check") - // Fall back to sequential checks if rate limited - return s.existingPackagesSequential(ctx, pkgs) - } - - timeoutCtx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() - - // List all objects with empty prefix to get all cached artifacts - // In practice, this could be optimized with a common prefix if versions share one - objects, err := s.storage.ListObjects(timeoutCtx, "") - if err != nil { - log.WithError(err).Debug("Failed to list objects in remote cache, falling back to sequential checks") - // Fall back to sequential checks on error - return s.existingPackagesSequential(ctx, pkgs) - } - - // Build a set of existing keys for O(1) lookup - existingKeys := make(map[string]bool, len(objects)) - for _, key := range objects { - existingKeys[key] = true - } - - // Check which packages exist by looking up their keys - for version, p := range versionToPackage { - gzKey := fmt.Sprintf("%s.tar.gz", version) - tarKey := fmt.Sprintf("%s.tar", version) - - if existingKeys[gzKey] { - log.WithFields(log.Fields{ - "package": p.FullName(), - "key": gzKey, - }).Debug("found package in remote cache (.tar.gz)") - result[p] = struct{}{} - } else if existingKeys[tarKey] { - log.WithFields(log.Fields{ - "package": p.FullName(), - "key": tarKey, - }).Debug("found package in remote cache (.tar)") - result[p] = struct{}{} - } else { - log.WithFields(log.Fields{ - "package": p.FullName(), - "version": version, - }).Debug("package not found in remote cache, will build locally") - } + if len(pkgs) == 0 { + return make(map[cache.Package]struct{}), nil } - return result, nil -} + startTime := time.Now() + log.WithField("count", len(pkgs)).Debug("checking remote cache for existing packages") -// existingPackagesSequential is the fallback implementation using sequential HeadObject calls -// This is used when ListObjects fails or is rate limited -func (s *S3Cache) existingPackagesSequential(ctx context.Context, pkgs []cache.Package) (map[cache.Package]struct{}, error) { result := make(map[cache.Package]struct{}) var mu sync.Mutex err := s.processPackages(ctx, pkgs, s.workerCount, func(ctx context.Context, p cache.Package) error { version, err := p.Version() if err != nil { - return fmt.Errorf("failed to get version: %w", err) + log.WithError(err).WithField("package", p.FullName()).Debug("Failed to get version for package, skipping") + return nil } // Try .tar.gz first gzKey := fmt.Sprintf("%s.tar.gz", version) - // Wait for rate limiter permission if err := s.waitForRateLimit(ctx); err != nil { log.WithError(err).Debug("Rate limiter error during .tar.gz check") - // Continue to .tar check even if rate limited } timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() exists, err := s.storage.HasObject(timeoutCtx, gzKey) - if err != nil { - log.WithFields(log.Fields{ - "package": p.FullName(), - "key": gzKey, - "error": err, - }).Debug("failed to check .tar.gz in remote cache, will try .tar") - // Continue to check .tar format - don't return error here - } else if exists { + cancel() + + if err == nil && exists { log.WithFields(log.Fields{ "package": p.FullName(), "key": gzKey, @@ -357,27 +286,17 @@ func (s *S3Cache) existingPackagesSequential(ctx context.Context, pkgs []cache.P return nil } - // Fall back to .tar if .tar.gz doesn't exist or had error + // Try .tar if .tar.gz doesn't exist tarKey := fmt.Sprintf("%s.tar", version) - // Wait for rate limiter permission if err := s.waitForRateLimit(ctx); err != nil { log.WithError(err).Debug("Rate limiter error during .tar check") } timeoutCtx2, cancel2 := context.WithTimeout(ctx, 30*time.Second) - defer cancel2() exists, err = s.storage.HasObject(timeoutCtx2, tarKey) - if err != nil { - log.WithFields(log.Fields{ - "package": p.FullName(), - "key": tarKey, - "error": err, - }).Debug("failed to check .tar in remote cache") - // Don't return error for missing objects - this is expected - return nil // Continue with next package, will trigger local build - } + cancel2() - if exists { + if err == nil && exists { log.WithFields(log.Fields{ "package": p.FullName(), "key": tarKey, @@ -385,20 +304,23 @@ func (s *S3Cache) existingPackagesSequential(ctx context.Context, pkgs []cache.P mu.Lock() result[p] = struct{}{} mu.Unlock() - } else { - log.WithFields(log.Fields{ - "package": p.FullName(), - "version": version, - }).Debug("package not found in remote cache, will build locally") } return nil }) + duration := time.Since(startTime) if err != nil { - log.WithError(err).Warn("failed to check existing packages in remote cache") - // Return partial results even if some checks failed - return result, nil + log.WithError(err).WithFields(log.Fields{ + "duration": duration.Round(time.Millisecond), + "found": len(result), + }).Warn("error during remote cache check") + } else { + log.WithFields(log.Fields{ + "duration": duration.Round(time.Millisecond), + "checked": len(pkgs), + "found": len(result), + }).Debug("remote cache check completed") } return result, nil @@ -448,7 +370,18 @@ func (s *S3Cache) Download(ctx context.Context, dst cache.LocalCache, pkgs []cac } // downloadPackage downloads a single package and returns detailed status -func (s *S3Cache) downloadPackage(ctx context.Context, dst cache.LocalCache, p cache.Package) cache.DownloadResult { +func (s *S3Cache) downloadPackage(ctx context.Context, dst cache.LocalCache, p cache.Package) (result cache.DownloadResult) { + ctx, span := telemetry.StartSpan(ctx, "leeway.cache.download", + attribute.String("leeway.package.name", p.FullName()), + ) + defer func() { + span.SetAttributes( + attribute.String("leeway.cache.status", result.Status.String()), + attribute.Int64("leeway.cache.bytes", result.Bytes), + ) + telemetry.FinishSpan(span, &result.Err) + }() + version, err := p.Version() if err != nil { log.WithError(err).WithField("package", p.FullName()).Warn("Failed to get version for package, skipping") @@ -489,13 +422,17 @@ func (s *S3Cache) downloadOriginalResult(ctx context.Context, p cache.Package, v // Try downloading .tar.gz first with retry gzKey := fmt.Sprintf("%s.tar.gz", version) gzNotFound := false + var gzBytes int64 gzErr := withRetry(3, func() error { if err := s.waitForRateLimit(ctx); err != nil { return err } timeoutCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() - _, err := s.storage.GetObject(timeoutCtx, gzKey, localPath) + n, err := s.storage.GetObject(timeoutCtx, gzKey, localPath) + if err == nil { + gzBytes = n + } return err }) @@ -512,7 +449,7 @@ func (s *S3Cache) downloadOriginalResult(ctx context.Context, p cache.Package, v // Download SBOM files if they exist (best effort, non-blocking) s.downloadSBOMFiles(ctx, p.FullName(), gzKey, localPath) - return cache.DownloadResult{Status: cache.DownloadStatusSuccess} + return cache.DownloadResult{Status: cache.DownloadStatusSuccess, Bytes: gzBytes} } // Check if this is a "not found" error @@ -532,13 +469,17 @@ func (s *S3Cache) downloadOriginalResult(ctx context.Context, p cache.Package, v // Try .tar if .tar.gz fails, also with retry tarKey := fmt.Sprintf("%s.tar", version) + var tarBytes int64 tarErr := withRetry(3, func() error { if err := s.waitForRateLimit(ctx); err != nil { return err } timeoutCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() - _, err := s.storage.GetObject(timeoutCtx, tarKey, localPath) + n, err := s.storage.GetObject(timeoutCtx, tarKey, localPath) + if err == nil { + tarBytes = n + } return err }) @@ -555,7 +496,7 @@ func (s *S3Cache) downloadOriginalResult(ctx context.Context, p cache.Package, v // Download SBOM files if they exist (best effort, non-blocking) s.downloadSBOMFiles(ctx, p.FullName(), tarKey, localPath) - return cache.DownloadResult{Status: cache.DownloadStatusSuccess} + return cache.DownloadResult{Status: cache.DownloadStatusSuccess, Bytes: tarBytes} } // Determine if this was a "not found" or a transient failure @@ -1206,7 +1147,6 @@ type s3ClientAPI interface { CompleteMultipartUpload(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) CreateMultipartUpload(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) UploadPart(ctx context.Context, params *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) - ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) } // S3Storage implements ObjectStorage using AWS S3 @@ -1369,28 +1309,6 @@ func (s *S3Storage) UploadObject(ctx context.Context, key string, src string) er return nil } -// ListObjects implements ObjectStorage -func (s *S3Storage) ListObjects(ctx context.Context, prefix string) ([]string, error) { - var result []string - paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{ - Bucket: aws.String(s.bucketName), - Prefix: aws.String(prefix), - }) - - for paginator.HasMorePages() { - page, err := paginator.NextPage(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list objects: %w", err) - } - - for _, obj := range page.Contents { - result = append(result, *obj.Key) - } - } - - return result, nil -} - // fileExists checks if a file exists and is not a directory func fileExists(filename string) bool { info, err := os.Stat(filename) diff --git a/pkg/leeway/cache/remote/s3_download_test.go b/pkg/leeway/cache/remote/s3_download_test.go index d4513798..1e40950a 100644 --- a/pkg/leeway/cache/remote/s3_download_test.go +++ b/pkg/leeway/cache/remote/s3_download_test.go @@ -77,16 +77,6 @@ func (m *mockS3Storage) UploadObject(ctx context.Context, key string, src string return nil } -func (m *mockS3Storage) ListObjects(ctx context.Context, prefix string) ([]string, error) { - var result []string - for key := range m.objects { - if len(prefix) == 0 || key[:len(prefix)] == prefix { - result = append(result, key) - } - } - return result, nil -} - func TestS3CacheDownload(t *testing.T) { tmpDir := t.TempDir() diff --git a/pkg/leeway/cache/remote/s3_performance_test.go b/pkg/leeway/cache/remote/s3_performance_test.go index ac1a6411..0b3159ff 100644 --- a/pkg/leeway/cache/remote/s3_performance_test.go +++ b/pkg/leeway/cache/remote/s3_performance_test.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "path/filepath" - "strings" "sync" "testing" "time" @@ -116,23 +115,6 @@ func (m *realisticMockS3Storage) UploadObject(ctx context.Context, key string, s return nil } -func (m *realisticMockS3Storage) ListObjects(ctx context.Context, prefix string) ([]string, error) { - // Simulate network latency for list operation - latency := m.latency - if latency == 0 { - latency = s3Latency // Default to realistic latency - } - time.Sleep(latency / 2) - - var keys []string - for key := range m.objects { - if strings.HasPrefix(key, prefix) { - keys = append(keys, key) - } - } - return keys, nil -} - // realisticMockVerifier implements realistic SLSA verification performance type realisticMockVerifier struct{} @@ -417,9 +399,8 @@ func TestS3Cache_ParallelVerificationScaling(t *testing.T) { } } -// TestS3Cache_ExistingPackagesBatchOptimization tests the ListObjects optimization -func TestS3Cache_ExistingPackagesBatchOptimization(t *testing.T) { - // Use reduced latency for fast tests +// TestS3Cache_ExistingPackagesParallel tests parallel HeadObject checks +func TestS3Cache_ExistingPackagesParallel(t *testing.T) { packageCounts := []int{10, 50, 100} for _, count := range packageCounts { @@ -449,37 +430,16 @@ func TestS3Cache_ExistingPackagesBatchOptimization(t *testing.T) { rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), } - // Measure time for batch check (using ListObjects) + // Measure time for parallel HeadObject checks start := time.Now() existing, err := s3Cache.ExistingPackages(context.Background(), packages) - batchDuration := time.Since(start) + duration := time.Since(start) require.NoError(t, err) require.Equal(t, count, len(existing), "All packages should be found") - // Measure time for sequential check (fallback method) - start = time.Now() - existingSeq, err := s3Cache.existingPackagesSequential(context.Background(), packages) - seqDuration := time.Since(start) - require.NoError(t, err) - require.Equal(t, count, len(existingSeq), "All packages should be found") - - // Calculate speedup - speedup := float64(seqDuration) / float64(batchDuration) - t.Logf("Package count: %d", count) - t.Logf("Batch (ListObjects): %v", batchDuration) - t.Logf("Sequential (HeadObject): %v", seqDuration) - t.Logf("Speedup: %.2fx", speedup) - - // For larger package counts, batch should be significantly faster - // Note: Using 2.5x threshold to account for CI environment variability - if count >= 50 { - require.Greater(t, speedup, 2.5, "Batch optimization should be at least 2.5x faster for 50+ packages") - } else { - // For small package counts, batch overhead may reduce speedup - // Use a lower threshold to avoid flaky tests - require.Greater(t, speedup, 0.45, "Batch optimization should not be significantly slower than sequential") - } + t.Logf("Duration: %v", duration) + t.Logf("Packages/sec: %.2f", float64(count)/duration.Seconds()) }) } } @@ -527,39 +487,6 @@ func BenchmarkS3Cache_ExistingPackages(b *testing.B) { } }) - b.Run(fmt.Sprintf("%d-packages-sequential", count), func(b *testing.B) { - // Create packages - packages := make([]cache.Package, count) - for i := 0; i < count; i++ { - packages[i] = &mockPackagePerf{ - version: fmt.Sprintf("package%d:v%d", i, i), - fullName: fmt.Sprintf("package%d", i), - } - } - - // Setup mock storage - mockStorage := createRealisticMockS3StorageMultiple(b, count) - - config := &cache.RemoteConfig{ - BucketName: "test-bucket", - } - - s3Cache := &S3Cache{ - storage: mockStorage, - cfg: config, - workerCount: defaultWorkerCount, - downloadWorkerCount: defaultDownloadWorkerCount, - rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := s3Cache.existingPackagesSequential(context.Background(), packages) - if err != nil { - b.Fatal(err) - } - } - }) } } diff --git a/pkg/leeway/cache/remote/s3_provenance_test.go b/pkg/leeway/cache/remote/s3_provenance_test.go index 80ba0526..6cc4d946 100644 --- a/pkg/leeway/cache/remote/s3_provenance_test.go +++ b/pkg/leeway/cache/remote/s3_provenance_test.go @@ -360,11 +360,3 @@ func (m *mockS3StorageForProvenance) UploadObject(ctx context.Context, key strin m.objects[key] = data return nil } - -func (m *mockS3StorageForProvenance) ListObjects(ctx context.Context, prefix string) ([]string, error) { - var keys []string - for key := range m.objects { - keys = append(keys, key) - } - return keys, nil -} diff --git a/pkg/leeway/cache/remote/s3_resilience_test.go b/pkg/leeway/cache/remote/s3_resilience_test.go index 11364827..a5e61c20 100644 --- a/pkg/leeway/cache/remote/s3_resilience_test.go +++ b/pkg/leeway/cache/remote/s3_resilience_test.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "os" - "strings" "sync" "testing" "time" @@ -154,20 +153,6 @@ func (m *mockS3WithFailures) UploadObject(ctx context.Context, key string, src s return nil } -func (m *mockS3WithFailures) ListObjects(ctx context.Context, prefix string) ([]string, error) { - m.mu.Lock() - defer m.mu.Unlock() - - var keys []string - for key := range m.data { - if strings.HasPrefix(key, prefix) { - keys = append(keys, key) - } - } - - return keys, nil -} - // Mock package for testing type mockPackageResilience struct { version string diff --git a/pkg/leeway/cache/remote/s3_slsa_test.go b/pkg/leeway/cache/remote/s3_slsa_test.go index a2eccaae..21678ddf 100644 --- a/pkg/leeway/cache/remote/s3_slsa_test.go +++ b/pkg/leeway/cache/remote/s3_slsa_test.go @@ -71,16 +71,6 @@ func (m *mockS3StorageWithSLSA) getCallLog() []string { return result } -func (m *mockS3StorageWithSLSA) ListObjects(ctx context.Context, prefix string) ([]string, error) { - var keys []string - for key := range m.objects { - if strings.HasPrefix(key, prefix) { - keys = append(keys, key) - } - } - return keys, nil -} - type mockNotFoundError struct { key string } diff --git a/pkg/leeway/cache/remote/s3_test.go b/pkg/leeway/cache/remote/s3_test.go index 0379a6d9..6f78734b 100644 --- a/pkg/leeway/cache/remote/s3_test.go +++ b/pkg/leeway/cache/remote/s3_test.go @@ -23,10 +23,9 @@ import ( // mockS3Client implements a mock S3 client for testing type mockS3Client struct { - headObjectFunc func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) - getObjectFunc func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) - putObjectFunc func(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) - listObjectsV2Func func(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) + headObjectFunc func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) + getObjectFunc func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + putObjectFunc func(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) } func (m *mockS3Client) HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { @@ -66,13 +65,6 @@ func (m *mockS3Client) UploadPart(ctx context.Context, params *s3.UploadPartInpu return &s3.UploadPartOutput{}, nil } -func (m *mockS3Client) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { - if m.listObjectsV2Func != nil { - return m.listObjectsV2Func(ctx, params, optFns...) - } - return &s3.ListObjectsV2Output{}, nil -} - func TestS3Cache_ExistingPackages(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -81,7 +73,6 @@ func TestS3Cache_ExistingPackages(t *testing.T) { name string packages []cache.Package mockHeadObject func(key string) (*s3.HeadObjectOutput, error) - mockListObjects func(prefix string) (*s3.ListObjectsV2Output, error) expectedResults map[string]struct{} expectError bool }{ @@ -96,14 +87,6 @@ func TestS3Cache_ExistingPackages(t *testing.T) { } return nil, &types.NoSuchKey{} }, - mockListObjects: func(prefix string) (*s3.ListObjectsV2Output, error) { - key := "v1.tar.gz" - return &s3.ListObjectsV2Output{ - Contents: []types.Object{ - {Key: &key}, - }, - }, nil - }, expectedResults: map[string]struct{}{ "v1": {}, }, @@ -122,14 +105,6 @@ func TestS3Cache_ExistingPackages(t *testing.T) { } return nil, &types.NoSuchKey{} }, - mockListObjects: func(prefix string) (*s3.ListObjectsV2Output, error) { - key := "v1.tar" - return &s3.ListObjectsV2Output{ - Contents: []types.Object{ - {Key: &key}, - }, - }, nil - }, expectedResults: map[string]struct{}{ "v1": {}, }, @@ -142,11 +117,6 @@ func TestS3Cache_ExistingPackages(t *testing.T) { mockHeadObject: func(key string) (*s3.HeadObjectOutput, error) { return nil, &types.NoSuchKey{} }, - mockListObjects: func(prefix string) (*s3.ListObjectsV2Output, error) { - return &s3.ListObjectsV2Output{ - Contents: []types.Object{}, - }, nil - }, expectedResults: map[string]struct{}{}, }, { @@ -157,14 +127,6 @@ func TestS3Cache_ExistingPackages(t *testing.T) { mockHeadObject: func(key string) (*s3.HeadObjectOutput, error) { return &s3.HeadObjectOutput{}, nil }, - mockListObjects: func(prefix string) (*s3.ListObjectsV2Output, error) { - key := "v1.tar.gz" - return &s3.ListObjectsV2Output{ - Contents: []types.Object{ - {Key: &key}, - }, - }, nil - }, expectedResults: map[string]struct{}{}, expectError: false, }, @@ -176,12 +138,6 @@ func TestS3Cache_ExistingPackages(t *testing.T) { headObjectFunc: func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { return tt.mockHeadObject(*params.Key) }, - listObjectsV2Func: func(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { - if tt.mockListObjects != nil { - return tt.mockListObjects(*params.Prefix) - } - return &s3.ListObjectsV2Output{}, nil - }, } s3Cache := &S3Cache{ diff --git a/pkg/leeway/cache/types.go b/pkg/leeway/cache/types.go index f284bd75..9d1a4790 100644 --- a/pkg/leeway/cache/types.go +++ b/pkg/leeway/cache/types.go @@ -89,6 +89,24 @@ const ( DownloadStatusSkipped ) +// String returns a string representation of the download status +func (s DownloadStatus) String() string { + switch s { + case DownloadStatusSuccess: + return "success" + case DownloadStatusNotFound: + return "not_found" + case DownloadStatusFailed: + return "failed" + case DownloadStatusVerificationFailed: + return "verification_failed" + case DownloadStatusSkipped: + return "skipped" + default: + return "unknown" + } +} + // DownloadResult contains the outcome of a single package download attempt. // This enables callers to make informed decisions about retry strategies // and avoid unnecessary rebuilds when transient failures occur. @@ -97,6 +115,8 @@ type DownloadResult struct { Status DownloadStatus // Err contains the error if Status is Failed or VerificationFailed Err error + // Bytes is the size of the downloaded artifact in bytes (0 if not downloaded) + Bytes int64 } // RemoteCache can download and upload build artifacts into a local cache @@ -133,9 +153,6 @@ type ObjectStorage interface { // UploadObject uploads a local file to remote storage UploadObject(ctx context.Context, key string, src string) error - - // ListObjects lists objects with the given prefix - ListObjects(ctx context.Context, prefix string) ([]string, error) } // Config holds configuration for cache implementations diff --git a/pkg/leeway/reporter.go b/pkg/leeway/reporter.go index 4142190c..47cd4a6c 100644 --- a/pkg/leeway/reporter.go +++ b/pkg/leeway/reporter.go @@ -731,7 +731,7 @@ type OTelReporter struct { rootSpan trace.Span packageCtxs map[string]context.Context packageSpans map[string]trace.Span - phaseSpans map[string]trace.Span // key: "packageName:phaseName" + phaseSpans map[string]trace.Span // key: "packageName:phaseName" phaseCtxs map[string]context.Context // key: "packageName:phaseName" mu sync.RWMutex } diff --git a/pkg/leeway/telemetry/tracer.go b/pkg/leeway/telemetry/tracer.go index 647dbac6..c5c85d36 100644 --- a/pkg/leeway/telemetry/tracer.go +++ b/pkg/leeway/telemetry/tracer.go @@ -6,6 +6,8 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" @@ -15,8 +17,16 @@ import ( "golang.org/x/xerrors" ) -// leewayVersion is set by the build system and used for telemetry -var leewayVersion = "unknown" +var ( + // leewayVersion is set by the build system and used for telemetry + leewayVersion = "unknown" + + // tracerProvider holds the global tracer provider + tracerProvider *sdktrace.TracerProvider + + // initialized tracks whether tracing has been initialized + initialized bool +) // SetLeewayVersion sets the leeway version for telemetry reporting func SetLeewayVersion(version string) { @@ -25,13 +35,16 @@ func SetLeewayVersion(version string) { } } -// InitTracer initializes the OpenTelemetry tracer with OTLP HTTP exporter. -// The endpoint parameter specifies the OTLP endpoint URL (e.g., "localhost:4318"). -// The insecure parameter controls whether to use TLS (false = use TLS, true = no TLS). -// Returns the TracerProvider which must be shut down when done. -func InitTracer(ctx context.Context, endpoint string, insecure bool) (*sdktrace.TracerProvider, error) { +// Initialize sets up the OpenTelemetry tracer with OTLP HTTP exporter. +// This should be called once at application startup. +// Returns an error if initialization fails, or nil if tracing is disabled (empty endpoint). +func Initialize(ctx context.Context, endpoint string, insecure bool) error { if endpoint == "" { - return nil, xerrors.Errorf("OTLP endpoint not provided") + return nil + } + + if initialized { + return nil } // Create OTLP HTTP exporter with optional TLS @@ -44,28 +57,28 @@ func InitTracer(ctx context.Context, endpoint string, insecure bool) (*sdktrace. exporter, err := otlptracehttp.New(ctx, opts...) if err != nil { - return nil, xerrors.Errorf("failed to create OTLP exporter: %w", err) + return xerrors.Errorf("failed to create OTLP exporter: %w", err) } // Create resource with service information res, err := resource.New(ctx, resource.WithAttributes( semconv.ServiceNameKey.String("leeway"), - semconv.ServiceVersionKey.String(getLeewayVersion()), + semconv.ServiceVersionKey.String(leewayVersion), ), ) if err != nil { - return nil, xerrors.Errorf("failed to create resource: %w", err) + return xerrors.Errorf("failed to create resource: %w", err) } // Create tracer provider - tp := sdktrace.NewTracerProvider( + tracerProvider = sdktrace.NewTracerProvider( sdktrace.WithBatcher(exporter), sdktrace.WithResource(res), ) // Set global tracer provider - otel.SetTracerProvider(tp) + otel.SetTracerProvider(tracerProvider) // Set global propagator for W3C Trace Context otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( @@ -73,36 +86,62 @@ func InitTracer(ctx context.Context, endpoint string, insecure bool) (*sdktrace. propagation.Baggage{}, )) - return tp, nil + initialized = true + return nil } // Shutdown flushes any pending spans and shuts down the tracer provider. -// It uses a timeout context to ensure shutdown completes within a reasonable time. -func Shutdown(ctx context.Context, tp *sdktrace.TracerProvider) error { - if tp == nil { +func Shutdown(ctx context.Context) error { + if tracerProvider == nil { return nil } - // Create a timeout context for shutdown shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - if err := tp.Shutdown(shutdownCtx); err != nil { - return xerrors.Errorf("failed to shutdown tracer provider: %w", err) - } + err := tracerProvider.Shutdown(shutdownCtx) + tracerProvider = nil + initialized = false + return err +} - return nil +// Enabled returns true if tracing has been initialized. +func Enabled() bool { + return initialized +} + +// Tracer returns the global tracer for leeway. +func Tracer() trace.Tracer { + return otel.GetTracerProvider().Tracer("github.com/gitpod-io/leeway") +} + +// StartSpan creates a new span with the given name and attributes. +func StartSpan(ctx context.Context, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + return Tracer().Start(ctx, name, trace.WithAttributes(attrs...)) +} + +// FinishSpan ends a span and sets its status based on the error. +// Usage: defer telemetry.FinishSpan(span, &err) +func FinishSpan(span trace.Span, err *error) { + if span == nil { + return + } + if err != nil && *err != nil { + span.RecordError(*err) + span.SetStatus(codes.Error, (*err).Error()) + } else { + span.SetStatus(codes.Ok, "") + } + span.End() } // ParseTraceContext parses W3C Trace Context headers (traceparent and tracestate) // and returns a context with the extracted trace information. -// Format: traceparent = "00-{trace-id}-{span-id}-{flags}" func ParseTraceContext(traceparent, tracestate string) (context.Context, error) { if traceparent == "" { return context.Background(), nil } - // Create a carrier with the trace context headers carrier := propagation.MapCarrier{ "traceparent": traceparent, } @@ -110,7 +149,6 @@ func ParseTraceContext(traceparent, tracestate string) (context.Context, error) carrier["tracestate"] = tracestate } - // Extract the trace context using W3C Trace Context propagator ctx := context.Background() propagator := propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, @@ -118,7 +156,6 @@ func ParseTraceContext(traceparent, tracestate string) (context.Context, error) ) ctx = propagator.Extract(ctx, carrier) - // Verify that we extracted a valid span context spanCtx := trace.SpanContextFromContext(ctx) if !spanCtx.IsValid() { return nil, xerrors.Errorf("invalid trace context: traceparent=%s", traceparent) @@ -127,20 +164,12 @@ func ParseTraceContext(traceparent, tracestate string) (context.Context, error) return ctx, nil } -// getLeewayVersion returns the leeway version set via SetLeewayVersion. -// Returns "unknown" if version was not set. -func getLeewayVersion() string { - return leewayVersion -} - // FormatTraceContext formats a span context into W3C Trace Context format. -// This is useful for propagating trace context to child processes. func FormatTraceContext(spanCtx trace.SpanContext) (traceparent, tracestate string) { if !spanCtx.IsValid() { return "", "" } - // Use the propagator to format the trace context properly carrier := propagation.MapCarrier{} propagator := propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, @@ -149,10 +178,7 @@ func FormatTraceContext(spanCtx trace.SpanContext) (traceparent, tracestate stri ctx := trace.ContextWithSpanContext(context.Background(), spanCtx) propagator.Inject(ctx, carrier) - traceparent = carrier.Get("traceparent") - tracestate = carrier.Get("tracestate") - - return traceparent, tracestate + return carrier.Get("traceparent"), carrier.Get("tracestate") } // ValidateTraceParent validates the format of a traceparent header. @@ -166,22 +192,18 @@ func ValidateTraceParent(traceparent string) error { return xerrors.Errorf("invalid traceparent format: expected 4 parts, got %d", len(parts)) } - // Validate version if parts[0] != "00" { return xerrors.Errorf("unsupported traceparent version: %s", parts[0]) } - // Validate trace ID length (32 hex chars) if len(parts[1]) != 32 { return xerrors.Errorf("invalid trace ID length: expected 32, got %d", len(parts[1])) } - // Validate span ID length (16 hex chars) if len(parts[2]) != 16 { return xerrors.Errorf("invalid span ID length: expected 16, got %d", len(parts[2])) } - // Validate flags length (2 hex chars) if len(parts[3]) != 2 { return xerrors.Errorf("invalid flags length: expected 2, got %d", len(parts[3])) } diff --git a/pkg/leeway/telemetry/tracer_test.go b/pkg/leeway/telemetry/tracer_test.go index bba3b776..00307aec 100644 --- a/pkg/leeway/telemetry/tracer_test.go +++ b/pkg/leeway/telemetry/tracer_test.go @@ -166,17 +166,21 @@ func TestFormatTraceContext_Invalid(t *testing.T) { } } -func TestInitTracer_NoEndpoint(t *testing.T) { - _, err := InitTracer(context.Background(), "", false) - if err == nil { - t.Error("InitTracer() should fail when endpoint is empty") +func TestInitialize_NoEndpoint(t *testing.T) { + // Empty endpoint should not error - it just disables tracing + err := Initialize(context.Background(), "", false) + if err != nil { + t.Errorf("Initialize() with empty endpoint should not error, got %v", err) + } + if Enabled() { + t.Error("Enabled() should return false when endpoint is empty") } } -func TestShutdown_NilProvider(t *testing.T) { - // Should not panic with nil provider - err := Shutdown(context.Background(), nil) +func TestShutdown_NotInitialized(t *testing.T) { + // Should not panic when not initialized + err := Shutdown(context.Background()) if err != nil { - t.Errorf("Shutdown() with nil provider should not return error, got %v", err) + t.Errorf("Shutdown() when not initialized should not return error, got %v", err) } }