diff --git a/cmd/cli/serve/full.go b/cmd/cli/serve/full.go index a38791e5..cc936d68 100644 --- a/cmd/cli/serve/full.go +++ b/cmd/cli/serve/full.go @@ -9,17 +9,18 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/storacha/go-ucanto/did" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.uber.org/fx" "go.uber.org/fx/fxevent" "go.uber.org/zap/zapcore" "github.com/storacha/piri/cmd/cli/setup" - "github.com/storacha/piri/pkg/build" - "github.com/storacha/piri/cmd/cliutil" "github.com/storacha/piri/pkg/config" - "github.com/storacha/piri/pkg/fx/app" + appconfig "github.com/storacha/piri/pkg/config/app" + fxapp "github.com/storacha/piri/pkg/fx/app" "github.com/storacha/piri/pkg/presets" "github.com/storacha/piri/pkg/telemetry" ) @@ -320,14 +321,14 @@ func fullServer(cmd *cobra.Command, _ []string) error { return fmt.Errorf("parsing config: %w", err) } - if err := initTelemetry(cmd.Context(), telemetry.Config{ - ServiceName: "piri", - ServiceVersion: build.Version, - Environment: userCfg.Network, - InstanceID: appCfg.Identity.Signer.DID().String(), - Endpoint: appCfg.Server.PublicURL.String(), - }); err != nil { - log.Warnf("failed to initialize telemetry: %s", err) + if err := initTelemetry( + cmd.Context(), + appCfg.Identity.Signer.DID().String(), + userCfg.Network, + appCfg.Storage.DataDir, + appCfg.Telemetry, + ); err != nil { + return fmt.Errorf("initializing telemetry: %w", err) } // build our beloved Piri node piri := fx.New( @@ -348,13 +349,13 @@ func fullServer(cmd *cobra.Command, _ []string) error { // - identity // - http server // - databases & datastores - app.CommonModules(appCfg), + fxapp.CommonModules(appCfg), // ucan service dependencies: // - http handlers // - ucan specific handlers, blob allocate and accept, replicate, etc. 
// - blob, claim, publisher, replicator, and storage services - app.UCANModule, + fxapp.UCANModule, // pdp service dependencies: // - lotus, eth, and contract clients @@ -363,7 +364,7 @@ func fullServer(cmd *cobra.Command, _ []string) error { // - http handlers // - create proof set, add root, upload piece, etc. // - address wallet - app.PDPModule, + fxapp.PDPModule, // Post-startup operations: print server info and record telemetry fx.Invoke(func(lc fx.Lifecycle) { @@ -374,13 +375,18 @@ func fullServer(cmd *cobra.Command, _ []string) error { cmd.Println("Piri Running on: " + appCfg.Server.Host + ":" + strconv.Itoa(int(appCfg.Server.Port))) cmd.Println("Piri Public Endpoint: " + appCfg.Server.PublicURL.String()) - // Record server telemetry - telemetry.RecordServerInfo(ctx, "full", - telemetry.StringAttr("did", appCfg.Identity.Signer.DID().String()), - telemetry.StringAttr("owner_address", appCfg.PDPService.OwnerAddress.String()), - telemetry.StringAttr("public_url", appCfg.Server.PublicURL.String()), - telemetry.Int64Attr("proof_set", int64(appCfg.UCANService.ProofSetID)), - ) + // Record server metadata + if err := telemetry.RecordServerInfo(otel.GetMeterProvider().Meter("github."+ + "com/storacha/piri/cli/serve"), + ctx, + "full", + attribute.String("did", appCfg.Identity.Signer.DID().String()), + attribute.String("owner_address", appCfg.PDPService.OwnerAddress.String()), + attribute.String("public_url", appCfg.Server.PublicURL.String()), + attribute.Int64("proof_set", int64(appCfg.UCANService.ProofSetID)), + ); err != nil { + log.Warnw("Failed to record server info", "error", err) + } return nil }, OnStop: func(ctx context.Context) error { @@ -403,10 +409,27 @@ func fullServer(cmd *cobra.Command, _ []string) error { return nil } -func initTelemetry(ctx context.Context, cfg telemetry.Config) error { - // bail if this has been disabled. 
+func initTelemetry(ctx context.Context, instanceID, network string, dataDir string, cfg appconfig.TelemetryConfig) error { + // bail if this has been disabled globally. + // backwards compatible env var if os.Getenv("PIRI_DISABLE_ANALYTICS") != "" { return nil } - return telemetry.Initialize(ctx, cfg) + if cfg.DisableStorachaAnalytics { + return nil + } + + t, err := telemetry.Setup(ctx, network, instanceID) + if err != nil { + return fmt.Errorf("setting up telemetry: %w", err) + } + + if err := telemetry.StartHostMetrics( + ctx, + t.Metrics.Meter("github.com/storacha/piri/cli/serve"), + dataDir, + ); err != nil { + return fmt.Errorf("setting up telemetry host metrics: %w", err) + } + return nil } diff --git a/cmd/cli/serve/ucan.go b/cmd/cli/serve/ucan.go index aa7a6343..472d5f57 100644 --- a/cmd/cli/serve/ucan.go +++ b/cmd/cli/serve/ucan.go @@ -28,7 +28,6 @@ import ( "github.com/storacha/piri/pkg/service/retrieval" "github.com/storacha/piri/pkg/service/storage" "github.com/storacha/piri/pkg/store/blobstore" - "github.com/storacha/piri/pkg/telemetry" ) var ( @@ -310,15 +309,6 @@ func startServer(cmd *cobra.Command, _ []string) error { defer storageSvc.Close(ctx) - telemetry.RecordServerInfo(ctx, "ucan", - telemetry.StringAttr("did", id.DID().String()), - telemetry.StringAttr("indexing_did", indexingServiceDID.String()), - telemetry.StringAttr("indexing_url", indexingServiceURL.String()), - telemetry.StringAttr("upload_did", uploadServiceDID.String()), - telemetry.StringAttr("upload_url", uploadServiceURL.String()), - telemetry.Int64Attr("proof_set", int64(cfg.UCANService.ProofSetID)), - ) - errHandler := func(err ucanserver.HandlerExecutionError[any]) { l := log.With("error", err.Error()) if s := err.Stack(); s != "" { diff --git a/deploy/full-node/deployment-types/multi-instance/main.tf b/deploy/full-node/deployment-types/multi-instance/main.tf index f30eef8d..5bdc0ec4 100644 --- a/deploy/full-node/deployment-types/multi-instance/main.tf +++ 
b/deploy/full-node/deployment-types/multi-instance/main.tf @@ -55,7 +55,7 @@ module "piri_instances" { service_pem_content = lookup(each.value, "service_pem_content", "") wallet_hex_content = lookup(each.value, "wallet_hex_content", "") operator_email = each.value.operator_email - use_letsencrypt_staging = var.environment != "production" && var.environment != "prod" && var.environment != "forge-prod" + use_letsencrypt_staging = false #var.environment != "production" && var.environment != "prod" && var.environment !="forge-prod" tags = { Owner = var.owner diff --git a/lib/jobqueue/jobqueue.go b/lib/jobqueue/jobqueue.go index 96578df5..e4ddd49f 100644 --- a/lib/jobqueue/jobqueue.go +++ b/lib/jobqueue/jobqueue.go @@ -235,7 +235,15 @@ func New[T any](name string, db *sql.DB, ser serializer.Serializer[T], opts ...O } // instantiate worker which consumes from queue - w := worker.New[T](q, ser, worker.WithLog(c.Logger), worker.WithLimit(int(c.MaxWorkers)), worker.WithExtend(c.ExtendDelay)) + w, err := worker.New[T](q, ser, + worker.WithLog(c.Logger), + worker.WithLimit(int(c.MaxWorkers)), + worker.WithExtend(c.ExtendDelay), + worker.WithQueueName(name), + ) + if err != nil { + return nil, fmt.Errorf("failed to create worker: %w", err) + } return &JobQueue[T]{ queue: q, diff --git a/lib/jobqueue/worker/telemetry.go b/lib/jobqueue/worker/telemetry.go new file mode 100644 index 00000000..4f1e7764 --- /dev/null +++ b/lib/jobqueue/worker/telemetry.go @@ -0,0 +1,136 @@ +package worker + +import ( + "context" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + + "github.com/storacha/piri/lib/telemetry" +) + +// jobDurationBounds covering 5ms up to 30 minutes. 
+var jobDurationBounds = []float64{ + (5 * time.Millisecond).Seconds(), + (10 * time.Millisecond).Seconds(), + (100 * time.Millisecond).Seconds(), + (time.Second).Seconds(), + (3 * time.Second).Seconds(), + (5 * time.Second).Seconds(), + (10 * time.Second).Seconds(), + (30 * time.Second).Seconds(), + (time.Minute).Seconds(), + (2 * time.Minute).Seconds(), + (3 * time.Minute).Seconds(), + (5 * time.Minute).Seconds(), + (6 * time.Minute).Seconds(), + (7 * time.Minute).Seconds(), + (8 * time.Minute).Seconds(), + (9 * time.Minute).Seconds(), + (10 * time.Minute).Seconds(), + (30 * time.Minute).Seconds(), +} + +type metricsRecorder struct { + activeJobs *telemetry.UpDownCounter + queuedJobs *telemetry.UpDownCounter + failedJobsCounter *telemetry.Counter + jobDurationTimer *telemetry.Timer +} + +func newMetrics() (*metricsRecorder, error) { + meter := otel.GetMeterProvider().Meter("lib/jobqueue/worker") + activeJobs, err := telemetry.NewUpDownCounter( + meter, + "active_jobs", + "number of jobs running", + "1", + ) + if err != nil { + return nil, err + } + queuedJobs, err := telemetry.NewUpDownCounter( + meter, + "queued_jobs", + "number of jobs queued (includes active)", + "1", + ) + if err != nil { + return nil, err + } + failedJobs, err := telemetry.NewCounter( + meter, + "failed_jobs", + "number of jobs that failed permanently", + "1", + ) + if err != nil { + return nil, err + } + jobDuration, err := telemetry.NewTimer( + meter, + "job_duration", + "records duration of a jobs runtime", + jobDurationBounds, + ) + if err != nil { + return nil, err + } + return &metricsRecorder{ + activeJobs: activeJobs, + queuedJobs: queuedJobs, + failedJobsCounter: failedJobs, + jobDurationTimer: jobDuration, + }, nil +} + +func (m *metricsRecorder) recordQueuedDelta(ctx context.Context, queueName, jobName string, delta int64) { + if m == nil || m.queuedJobs == nil { + return + } + m.queuedJobs.Add(ctx, delta, attribute.String("queue", queueName), attribute.String("job", jobName)) +} + +func (m *metricsRecorder) recordActiveDelta(ctx context.Context, queueName, 
jobName string, delta int64) { + if m == nil || m.activeJobs == nil { + return + } + m.activeJobs.Add(ctx, delta, attribute.String("queue", queueName), attribute.String("job", jobName)) +} + +func (m *metricsRecorder) recordJobFailure(ctx context.Context, queueName, jobName, reason string, attempt int) { + if m == nil || m.failedJobsCounter == nil || queueName == "" || jobName == "" { + return + } + + attrs := []attribute.KeyValue{ + attribute.String("queue", queueName), + attribute.String("job", jobName), + } + if reason != "" { + attrs = append(attrs, attribute.String("failure_reason", reason)) + } + if attempt > 0 { + attrs = append(attrs, attribute.Int("attempt", attempt)) + } + + m.failedJobsCounter.Inc(ctx, attrs...) +} + +func (m *metricsRecorder) recordJobDuration(ctx context.Context, queueName, jobName, status string, attempt int, duration time.Duration) { + if m == nil || m.jobDurationTimer == nil || queueName == "" || jobName == "" { + return + } + + attrs := []attribute.KeyValue{ + attribute.String("queue", queueName), + attribute.String("job", jobName), + attribute.String("status", status), + } + if attempt > 0 { + attrs = append(attrs, attribute.Int("attempt", attempt)) + } + + m.jobDurationTimer.Record(ctx, duration, attrs...) +} diff --git a/lib/jobqueue/worker/worker.go b/lib/jobqueue/worker/worker.go index 67f177e9..8ab71da2 100644 --- a/lib/jobqueue/worker/worker.go +++ b/lib/jobqueue/worker/worker.go @@ -50,10 +50,12 @@ type Worker[T any] struct { pollInterval time.Duration extend time.Duration jobCount int + queueName string jobCountLimit int jobCountLock sync.RWMutex log logger.StandardLogger serializer serializer.Serializer[T] + metrics *metricsRecorder } // Config holds all parameters needed to initialize a Worker. @@ -62,6 +64,7 @@ type Config struct { JobCountLimit int PollInterval time.Duration Extend time.Duration + QueueName string } // Option modifies a Config before creating the Worker. 
@@ -92,7 +95,14 @@ func WithExtend(d time.Duration) Option { } } -func New[T any](q queue.Interface, ser serializer.Serializer[T], options ...Option) *Worker[T] { +// WithQueueName sets the queue name for telemetry labels. +func WithQueueName(name string) Option { + return func(cfg *Config) { + cfg.QueueName = name + } +} + +func New[T any](q queue.Interface, ser serializer.Serializer[T], options ...Option) (*Worker[T], error) { // Default config cfg := &Config{ Log: &logger.DiscardLogger{}, @@ -106,6 +116,10 @@ func New[T any](q queue.Interface, ser serializer.Serializer[T], options ...Opti opt(cfg) } + metricsRecorder, err := newMetrics() + if err != nil { + return nil, err + } // Construct the Worker using the final config jq := &Worker[T]{ jobs: make(map[string]*jobRegistration[T]), @@ -114,11 +128,13 @@ func New[T any](q queue.Interface, ser serializer.Serializer[T], options ...Opti serializer: ser, log: cfg.Log, + queueName: cfg.QueueName, jobCountLimit: cfg.JobCountLimit, pollInterval: cfg.PollInterval, extend: cfg.Extend, + metrics: metricsRecorder, } - return jq + return jq, nil } type message struct { @@ -199,7 +215,15 @@ func (r *Worker[T]) Enqueue(ctx context.Context, name string, msg T) error { }); err != nil { return err } - return r.queue.Send(ctx, queue.Message{Body: buf.Bytes()}) + + id, err := r.queue.SendAndGetID(ctx, queue.Message{Body: buf.Bytes()}) + if err != nil { + return err + } + if id != "" { + r.metrics.recordQueuedDelta(ctx, r.queueName, name, 1) + } + return nil } func (r *Worker[T]) EnqueueTx(ctx context.Context, tx *sql.Tx, name string, msg T) error { @@ -305,6 +329,9 @@ func (r *Worker[T]) runJob(ctx context.Context, wg *sync.WaitGroup, m *queue.Mes } }() + r.metrics.recordActiveDelta(ctx, r.queueName, jm.Name, 1) + defer r.metrics.recordActiveDelta(ctx, r.queueName, jm.Name, -1) + jobCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -315,6 +342,7 @@ func (r *Worker[T]) runJob(ctx context.Context, wg *sync.WaitGroup, m 
*queue.Mes r.log.Infow("Running job", "name", jm.Name, "attempt", m.Received) before := time.Now() if err := jobReg.fn(jobCtx, jobInput); err != nil { + r.metrics.recordJobDuration(jobCtx, r.queueName, jm.Name, "failure", m.Received, time.Since(before)) r.handleJobError(jobCtx, m, jm.Name, jobInput, jobReg, err) return } @@ -322,7 +350,8 @@ func (r *Worker[T]) runJob(ctx context.Context, wg *sync.WaitGroup, m *queue.Mes // Job succeeded duration := time.Since(before) r.log.Infow("Ran job", "name", jm.Name, "duration", duration, "attempt", m.Received) - r.deleteMessage(m.ID) + r.metrics.recordJobDuration(jobCtx, r.queueName, jm.Name, "success", m.Received, duration) + r.deleteMessage(jobCtx, m.ID, jm.Name) } // extendMessageTimeout periodically extends the message timeout while the job is running @@ -346,7 +375,7 @@ func (r *Worker[T]) extendMessageTimeout(ctx context.Context, messageID queue.ID func (r *Worker[T]) handleJobError(ctx context.Context, m *queue.Message, jobName string, jobInput T, jobReg *jobRegistration[T], err error) { var permanent *PermanentError if errors.As(err, &permanent) { - r.handlePermanentError(ctx, m.ID, jobName, jobInput, jobReg, err) + r.handlePermanentError(ctx, m.ID, jobName, jobInput, jobReg, err, m.Received) return } @@ -365,8 +394,9 @@ func (r *Worker[T]) handleJobError(ctx context.Context, m *queue.Message, jobNam } // handlePermanentError handles errors that should not be retried -func (r *Worker[T]) handlePermanentError(ctx context.Context, messageID queue.ID, jobName string, jobInput T, jobReg *jobRegistration[T], err error) { +func (r *Worker[T]) handlePermanentError(ctx context.Context, messageID queue.ID, jobName string, jobInput T, jobReg *jobRegistration[T], err error, attempt int) { r.log.Errorw("Failed to run job, PermanentError occurred", "error", err, "name", jobName) + r.metrics.recordJobFailure(ctx, r.queueName, jobName, "permanent_error", attempt) // Invoke OnFailure callback if configured if jobReg.onFailure != nil 
{ @@ -374,7 +404,7 @@ func (r *Worker[T]) handlePermanentError(ctx context.Context, messageID queue.ID } // Move to dead letter queue - r.moveToDeadLetter(messageID, jobName, "permanent_error", err) + r.moveToDeadLetter(ctx, messageID, jobName, "permanent_error", err) } // handleMaxRetriesExceeded handles errors after all retries have been exhausted @@ -386,6 +416,7 @@ func (r *Worker[T]) handleMaxRetriesExceeded(ctx context.Context, messageID queu "max_attempts", r.queue.MaxReceive(), "error", err, ) + r.metrics.recordJobFailure(ctx, r.queueName, jobName, "max_retries", attempt) // Invoke OnFailure callback if configured if jobReg.onFailure != nil { @@ -393,7 +424,7 @@ func (r *Worker[T]) handleMaxRetriesExceeded(ctx context.Context, messageID queu } // Move to dead letter queue - r.moveToDeadLetter(messageID, jobName, "max_retries", err) + r.moveToDeadLetter(ctx, messageID, jobName, "max_retries", err) } // invokeOnFailure calls the OnFailure callback and logs any errors @@ -405,7 +436,7 @@ func (r *Worker[T]) invokeOnFailure(ctx context.Context, jobName string, jobInpu } // moveToDeadLetter moves a message to the dead letter queue -func (r *Worker[T]) moveToDeadLetter(messageID queue.ID, jobName string, reason string, err error) { +func (r *Worker[T]) moveToDeadLetter(ctx context.Context, messageID queue.ID, jobName string, reason string, err error) { // TODO PASS A CONTEXT FORREST dlqCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() @@ -414,15 +445,18 @@ func (r *Worker[T]) moveToDeadLetter(messageID queue.ID, jobName string, reason r.log.Errorw("Error moving job to dead letter queue", "error", dlqErr, "original_error", err) } else { r.log.Infow("Moved job to dead letter queue", "name", jobName, "reason", reason) + r.metrics.recordQueuedDelta(ctx, r.queueName, jobName, -1) } } // deleteMessage deletes a successfully processed message from the queue -func (r *Worker[T]) deleteMessage(messageID queue.ID) { +func (r *Worker[T]) 
deleteMessage(ctx context.Context, messageID queue.ID, jobName string) { deleteCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() if err := r.queue.Delete(deleteCtx, messageID); err != nil { r.log.Errorw("Error deleting job from queue, it will be retried", "error", err) + return } + r.metrics.recordQueuedDelta(ctx, r.queueName, jobName, -1) } diff --git a/lib/jobqueue/worker/worker_test.go b/lib/jobqueue/worker/worker_test.go index 1bb96e14..244f67e9 100644 --- a/lib/jobqueue/worker/worker_test.go +++ b/lib/jobqueue/worker/worker_test.go @@ -24,15 +24,17 @@ import ( func TestRunner_Register(t *testing.T) { t.Run("can register a new job", func(t *testing.T) { - r := worker.New[[]byte](nil, nil) + r, err := worker.New[[]byte](nil, nil) + require.NoError(t, err) require.NoError(t, r.Register("test", func(ctx context.Context, m []byte) error { return nil })) }) t.Run("errors if the same job is registered twice", func(t *testing.T) { - r := worker.New[[]byte](nil, nil) - err := r.Register("test", func(ctx context.Context, m []byte) error { + r, err := worker.New[[]byte](nil, nil) + require.NoError(t, err) + err = r.Register("test", func(ctx context.Context, m []byte) error { return nil }) require.NoError(t, err) @@ -47,11 +49,12 @@ func TestOnFailure(t *testing.T) { MaxReceive: 3, // Max 3 attempts Timeout: 10 * time.Millisecond, }) - r := worker.New[[]byte]( + r, err := worker.New[[]byte]( q, &PassThroughSerializer[[]byte]{}, worker.WithLimit(10), ) + require.NoError(t, err) var onFailureCalled bool var capturedMsg []byte @@ -61,7 +64,7 @@ func TestOnFailure(t *testing.T) { defer cancel() // Register a job that always fails - err := r.Register("failing-job", + err = r.Register("failing-job", func(ctx context.Context, m []byte) error { return fmt.Errorf("job failed") }, @@ -93,18 +96,19 @@ func TestOnFailure(t *testing.T) { MaxReceive: 3, Timeout: 10 * time.Millisecond, }) - r := worker.New[[]byte]( + r, err := worker.New[[]byte]( q, 
&PassThroughSerializer[[]byte]{}, worker.WithLimit(10), ) + require.NoError(t, err) var onFailureCalled bool ctx, cancel := context.WithCancel(t.Context()) // Register a job that succeeds - err := r.Register("success-job", + err = r.Register("success-job", func(ctx context.Context, m []byte) error { cancel() return nil @@ -132,11 +136,12 @@ func TestOnFailure(t *testing.T) { MaxReceive: 3, // Max 3 attempts Timeout: 10 * time.Millisecond, }) - r := worker.New[[]byte]( + r, err := worker.New[[]byte]( q, &PassThroughSerializer[[]byte]{}, worker.WithLimit(10), ) + require.NoError(t, err) var onFailureCalled bool var attempts int @@ -145,7 +150,7 @@ func TestOnFailure(t *testing.T) { defer cancel() // Register a job that fails twice then succeeds - err := r.Register("eventual-success", + err = r.Register("eventual-success", func(ctx context.Context, m []byte) error { attempts++ if attempts < 3 { @@ -182,7 +187,7 @@ func TestDeadLetterQueue(t *testing.T) { MaxReceive: 3, Timeout: 10 * time.Millisecond, }) - r := worker.New[[]byte]( + r, err := worker.New[[]byte]( q, &PassThroughSerializer[[]byte]{}, worker.WithLimit(10), @@ -192,7 +197,7 @@ func TestDeadLetterQueue(t *testing.T) { defer cancel() // Register a job that returns a permanent error - err := r.Register("permanent-error-job", func(ctx context.Context, m []byte) error { + err = r.Register("permanent-error-job", func(ctx context.Context, m []byte) error { cancel() return worker.Permanent(fmt.Errorf("this is a permanent error")) }) @@ -225,17 +230,18 @@ func TestDeadLetterQueue(t *testing.T) { MaxReceive: 3, // Max 3 attempts Timeout: 10 * time.Millisecond, }) - r := worker.New[[]byte]( + r, err := worker.New[[]byte]( q, &PassThroughSerializer[[]byte]{}, worker.WithLimit(10), ) + require.NoError(t, err) ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) defer cancel() // Register a job that always fails - err := r.Register("max-retries-job", func(ctx context.Context, m []byte) error { + err = 
r.Register("max-retries-job", func(ctx context.Context, m []byte) error { return fmt.Errorf("job failed") }) require.NoError(t, err) @@ -267,7 +273,7 @@ func TestDeadLetterQueue(t *testing.T) { MaxReceive: 3, Timeout: 10 * time.Millisecond, }) - r := worker.New[[]byte]( + r, err := worker.New[[]byte]( q, &PassThroughSerializer[[]byte]{}, worker.WithLimit(10), @@ -279,7 +285,7 @@ func TestDeadLetterQueue(t *testing.T) { defer cancel() // Register a job that fails with OnFailure callback - err := r.Register("failing-job-with-callback", + err = r.Register("failing-job-with-callback", func(ctx context.Context, m []byte) error { return fmt.Errorf("job failed") }, @@ -413,7 +419,8 @@ func TestCreateTx(t *testing.T) { t.Run("can create a job inside a transaction", func(t *testing.T) { db := internaltesting.NewInMemoryDB(t) q := internaltesting.NewQ(t, queue.NewOpts{DB: db}) - r := worker.New[[]byte](q, &PassThroughSerializer[[]byte]{}) + r, err := worker.New[[]byte](q, &PassThroughSerializer[[]byte]{}) + require.NoError(t, err) var ran bool ctx, cancel := context.WithCancel(t.Context()) @@ -424,7 +431,7 @@ func TestCreateTx(t *testing.T) { return nil })) - err := internalsql.InTx(db, func(tx *sql.Tx) error { + err = internalsql.InTx(db, func(tx *sql.Tx) error { return r.EnqueueTx(ctx, tx, "test", []byte("yo")) }) require.NoError(t, err) @@ -438,12 +445,13 @@ func newRunner(t *testing.T) (*queue.Queue, *worker.Worker[[]byte]) { t.Helper() q := internaltesting.NewQ(t, queue.NewOpts{Timeout: 100 * time.Millisecond}) - r := worker.New[[]byte]( + r, err := worker.New[[]byte]( q, &PassThroughSerializer[[]byte]{}, worker.WithLimit(10), worker.WithExtend(100*time.Millisecond), ) + require.NoError(t, err) return q, r } diff --git a/lib/telemetry/metrics.go b/lib/telemetry/metrics.go new file mode 100644 index 00000000..6ab00314 --- /dev/null +++ b/lib/telemetry/metrics.go @@ -0,0 +1,165 @@ +package telemetry + +import ( + "context" + "fmt" + "time" + + 
"go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type Counter struct { + counter metric.Int64Counter +} + +func NewCounter(meter metric.Meter, name, description, unit string) (*Counter, error) { + if name == "" { + return nil, fmt.Errorf("counter name required") + } + if description == "" { + return nil, fmt.Errorf("counter description required") + } + counter, err := meter.Int64Counter( + name, + metric.WithDescription(description), + metric.WithUnit(unit), + ) + if err != nil { + return nil, fmt.Errorf("failed to create counter %s: %w", name, err) + } + + return &Counter{ + counter: counter, + }, nil +} + +func (c *Counter) Add(ctx context.Context, value int64, attrs ...attribute.KeyValue) { + c.counter.Add(ctx, value, metric.WithAttributes(attrs...)) +} + +func (c *Counter) Inc(ctx context.Context, attrs ...attribute.KeyValue) { + c.Add(ctx, 1, attrs...) +} + +type UpDownCounter struct { + counter metric.Int64UpDownCounter +} + +func NewUpDownCounter(meter metric.Meter, name string, description string, unit string) (*UpDownCounter, error) { + if name == "" { + return nil, fmt.Errorf("counter name required") + } + if description == "" { + return nil, fmt.Errorf("counter description required") + } + + counter, err := meter.Int64UpDownCounter( + name, + metric.WithDescription(description), + metric.WithUnit(unit), + ) + if err != nil { + return nil, fmt.Errorf("failed to create counter %s: %w", name, err) + } + + return &UpDownCounter{ + counter: counter, + }, nil +} + +func (c *UpDownCounter) Add(ctx context.Context, value int64, attrs ...attribute.KeyValue) { + c.counter.Add(ctx, value, metric.WithAttributes(attrs...)) +} + +func (c *UpDownCounter) Inc(ctx context.Context, attrs ...attribute.KeyValue) { + c.Add(ctx, 1, attrs...) 
+} + +type Timer struct { + histogram metric.Float64Histogram +} + +func NewTimer(meter metric.Meter, name, description string, boundaries []float64) (*Timer, error) { + if name == "" { + return nil, fmt.Errorf("timer name required") + } + if description == "" { + return nil, fmt.Errorf("timer description required") + } + if len(boundaries) == 0 { + return nil, fmt.Errorf("timer boundaries required") + } + histogram, err := meter.Float64Histogram( + name, + metric.WithDescription(description), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(boundaries...), + ) + if err != nil { + return nil, fmt.Errorf("failed to create timer %s: %w", name, err) + } + + return &Timer{ + histogram: histogram, + }, nil + +} + +func (t *Timer) Record(ctx context.Context, duration time.Duration, attrs ...attribute.KeyValue) { + // record in seconds: the histogram unit is "s" and boundaries are in seconds + t.histogram.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...)) +} + +func (t *Timer) Start(attrs ...attribute.KeyValue) *StopWatch { + return &StopWatch{ + timer: t, + startTime: time.Now(), + attrs: attrs, + } +} + +func (t *StopWatch) Stop(ctx context.Context, attrs ...attribute.KeyValue) { + duration := time.Since(t.startTime) + allAttrs := append(t.attrs, attrs...) + t.timer.Record(ctx, duration, allAttrs...) +} + +type StopWatch struct { + timer *Timer + startTime time.Time + attrs []attribute.KeyValue +} + +// Info represents an info metric - a gauge that always has value 1 +// Info metrics are used to expose textual information as labels. +type Info struct { + gauge metric.Int64Gauge + attrs []attribute.KeyValue +} + +// InfoConfig configures an info metric +type InfoConfig struct { + Name string + Description string + Labels map[string]string +} + +// NewInfo creates a new info metric. Under the hood it is a gauge that always reports 1. 
+// This is useful for exposing version info, addresses, and other metadata +func NewInfo(meter metric.Meter, name, description string, attrs ...attribute.KeyValue) (*Info, error) { + gauge, err := meter.Int64Gauge(name, metric.WithDescription(description)) + if err != nil { + return nil, err + } + + return &Info{ + gauge: gauge, + attrs: attrs, + }, nil +} + +// Record records the info metric with the given attributes, merging them with existing ones +func (i *Info) Record(ctx context.Context) { + // Update the stored attributes + i.gauge.Record(ctx, 1, metric.WithAttributes(i.attrs...)) +} diff --git a/lib/telemetry/metrics/metrics.go b/lib/telemetry/metrics/metrics.go new file mode 100644 index 00000000..a14b25a4 --- /dev/null +++ b/lib/telemetry/metrics/metrics.go @@ -0,0 +1,73 @@ +package metrics + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" +) + +type CollectorConfig struct { + Endpoint string + Insecure bool + Headers map[string]string + PublishInterval time.Duration +} + +type Config struct { + Collectors []CollectorConfig + Options []sdkmetric.Option +} + +func NewProvider( + ctx context.Context, + res *resource.Resource, + cfg Config, +) (metric.MeterProvider, func(ctx2 context.Context) error, error) { + if len(cfg.Collectors) == 0 { + return noop.NewMeterProvider(), + func(ctx context.Context) error { return nil }, + nil + } + + var readers []sdkmetric.Reader + for _, collector := range cfg.Collectors { + if collector.Endpoint == "" { + return nil, nil, fmt.Errorf("telemetry provider endpoint is required") + } + if collector.PublishInterval == 0 { + return nil, nil, fmt.Errorf("telemetry provider publish interval is required") + } + opts := []otlpmetrichttp.Option{ + otlpmetrichttp.WithEndpoint(collector.Endpoint), + 
otlpmetrichttp.WithHeaders(collector.Headers), + } + if collector.Insecure { + opts = append(opts, otlpmetrichttp.WithInsecure()) + } + exporter, err := otlpmetrichttp.New(ctx, opts...) + if err != nil { + return nil, nil, fmt.Errorf("failed to create metrics provider: %w", err) + } + readers = append(readers, sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(collector.PublishInterval))) + } + + // attach the resource + providerOptions := []sdkmetric.Option{ + sdkmetric.WithResource(res), + } + // attach reader(s) + for _, r := range readers { + providerOptions = append(providerOptions, sdkmetric.WithReader(r)) + } + // any remaining options + providerOptions = append(providerOptions, cfg.Options...) + + provider := sdkmetric.NewMeterProvider(providerOptions...) + return provider, provider.Shutdown, nil +} diff --git a/lib/telemetry/telemetry.go b/lib/telemetry/telemetry.go new file mode 100644 index 00000000..2d6446f2 --- /dev/null +++ b/lib/telemetry/telemetry.go @@ -0,0 +1,87 @@ +package telemetry + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/trace" + + "github.com/storacha/piri/lib/telemetry/metrics" + "github.com/storacha/piri/lib/telemetry/traces" +) + +type shutdownFn func(context.Context) error + +type Telemetry struct { + Metrics metric.MeterProvider + Traces trace.TracerProvider + shutdownFns []shutdownFn +} + +func New( + ctx context.Context, + environment, serviceName, serviceVersion, instanceID string, + metricCollectors metrics.Config, + tracesCollectors traces.Config, + resourceOpts ...resource.Option, +) (*Telemetry, error) { + if serviceName == "" { + return nil, fmt.Errorf("telemetry service name required") + } + if serviceVersion == "" { + return nil, fmt.Errorf("telemetry service version required") + } + if instanceID == "" { + return nil, fmt.Errorf("telemetry 
instance id required") + } + if environment == "" { + return nil, fmt.Errorf("telemetry environment required") + } + + var rsrcOpts []resource.Option + rsrcOpts = append(rsrcOpts, resource.WithAttributes( + semconv.ServiceNameKey.String(serviceName), + semconv.ServiceVersionKey.String(serviceVersion), + semconv.ServiceInstanceIDKey.String(instanceID), + semconv.DeploymentEnvironmentNameKey.String(environment), + )) + rsrcOpts = append(rsrcOpts, resourceOpts...) + + rsrc, err := resource.New(ctx, rsrcOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + metricsProvider, metricShutdownFn, err := metrics.NewProvider(ctx, rsrc, metricCollectors) + if err != nil { + return nil, fmt.Errorf("failed to create metrics provider: %w", err) + } + + traceProvider, traceShutdownFn, err := traces.NewProvider(ctx, rsrc, tracesCollectors) + if err != nil { + return nil, fmt.Errorf("failed to create trace provider: %w", err) + } + + otel.SetMeterProvider(metricsProvider) + otel.SetTracerProvider(traceProvider) + + return &Telemetry{ + Metrics: metricsProvider, + Traces: traceProvider, + shutdownFns: []shutdownFn{metricShutdownFn, traceShutdownFn}, + }, + nil +} + +func (t *Telemetry) Shutdown(ctx context.Context) error { + for _, fn := range t.shutdownFns { + if err := fn(ctx); err != nil { + return err + } + } + return nil +} diff --git a/lib/telemetry/traces/traces.go b/lib/telemetry/traces/traces.go new file mode 100644 index 00000000..0ba26458 --- /dev/null +++ b/lib/telemetry/traces/traces.go @@ -0,0 +1,81 @@ +package traces + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" + sdkresource "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" +) + +type Config struct { + Collectors []CollectorConfig + 
Options []sdktrace.TracerProviderOption +} + +type CollectorConfig struct { + Endpoint string + Insecure bool + Headers map[string]string + PublishInterval time.Duration +} + +func NewProvider( + ctx context.Context, + res *sdkresource.Resource, + cfg Config, +) (trace.TracerProvider, func(ctx context.Context) error, error) { + defaultPropagator := propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) + if len(cfg.Collectors) == 0 { + return noop.NewTracerProvider(), + func(ctx context.Context) error { return nil }, + nil + } + + var processors []sdktrace.SpanProcessor + for _, collector := range cfg.Collectors { + if collector.Endpoint == "" { + return nil, nil, fmt.Errorf("collector endpoint required") + } + opts := []otlptracehttp.Option{ + otlptracehttp.WithEndpoint(collector.Endpoint), + otlptracehttp.WithHeaders(collector.Headers), + } + if collector.Insecure { + opts = append(opts, otlptracehttp.WithInsecure()) + } + exporter, err := otlptracehttp.New(ctx, opts...) + if err != nil { + return nil, nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + var bspOpts []sdktrace.BatchSpanProcessorOption + if collector.PublishInterval > 0 { + bspOpts = append(bspOpts, sdktrace.WithBatchTimeout(collector.PublishInterval)) + } + processors = append(processors, sdktrace.NewBatchSpanProcessor(exporter, bspOpts...)) + } + + // attach resource + providerOptions := []sdktrace.TracerProviderOption{ + sdktrace.WithResource(res), + } + // all processors (endpoints namely) + for _, p := range processors { + providerOptions = append(providerOptions, sdktrace.WithSpanProcessor(p)) + } + // remaining options + providerOptions = append(providerOptions, cfg.Options...) + + provider := sdktrace.NewTracerProvider(providerOptions...) 
+ otel.SetTextMapPropagator(defaultPropagator) + return provider, provider.Shutdown, nil +} diff --git a/pkg/config/app/config.go b/pkg/config/app/config.go index a974c8b6..190cad3b 100644 --- a/pkg/config/app/config.go +++ b/pkg/config/app/config.go @@ -17,6 +17,9 @@ type AppConfig struct { // Configuration specific for PDP operations PDPService PDPServiceConfig + // Telemetry configuration + Telemetry TelemetryConfig + // // Configs below are not exposed to users, they are hard coded with defaults // their purpose is to allow configurable configuration injection in tests diff --git a/pkg/config/app/telemetry.go b/pkg/config/app/telemetry.go new file mode 100644 index 00000000..98ca228c --- /dev/null +++ b/pkg/config/app/telemetry.go @@ -0,0 +1,16 @@ +package app + +import "time" + +type TelemetryConfig struct { + Metrics []TelemetryCollectorConfig + Traces []TelemetryCollectorConfig + DisableStorachaAnalytics bool +} + +type TelemetryCollectorConfig struct { + Endpoint string + Insecure bool + Headers map[string]string + PublishInterval time.Duration +} diff --git a/pkg/config/full.go b/pkg/config/full.go index 3dc15346..e1843d22 100644 --- a/pkg/config/full.go +++ b/pkg/config/full.go @@ -13,6 +13,7 @@ type FullServerConfig struct { Server ServerConfig `mapstructure:"server" toml:"server"` PDPService PDPServiceConfig `mapstructure:"pdp" toml:"pdp"` UCANService UCANServiceConfig `mapstructure:"ucan" toml:"ucan"` + Telemetry TelemetryConfig `mapstructure:"telemetry" toml:"telemetry"` } func (f FullServerConfig) Validate() error { @@ -58,6 +59,8 @@ func (f FullServerConfig) ToAppConfig() (app.AppConfig, error) { return app.AppConfig{}, fmt.Errorf("converting local pdp to app config: %s", err) } + out.Telemetry = f.Telemetry.ToAppConfig() + // // non-user configuration // diff --git a/pkg/config/telemetry.go b/pkg/config/telemetry.go new file mode 100644 index 00000000..993005b8 --- /dev/null +++ b/pkg/config/telemetry.go @@ -0,0 +1,45 @@ +package config + +import ( 
+ "time" + + "github.com/storacha/piri/pkg/config/app" +) + +type TelemetryCollectorConfig struct { + Endpoint string `mapstructure:"endpoint" validate:"required" toml:"endpoint"` + Insecure bool `mapstructure:"insecure" toml:"insecure,omitempty"` + Headers map[string]string `mapstructure:"headers" toml:"headers,omitempty"` + PublishInterval time.Duration `mapstructure:"publish_interval" toml:"publish_interval,omitempty"` +} + +type TelemetryConfig struct { + Metrics []TelemetryCollectorConfig `mapstructure:"metrics" toml:"metrics,omitempty"` + Traces []TelemetryCollectorConfig `mapstructure:"traces" toml:"traces,omitempty"` + DisableStorachaAnalytics bool `mapstructure:"disable_storacha_analytics" toml:"disable_storacha_analytics,omitempty"` +} + +func (t TelemetryConfig) Validate() error { + return validateConfig(t) +} + +func (t TelemetryConfig) ToAppConfig() app.TelemetryConfig { + convert := func(in []TelemetryCollectorConfig) []app.TelemetryCollectorConfig { + out := make([]app.TelemetryCollectorConfig, 0, len(in)) + for _, c := range in { + out = append(out, app.TelemetryCollectorConfig{ + Endpoint: c.Endpoint, + Insecure: c.Insecure, + Headers: c.Headers, + PublishInterval: c.PublishInterval, + }) + } + return out + } + + return app.TelemetryConfig{ + Metrics: convert(t.Metrics), + Traces: convert(t.Traces), + DisableStorachaAnalytics: t.DisableStorachaAnalytics, + } +} diff --git a/pkg/fx/scheduler/messages.go b/pkg/fx/scheduler/messages.go index 2ac8eed2..7bd3f0c8 100644 --- a/pkg/fx/scheduler/messages.go +++ b/pkg/fx/scheduler/messages.go @@ -57,12 +57,12 @@ type SenderETHPair struct { SendTask *tasks.SendTaskETH } -func ProvideSenderETHPair(params SenderETHParams) *SenderETHPair { - sender, sendTask := tasks.NewSenderETH(params.Client, params.Wallet, params.DB) +func ProvideSenderETHPair(params SenderETHParams) (*SenderETHPair, error) { + sender, sendTask, err := tasks.NewSenderETH(params.Client, params.Wallet, params.DB) return &SenderETHPair{ Sender: 
sender, SendTask: sendTask, - } + }, err } func ProvideSenderFromPair(pair *SenderETHPair) *tasks.SenderETH { diff --git a/pkg/pdp/tasks/next_pdp.go b/pkg/pdp/tasks/next_pdp.go index d9b697a9..f4cfd7bb 100644 --- a/pkg/pdp/tasks/next_pdp.go +++ b/pkg/pdp/tasks/next_pdp.go @@ -9,11 +9,13 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/core/types" chaintypes "github.com/filecoin-project/lotus/chain/types" + "go.opentelemetry.io/otel" "gorm.io/gorm" "gorm.io/gorm/clause" "github.com/storacha/filecoin-services/go/evmerrors" + "github.com/storacha/piri/lib/telemetry" "github.com/storacha/piri/pkg/pdp/chainsched" "github.com/storacha/piri/pkg/pdp/ethereum" "github.com/storacha/piri/pkg/pdp/promise" @@ -34,6 +36,8 @@ type NextProvingPeriodTask struct { fil ChainAPI addFunc promise.Promise[scheduler.AddTaskFunc] + + taskFailure *telemetry.Counter } func NewNextProvingPeriodTask( @@ -45,13 +49,24 @@ func NewNextProvingPeriodTask( verifier smartcontracts.Verifier, service smartcontracts.Service, ) (*NextProvingPeriodTask, error) { + meter := otel.GetMeterProvider().Meter("github.com/storacha/piri/pkg/pdp/tasks") + pdpNextFailureCounter, err := telemetry.NewCounter( + meter, + "pdp_next_failure", + "records failure of next pdp task", + "1", + ) + if err != nil { + return nil, err + } n := &NextProvingPeriodTask{ - db: db, - ethClient: ethClient, - sender: sender, - fil: api, - verifier: verifier, - service: service, + db: db, + ethClient: ethClient, + sender: sender, + fil: api, + verifier: verifier, + service: service, + taskFailure: pdpNextFailureCounter, } if err := chainSched.AddHandler(func(ctx context.Context, revert, apply *chaintypes.TipSet) error { @@ -148,6 +163,11 @@ func adjustNextProveAt(nextProveAt int64, minRequiredEpoch int64, provingPeriod func (n *NextProvingPeriodTask) Do(taskID scheduler.TaskID) (done bool, err error) { ctx := context.Background() + defer func() { + if err != nil { + n.taskFailure.Inc(ctx) + 
} + }() // Select the proof set where challenge_request_task_id equals taskID and prove_at_epoch is not NULL var pdp models.PDPProofSet err = n.db.WithContext(ctx). diff --git a/pkg/pdp/tasks/prove_pdp.go b/pkg/pdp/tasks/prove_pdp.go index 9783c9c8..b605add9 100644 --- a/pkg/pdp/tasks/prove_pdp.go +++ b/pkg/pdp/tasks/prove_pdp.go @@ -23,10 +23,12 @@ import ( pool "github.com/libp2p/go-buffer-pool" "github.com/minio/sha256-simd" "github.com/samber/lo" + "go.opentelemetry.io/otel" "golang.org/x/crypto/sha3" "gorm.io/gorm" "gorm.io/gorm/clause" + "github.com/storacha/piri/lib/telemetry" "github.com/storacha/piri/pkg/pdp/chainsched" "github.com/storacha/piri/pkg/pdp/ethereum" "github.com/storacha/piri/pkg/pdp/promise" @@ -55,6 +57,8 @@ type ProveTask struct { head atomic.Pointer[chaintypes.TipSet] addFunc promise.Promise[scheduler.AddTaskFunc] + + taskFailure *telemetry.Counter } func NewProveTask( @@ -68,21 +72,31 @@ func NewProveTask( reader types.PieceReaderAPI, resolver types.PieceResolverAPI, ) (*ProveTask, error) { + meter := otel.GetMeterProvider().Meter("github.com/storacha/piri/pkg/pdp/tasks") + pdpProveFailure, err := telemetry.NewCounter( + meter, + "pdp_prove_failure", + "records failure to perform a pdp proof", + "1", + ) + if err != nil { + return nil, err + } pt := &ProveTask{ - db: db, - ethClient: ethClient, - verifier: verifier, - sender: sender, - api: api, - bs: bs, - reader: reader, - resolver: resolver, + db: db, + ethClient: ethClient, + verifier: verifier, + sender: sender, + api: api, + bs: bs, + reader: reader, + resolver: resolver, + taskFailure: pdpProveFailure, } // ProveTasks are created on pdp_proof_sets entries where // challenge_request_msg_hash is not null (=not yet landed) - - err := chainSched.AddHandler(func(ctx context.Context, revert, apply *chaintypes.TipSet) error { + err = chainSched.AddHandler(func(ctx context.Context, revert, apply *chaintypes.TipSet) error { if apply == nil { return nil } @@ -162,6 +176,11 @@ func 
NewProveTask( func (p *ProveTask) Do(taskID scheduler.TaskID) (done bool, err error) { ctx := context.Background() + defer func() { + if err != nil { + p.taskFailure.Inc(ctx) + } + }() // Retrieve proof set and challenge epoch for the task var proveTask models.PDPProveTask diff --git a/pkg/pdp/tasks/sender_eth.go b/pkg/pdp/tasks/sender_eth.go index 3cc5fd1b..0b607cfe 100644 --- a/pkg/pdp/tasks/sender_eth.go +++ b/pkg/pdp/tasks/sender_eth.go @@ -12,11 +12,14 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" "github.com/storacha/filecoin-services/go/evmerrors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.uber.org/multierr" "golang.org/x/xerrors" "gorm.io/gorm" "gorm.io/gorm/clause" + "github.com/storacha/piri/lib/telemetry" "github.com/storacha/piri/pkg/pdp/types" "github.com/storacha/piri/pkg/pdp/promise" @@ -44,21 +47,45 @@ type SenderETH struct { sendTask *SendTaskETH db *gorm.DB + + messageEstimateGasFailureCounter *telemetry.Counter } // NewSenderETH creates a new SenderETH. 
-func NewSenderETH(client SenderETHClient, wallet wallet.Wallet, db *gorm.DB) (*SenderETH, *SendTaskETH) { +func NewSenderETH(client SenderETHClient, wallet wallet.Wallet, db *gorm.DB) (*SenderETH, *SendTaskETH, error) { + meter := otel.GetMeterProvider().Meter("github.com/storacha/piri/pkg/pdp/tasks") + sendFailure, err := telemetry.NewCounter( + meter, + "message_send_failure", + "records failure to send a message", + "1", + ) + if err != nil { + return nil, nil, err + } + st := &SendTaskETH{ - client: client, - wallet: wallet, - db: db, + client: client, + wallet: wallet, + db: db, + messageSendFailureCounter: sendFailure, + } + estimateGasFailureCounter, err := telemetry.NewCounter( + meter, + "message_estimate_gas_failure", + "records failure to estimate gas for sending messages; similar to a send failure", + "1", + ) + if err != nil { + return nil, nil, err } return &SenderETH{ - client: client, - db: db, - sendTask: st, - }, st + client: client, + db: db, + sendTask: st, + messageEstimateGasFailureCounter: estimateGasFailureCounter, + }, st, nil } func (s *SenderETH) Send(ctx context.Context, fromAddress common.Address, tx *ethtypes.Transaction, reason string) (common.Hash, error) { @@ -92,6 +119,9 @@ func (s *SenderETH) Send(ctx context.Context, fromAddress common.Address, tx *et } } } + // NB(forrest): otherwise we consider the selector unknown + s.messageEstimateGasFailureCounter.Inc(ctx, attribute.String("selector", "unknown"), + attribute.String("method", reason)) return common.Hash{}, fmt.Errorf("failed to estimate gas: %w", err) } if gasLimit == 0 { @@ -222,7 +252,8 @@ type SendTaskETH struct { client SenderETHClient wallet wallet.Wallet - db *gorm.DB + db *gorm.DB + messageSendFailureCounter *telemetry.Counter } func (s *SendTaskETH) Do(taskID scheduler.TaskID) (done bool, err error) { @@ -361,6 +392,7 @@ func (s *SendTaskETH) Do(taskID scheduler.TaskID) (done bool, err error) { var sendError string if err != nil { sendError = err.Error() + 
s.messageSendFailureCounter.Inc(ctx, attribute.String("method", dbTx.SendReason)) } err = s.db.Model(&models.MessageSendsEth{}). diff --git a/pkg/service/replicator/replicate.go b/pkg/service/replicator/replicate.go index 6156da00..62052b5b 100644 --- a/pkg/service/replicator/replicate.go +++ b/pkg/service/replicator/replicate.go @@ -21,6 +21,7 @@ type Replicator interface { type Service struct { queue *jobqueue.JobQueue[*replicahandler.TransferRequest] adapter *adapter + metrics *replicahandler.Metrics } type adapter struct { @@ -48,7 +49,11 @@ func New( uploadConn client.Connection, queue *jobqueue.JobQueue[*replicahandler.TransferRequest], ) (*Service, error) { - return &Service{ + metrics, err := replicahandler.NewMetrics() + if err != nil { + return nil, err + } + svc := &Service{ queue: queue, adapter: &adapter{ id: id, @@ -58,7 +63,9 @@ func New( receipts: rstore, uploadConn: uploadConn, }, - }, nil + metrics: metrics, + } + return svc, nil } const TransferTaskName = "transfer-task" @@ -69,7 +76,7 @@ func (r *Service) Replicate(ctx context.Context, task *replicahandler.TransferRe func (r *Service) RegisterTransferTask(queue *jobqueue.JobQueue[*replicahandler.TransferRequest]) error { return queue.Register(TransferTaskName, func(ctx context.Context, request *replicahandler.TransferRequest) error { - return replicahandler.Transfer(ctx, r.adapter, request) + return replicahandler.Transfer(ctx, r.adapter, request, r.metrics) }, jobqueue.WithOnFailure(func(ctx context.Context, msg *replicahandler.TransferRequest, err error) error { return replicahandler.SendFailureReceipt(ctx, r.adapter, msg, err) })) diff --git a/pkg/service/storage/handlers/replica/telemetry.go b/pkg/service/storage/handlers/replica/telemetry.go new file mode 100644 index 00000000..bcbb99bc --- /dev/null +++ b/pkg/service/storage/handlers/replica/telemetry.go @@ -0,0 +1,76 @@ +package replica + +import ( + "context" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" 
+ + "github.com/storacha/piri/lib/telemetry" +) + +var replicaDurationBounds = []float64{ + (5 * time.Millisecond).Seconds(), + (10 * time.Millisecond).Seconds(), + (100 * time.Millisecond).Seconds(), + (time.Second).Seconds(), + (3 * time.Second).Seconds(), + (5 * time.Second).Seconds(), + (10 * time.Second).Seconds(), + (30 * time.Second).Seconds(), + (time.Minute).Seconds(), + (2 * time.Minute).Seconds(), + (3 * time.Minute).Seconds(), + (5 * time.Minute).Seconds(), + (6 * time.Minute).Seconds(), + (7 * time.Minute).Seconds(), + (8 * time.Minute).Seconds(), + (9 * time.Minute).Seconds(), + (10 * time.Minute).Seconds(), + (30 * time.Minute).Seconds(), +} + +type Metrics struct { + failureCounter *telemetry.Counter + durationTimer *telemetry.Timer +} + +func NewMetrics() (*Metrics, error) { + meter := otel.GetMeterProvider().Meter("github.com/storacha/piri/pkg/service/storage/handlers/replica") + failureCounter, err := telemetry.NewCounter( + meter, + "replica_transfer_failure", + "records failures during a replica transfer", + "1", + ) + if err != nil { + return nil, err + } + + durationTimer, err := telemetry.NewTimer( + meter, + "transfer_duration", + "durating of replica transfer operation", + replicaDurationBounds, + ) + + return &Metrics{ + failureCounter: failureCounter, + durationTimer: durationTimer, + }, nil +} + +func (m *Metrics) recordFailure(ctx context.Context, sink string) { + if m == nil || m.failureCounter == nil { + return + } + m.failureCounter.Inc(ctx, attribute.String("sink", sink)) +} + +func (m *Metrics) startDuration(sink string) *telemetry.StopWatch { + if m == nil || m.durationTimer == nil { + return nil + } + return m.durationTimer.Start(attribute.String("sink", sink)) +} diff --git a/pkg/service/storage/handlers/replica/transfer.go b/pkg/service/storage/handlers/replica/transfer.go index 1daa44b5..4140dffd 100644 --- a/pkg/service/storage/handlers/replica/transfer.go +++ b/pkg/service/storage/handlers/replica/transfer.go @@ -35,6 +35,8 
@@ import ( ucan_http "github.com/storacha/go-ucanto/transport/http" "github.com/storacha/go-ucanto/ucan" "github.com/storacha/go-ucanto/validator" + "go.opentelemetry.io/otel/attribute" + "github.com/storacha/piri/pkg/pdp" "github.com/storacha/piri/pkg/service/blobs" "github.com/storacha/piri/pkg/service/claims" @@ -176,12 +178,21 @@ func (t *TransferRequest) UnmarshalJSON(b []byte) error { // // Both paths end with sending the receipt to the upload service, which confirms // successful replication to the requesting node. -func Transfer(ctx context.Context, service TransferService, request *TransferRequest) error { +func Transfer(ctx context.Context, service TransferService, request *TransferRequest, metrics *Metrics) (err error) { var ( rcpt receipt.AnyReceipt forks []fx.Effect ) + stopwatch := metrics.startDuration(sinkLabel(request.Sink)) + defer func() { + success := true + if err != nil { + success = false + } + stopwatch.Stop(ctx, attribute.Bool("success", success)) + }() + // Check if the blob already exists blobExists, err := checkBlobExists(ctx, service, request.Blob) if err != nil { @@ -232,6 +243,16 @@ func Transfer(ctx context.Context, service TransferService, request *TransferReq return sendMessageToUploadService(ctx, service, rcpt) } +func sinkLabel(sink *url.URL) string { + if sink == nil { + return "none" + } + if sink.Host != "" { + return sink.Host + } + return sink.String() +} + // checkBlobExists checks if the blob already exists in either PDP or Blobs store func checkBlobExists(ctx context.Context, service TransferService, blob types.Blob) (bool, error) { var err error diff --git a/pkg/telemetry/counter.go b/pkg/telemetry/counter.go deleted file mode 100644 index 75b7389b..00000000 --- a/pkg/telemetry/counter.go +++ /dev/null @@ -1,111 +0,0 @@ -package telemetry - -import ( - "context" - "fmt" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" -) - -type Counter struct { - counter metric.Int64Counter - attrs 
[]attribute.KeyValue -} - -type CounterConfig struct { - Name string - Description string - Unit string - Attributes map[string]string -} - -func NewCounter(meter metric.Meter, cfg CounterConfig) (*Counter, error) { - opts := []metric.Int64CounterOption{ - metric.WithDescription(cfg.Description), - } - - if cfg.Unit != "" { - opts = append(opts, metric.WithUnit(cfg.Unit)) - } - - counter, err := meter.Int64Counter(cfg.Name, opts...) - if err != nil { - return nil, fmt.Errorf("failed to create counter %s: %w", cfg.Name, err) - } - - attrs := make([]attribute.KeyValue, 0, len(cfg.Attributes)) - for k, v := range cfg.Attributes { - attrs = append(attrs, attribute.String(k, v)) - } - - return &Counter{ - counter: counter, - attrs: attrs, - }, nil -} - -func (c *Counter) Add(ctx context.Context, value int64, attrs ...attribute.KeyValue) { - allAttrs := append(c.attrs, attrs...) - c.counter.Add(ctx, value, metric.WithAttributes(allAttrs...)) -} - -func (c *Counter) Inc(ctx context.Context, attrs ...attribute.KeyValue) { - c.Add(ctx, 1, attrs...) -} - -func (c *Counter) WithAttributes(attrs ...attribute.KeyValue) *Counter { - return &Counter{ - counter: c.counter, - attrs: append(c.attrs, attrs...), - } -} - -type FloatCounter struct { - counter metric.Float64Counter - attrs []attribute.KeyValue -} - -type FloatCounterConfig struct { - Name string - Description string - Unit string - Attributes map[string]string -} - -func NewFloatCounter(meter metric.Meter, cfg FloatCounterConfig) (*FloatCounter, error) { - opts := []metric.Float64CounterOption{ - metric.WithDescription(cfg.Description), - } - - if cfg.Unit != "" { - opts = append(opts, metric.WithUnit(cfg.Unit)) - } - - counter, err := meter.Float64Counter(cfg.Name, opts...) 
- if err != nil { - return nil, fmt.Errorf("failed to create float counter %s: %w", cfg.Name, err) - } - - attrs := make([]attribute.KeyValue, 0, len(cfg.Attributes)) - for k, v := range cfg.Attributes { - attrs = append(attrs, attribute.String(k, v)) - } - - return &FloatCounter{ - counter: counter, - attrs: attrs, - }, nil -} - -func (c *FloatCounter) Add(ctx context.Context, value float64, attrs ...attribute.KeyValue) { - allAttrs := append(c.attrs, attrs...) - c.counter.Add(ctx, value, metric.WithAttributes(allAttrs...)) -} - -func (c *FloatCounter) WithAttributes(attrs ...attribute.KeyValue) *FloatCounter { - return &FloatCounter{ - counter: c.counter, - attrs: append(c.attrs, attrs...), - } -} diff --git a/pkg/telemetry/gauge.go b/pkg/telemetry/gauge.go deleted file mode 100644 index 787e54b5..00000000 --- a/pkg/telemetry/gauge.go +++ /dev/null @@ -1,107 +0,0 @@ -package telemetry - -import ( - "context" - "fmt" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" -) - -type Gauge struct { - gauge metric.Int64Gauge - attrs []attribute.KeyValue -} - -type GaugeConfig struct { - Name string - Description string - Unit string - Attributes map[string]string -} - -func NewGauge(meter metric.Meter, cfg GaugeConfig) (*Gauge, error) { - opts := []metric.Int64GaugeOption{ - metric.WithDescription(cfg.Description), - } - - if cfg.Unit != "" { - opts = append(opts, metric.WithUnit(cfg.Unit)) - } - - gauge, err := meter.Int64Gauge(cfg.Name, opts...) - if err != nil { - return nil, fmt.Errorf("failed to create gauge %s: %w", cfg.Name, err) - } - - attrs := make([]attribute.KeyValue, 0, len(cfg.Attributes)) - for k, v := range cfg.Attributes { - attrs = append(attrs, attribute.String(k, v)) - } - - return &Gauge{ - gauge: gauge, - attrs: attrs, - }, nil -} - -func (g *Gauge) Record(ctx context.Context, value int64, attrs ...attribute.KeyValue) { - allAttrs := append(g.attrs, attrs...) 
- g.gauge.Record(ctx, value, metric.WithAttributes(allAttrs...)) -} - -func (g *Gauge) WithAttributes(attrs ...attribute.KeyValue) *Gauge { - return &Gauge{ - gauge: g.gauge, - attrs: append(g.attrs, attrs...), - } -} - -type FloatGauge struct { - gauge metric.Float64Gauge - attrs []attribute.KeyValue -} - -type FloatGaugeConfig struct { - Name string - Description string - Unit string - Attributes map[string]string -} - -func NewFloatGauge(meter metric.Meter, cfg FloatGaugeConfig) (*FloatGauge, error) { - opts := []metric.Float64GaugeOption{ - metric.WithDescription(cfg.Description), - } - - if cfg.Unit != "" { - opts = append(opts, metric.WithUnit(cfg.Unit)) - } - - gauge, err := meter.Float64Gauge(cfg.Name, opts...) - if err != nil { - return nil, fmt.Errorf("failed to create float gauge %s: %w", cfg.Name, err) - } - - attrs := make([]attribute.KeyValue, 0, len(cfg.Attributes)) - for k, v := range cfg.Attributes { - attrs = append(attrs, attribute.String(k, v)) - } - - return &FloatGauge{ - gauge: gauge, - attrs: attrs, - }, nil -} - -func (g *FloatGauge) Record(ctx context.Context, value float64, attrs ...attribute.KeyValue) { - allAttrs := append(g.attrs, attrs...) - g.gauge.Record(ctx, value, metric.WithAttributes(allAttrs...)) -} - -func (g *FloatGauge) WithAttributes(attrs ...attribute.KeyValue) *FloatGauge { - return &FloatGauge{ - gauge: g.gauge, - attrs: append(g.attrs, attrs...), - } -} diff --git a/pkg/telemetry/global.go b/pkg/telemetry/global.go deleted file mode 100644 index 3ef440f1..00000000 --- a/pkg/telemetry/global.go +++ /dev/null @@ -1,67 +0,0 @@ -package telemetry - -import ( - "context" - "sync" - - "go.opentelemetry.io/otel/metric/noop" -) - -var ( - globalTelemetry *Telemetry - globalMu sync.RWMutex -) - -// Initialize sets up the global telemetry instance. -// This should be called once at application startup. 
-func Initialize(ctx context.Context, cfg Config) error { - globalMu.Lock() - defer globalMu.Unlock() - - tel, err := New(ctx, cfg) - if err != nil { - return err - } - - globalTelemetry = tel - return nil -} - -// Global returns the global telemetry instance. -// If Initialize hasn't been called, it returns a no-op telemetry instance. -func Global() *Telemetry { - globalMu.RLock() - if globalTelemetry != nil { - defer globalMu.RUnlock() - return globalTelemetry - } - globalMu.RUnlock() - - // If not initialized, create a no-op instance - globalMu.Lock() - defer globalMu.Unlock() - if globalTelemetry == nil { - globalTelemetry = NewWithMeter(noop.NewMeterProvider().Meter("noop")) - } - - return globalTelemetry -} - -// Shutdown shuts down the global telemetry instance. -func Shutdown(ctx context.Context) error { - globalMu.RLock() - defer globalMu.RUnlock() - - if globalTelemetry != nil { - return globalTelemetry.Shutdown(ctx) - } - return nil -} - -// setGlobalForTesting sets a custom telemetry instance for testing. -// This should only be used in tests. 
-func setGlobalForTesting(tel *Telemetry) { - globalMu.Lock() - defer globalMu.Unlock() - globalTelemetry = tel -} diff --git a/pkg/telemetry/global_test.go b/pkg/telemetry/global_test.go deleted file mode 100644 index 07f760e2..00000000 --- a/pkg/telemetry/global_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package telemetry - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestGlobalTelemetry(t *testing.T) { - ctx := context.Background() - - t.Run("Global returns noop before initialization", func(t *testing.T) { - // Reset global state for test - setGlobalForTesting(nil) - - // Before initialization, should get a no-op instance - tel := Global() - assert.NotNil(t, tel) - - // Should be able to create metrics without error - counter, err := Global().NewCounter(CounterConfig{ - Name: "test_counter", - }) - require.NoError(t, err) - assert.NotNil(t, counter) - - // Should be safe to use - counter.Inc(ctx) - }) - - t.Run("Initialize sets global instance", func(t *testing.T) { - // Reset global state for test - setGlobalForTesting(nil) - - // Before initialization, should get a no-op instance - before := Global() - assert.NotNil(t, before) - - // After initialization, should get the new instance - Initialize(context.Background(), Config{ - ServiceName: "test", - ServiceVersion: "1.0.0", - Environment: "test", - Endpoint: "http://localhost:3000", - endpoint: "http://localhost:4317", - insecure: true, - }) - after := Global() - assert.NotEqual(t, before, after) - }) - - t.Run("Shutdown handles nil global", func(t *testing.T) { - setGlobalForTesting(nil) - - // Should not panic - err := Shutdown(ctx) - assert.NoError(t, err) - }) -} diff --git a/pkg/telemetry/host_metrics.go b/pkg/telemetry/host_metrics.go new file mode 100644 index 00000000..0c0f5712 --- /dev/null +++ b/pkg/telemetry/host_metrics.go @@ -0,0 +1,122 @@ +package telemetry + +import ( + "context" + "fmt" + + logging 
"github.com/ipfs/go-log/v2" + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/disk" + "github.com/shirou/gopsutil/v4/mem" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +var log = logging.Logger("telemetry") + +// StartHostMetrics exports basic host metrics (CPU, memory, and data-dir disk usage) +// via the global meter. Metrics are intentionally scoped to avoid PII; only the +// data-dir path is attached to disk metrics so we can distinguish the storage +// volume being monitored. +func StartHostMetrics(ctx context.Context, meter metric.Meter, dataDir string) error { + if dataDir == "" { + return fmt.Errorf("dataDir is required to start host metrics") + } + + cpuUtilization, err := meter.Float64ObservableGauge( + "system_cpu_utilization", + metric.WithDescription("System-wide CPU utilization as a fraction (0-1)"), + metric.WithUnit("1"), + ) + if err != nil { + return fmt.Errorf("create cpu utilization gauge: %w", err) + } + + memUsed, err := meter.Int64ObservableGauge( + "system_memory_used_bytes", + metric.WithDescription("System memory used in bytes"), + metric.WithUnit("By"), + ) + if err != nil { + return fmt.Errorf("create memory used gauge: %w", err) + } + + memTotal, err := meter.Int64ObservableGauge( + "system_memory_total_bytes", + metric.WithDescription("System memory total in bytes"), + metric.WithUnit("By"), + ) + if err != nil { + return fmt.Errorf("create memory total gauge: %w", err) + } + + dataDirUsed, err := meter.Int64ObservableGauge( + "piri_datadir_used_bytes", + metric.WithDescription("Bytes used on the filesystem backing the Piri data-dir"), + metric.WithUnit("By"), + ) + if err != nil { + return fmt.Errorf("create data-dir used gauge: %w", err) + } + + dataDirFree, err := meter.Int64ObservableGauge( + "piri_datadir_free_bytes", + metric.WithDescription("Free bytes on the filesystem backing the Piri data-dir"), + metric.WithUnit("By"), + ) + if err != nil { + return fmt.Errorf("create 
data-dir free gauge: %w", err) + } + + dataDirTotal, err := meter.Int64ObservableGauge( + "piri_datadir_total_bytes", + metric.WithDescription("Total bytes on the filesystem backing the Piri data-dir"), + metric.WithUnit("By"), + ) + if err != nil { + return fmt.Errorf("create data-dir total gauge: %w", err) + } + + dataDirAttr := attribute.String("path", dataDir) + + reg, err := meter.RegisterCallback( + func(ctx context.Context, o metric.Observer) error { + if percentages, err := cpu.Percent(0, false); err == nil && len(percentages) > 0 { + // cpu.Percent returns 0-100; convert to 0-1 for utilization. + o.ObserveFloat64(cpuUtilization, percentages[0]/100.0) + } + + if vm, err := mem.VirtualMemory(); err == nil { + o.ObserveInt64(memUsed, int64(vm.Used)) + o.ObserveInt64(memTotal, int64(vm.Total)) + } + + if usage, err := disk.Usage(dataDir); err == nil { + attrOpt := metric.WithAttributes(dataDirAttr) + o.ObserveInt64(dataDirUsed, int64(usage.Used), attrOpt) + o.ObserveInt64(dataDirFree, int64(usage.Free), attrOpt) + o.ObserveInt64(dataDirTotal, int64(usage.Total), attrOpt) + } + + return nil + }, + cpuUtilization, + memUsed, + memTotal, + dataDirUsed, + dataDirFree, + dataDirTotal, + ) + if err != nil { + return fmt.Errorf("register host metrics callback: %w", err) + } + + go func() { + <-ctx.Done() + if err := reg.Unregister(); err != nil { + log.Warnw("failed to unregister host metrics callback", "error", err) + } + }() + + return nil +} diff --git a/pkg/telemetry/http_metrics.go b/pkg/telemetry/http_metrics.go deleted file mode 100644 index 35abca10..00000000 --- a/pkg/telemetry/http_metrics.go +++ /dev/null @@ -1,20 +0,0 @@ -package telemetry - -import ( - semconvhttp "go.opentelemetry.io/otel/semconv/v1.37.0/httpconv" -) - -var ( - HTTPServerRequestDurationInstrument = semconvhttp.ServerRequestDuration{}.Name() - HTTPServerRequestSizeInstrument = semconvhttp.ServerRequestBodySize{}.Name() - HTTPServerResponseSizeInstrument = 
semconvhttp.ServerResponseBodySize{}.Name() -) - -// HTTPServerDurationBounds extends the default middleware buckets (0.005–10s from -// go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho/internal/semconv) -// to capture long uploads/downloads up to 10 minutes. -var HTTPServerDurationBounds = []float64{ - 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, - 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, - 30, 60, 120, 300, 600, -} diff --git a/pkg/telemetry/info.go b/pkg/telemetry/info.go index e7bbef71..fb02302c 100644 --- a/pkg/telemetry/info.go +++ b/pkg/telemetry/info.go @@ -2,58 +2,33 @@ package telemetry import ( "context" + "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" -) - -// Info represents an info metric - a gauge that always has value 1 -// Info metrics are used to expose textual information as labels. -type Info struct { - gauge *Gauge - attrs map[string]attribute.KeyValue -} -// InfoConfig configures an info metric -type InfoConfig struct { - Name string - Description string - Labels map[string]string -} + "github.com/storacha/piri/lib/telemetry" + "github.com/storacha/piri/pkg/build" +) -// NewInfo creates a new info metric. Under the hood it is a gauge that always reports 1. 
-// This is useful for exposing version info, addresses, and other metadata -func NewInfo(meter metric.Meter, cfg InfoConfig) (*Info, error) { - gauge, err := NewGauge(meter, GaugeConfig{ - Name: cfg.Name, - Description: cfg.Description, - }) +func RecordServerInfo(meter metric.Meter, ctx context.Context, serverType string, extraAttrs ...attribute.KeyValue) error { + allAttrs := append(extraAttrs, + attribute.String("version", build.Version), + attribute.String("commit", build.Commit), + attribute.String("built_by", build.BuiltBy), + attribute.String("build_date", build.Date), + attribute.Int64("start_time_unix", time.Now().Unix()), + attribute.String("server_type", serverType), + ) + info, err := telemetry.NewInfo( + meter, + "piri_server_info", + "Build and runtime information about the Piri server", + allAttrs..., + ) if err != nil { - return nil, err - } - - attrs := make(map[string]attribute.KeyValue, len(cfg.Labels)) - for k, v := range cfg.Labels { - attrs[k] = attribute.String(k, v) + return err } - - return &Info{ - gauge: gauge, - attrs: attrs, - }, nil -} - -// Record records the info metric with the given attributes, merging them with existing ones -func (i *Info) Record(ctx context.Context, attrs ...attribute.KeyValue) { - // Update the stored attributes - for _, attr := range attrs { - i.attrs[string(attr.Key)] = attr - } - - recordedAttrs := make([]attribute.KeyValue, 0, len(i.attrs)) - for _, v := range i.attrs { - recordedAttrs = append(recordedAttrs, v) - } - - i.gauge.Record(ctx, 1, recordedAttrs...) 
+ info.Record(ctx) + return nil } diff --git a/pkg/telemetry/metrics.go b/pkg/telemetry/metrics.go deleted file mode 100644 index adc2a436..00000000 --- a/pkg/telemetry/metrics.go +++ /dev/null @@ -1,37 +0,0 @@ -package telemetry - -import ( - "context" - "time" - - "go.opentelemetry.io/otel/attribute" - - "github.com/storacha/piri/pkg/build" -) - -// RecordServerInfo records server metadata as an info metric -// this metric is best effort, if it fails, a warning is log and the process continues -func RecordServerInfo(ctx context.Context, serverType string, extraAttrs ...attribute.KeyValue) { - info, err := Global().NewInfo(InfoConfig{ - Name: "piri_server_info", - Description: "Build and runtime information about the Piri server", - }) - if err != nil { - log.Warnw("failed to initialize piri server info metric", "error", err, "type", serverType) - } - - // Base attributes that all servers share - attrs := []attribute.KeyValue{ - StringAttr("version", build.Version), - StringAttr("commit", build.Commit), - StringAttr("built_by", build.BuiltBy), - StringAttr("build_date", build.Date), - Int64Attr("start_time_unix", time.Now().Unix()), - StringAttr("server_type", serverType), - } - - // Add any server-specific attributes - attrs = append(attrs, extraAttrs...) - - info.Record(ctx, attrs...) 
-} diff --git a/pkg/telemetry/otlp_options.go b/pkg/telemetry/otlp_options.go deleted file mode 100644 index e1c8afca..00000000 --- a/pkg/telemetry/otlp_options.go +++ /dev/null @@ -1,51 +0,0 @@ -package telemetry - -import ( - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" -) - -type otlpHTTPOptions struct { - endpoint string - insecure bool - headers map[string]string -} - -func newOTLPHTTPOptions(endpoint string, insecure bool, headers map[string]string) otlpHTTPOptions { - return otlpHTTPOptions{ - endpoint: endpoint, - insecure: insecure, - headers: headers, - } -} - -func (o otlpHTTPOptions) metricOptions() []otlpmetrichttp.Option { - return buildHTTPOptions(o, otlpmetrichttp.WithEndpoint, otlpmetrichttp.WithInsecure, otlpmetrichttp.WithHeaders) -} - -func (o otlpHTTPOptions) traceOptions() []otlptracehttp.Option { - return buildHTTPOptions(o, otlptracehttp.WithEndpoint, otlptracehttp.WithInsecure, otlptracehttp.WithHeaders) -} - -func buildHTTPOptions[T any]( - opts otlpHTTPOptions, - withEndpoint func(string) T, - withInsecure func() T, - withHeaders func(map[string]string) T, -) []T { - if opts.endpoint == "" { - return nil - } - - options := []T{withEndpoint(opts.endpoint)} - - if opts.insecure { - options = append(options, withInsecure()) - } - - if len(opts.headers) > 0 { - options = append(options, withHeaders(opts.headers)) - } - - return options -} diff --git a/pkg/telemetry/provider.go b/pkg/telemetry/provider.go deleted file mode 100644 index a9340b77..00000000 --- a/pkg/telemetry/provider.go +++ /dev/null @@ -1,94 +0,0 @@ -package telemetry - -import ( - "context" - "fmt" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - sdkresource "go.opentelemetry.io/otel/sdk/resource" -) - -type Provider struct { - provider 
*sdkmetric.MeterProvider - meter metric.Meter -} - -type Config struct { - ServiceName string - ServiceVersion string - InstanceID string - Environment string - Endpoint string - PublishInterval time.Duration - TracesEndpoint string - endpoint string - insecure bool - headers map[string]string -} - -func newProvider(ctx context.Context, cfg Config, res *sdkresource.Resource, opts []otlpmetrichttp.Option) (*Provider, error) { - if len(opts) == 0 { - return nil, fmt.Errorf("metrics endpoint is required") - } - - // Expand the default server duration histogram so long uploads/downloads are visible - - exporter, err := otlpmetrichttp.New(ctx, opts...) - if err != nil { - return nil, fmt.Errorf("failed to create metric exporter: %w", err) - } - - provider := sdkmetric.NewMeterProvider( - sdkmetric.WithReader( - sdkmetric.NewPeriodicReader(exporter, - sdkmetric.WithInterval(cfg.PublishInterval), - ), - ), - sdkmetric.WithResource(res), - sdkmetric.WithView( - sdkmetric.NewView( - sdkmetric.Instrument{Name: HTTPServerRequestDurationInstrument}, - sdkmetric.Stream{ - Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: HTTPServerDurationBounds, - }, - }, - ), - sdkmetric.NewView( - sdkmetric.Instrument{Name: HTTPServerRequestSizeInstrument}, - sdkmetric.Stream{ - Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: SizeBoundaries, - }, - }, - ), - sdkmetric.NewView( - sdkmetric.Instrument{Name: HTTPServerResponseSizeInstrument}, - sdkmetric.Stream{ - Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: SizeBoundaries, - }, - }, - ), - ), - ) - - otel.SetMeterProvider(provider) - - return &Provider{ - provider: provider, - meter: provider.Meter(cfg.ServiceName), - }, nil -} - -func (p *Provider) Meter() metric.Meter { - return p.meter -} - -func (p *Provider) Shutdown(ctx context.Context) error { - return p.provider.Shutdown(ctx) -} diff --git a/pkg/telemetry/resource.go b/pkg/telemetry/resource.go deleted file 
mode 100644 index 316873e6..00000000 --- a/pkg/telemetry/resource.go +++ /dev/null @@ -1,29 +0,0 @@ -package telemetry - -import ( - "context" - "fmt" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.37.0" -) - -func newResource(ctx context.Context, cfg Config) (*resource.Resource, error) { - attrs := []attribute.KeyValue{ - semconv.ServiceNameKey.String(cfg.ServiceName), // e.g "piri" - semconv.ServiceVersionKey.String(cfg.ServiceVersion), // e.g. "v0.1.0" - semconv.ServiceInstanceIDKey.String(cfg.InstanceID), // nodes DID - semconv.ServerAddressKey.String(cfg.Endpoint), // e.g. https://spicystorage.tech (endpoint as advertised to network - attribute.String("deployment.environment", cfg.Environment), // i.e. "staging", "production", etc. - } - - res, err := resource.New(ctx, - resource.WithAttributes(attrs...), - ) - if err != nil { - return nil, fmt.Errorf("failed to create resource: %w", err) - } - - return res, nil -} diff --git a/pkg/telemetry/setup.go b/pkg/telemetry/setup.go new file mode 100644 index 00000000..3a8240e0 --- /dev/null +++ b/pkg/telemetry/setup.go @@ -0,0 +1,153 @@ +package telemetry + +import ( + "context" + "time" + + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconvhttp "go.opentelemetry.io/otel/semconv/v1.37.0/httpconv" + + "github.com/storacha/piri/lib/telemetry" + "github.com/storacha/piri/lib/telemetry/metrics" + "github.com/storacha/piri/lib/telemetry/traces" + "github.com/storacha/piri/pkg/build" +) + +const ( + defaultEndpoint = "telemetry.storacha.network:443" + defaultPublishInterval = 30 * time.Second +) + +func Setup(ctx context.Context, network string, id string) (*telemetry.Telemetry, error) { + return telemetry.New( + ctx, + network, + "piri", + build.Version, + id, + metrics.Config{ + Collectors: []metrics.CollectorConfig{ + { + Endpoint: defaultEndpoint, + PublishInterval: 
defaultPublishInterval, + }, + }, + Options: []sdkmetric.Option{ + sdkmetric.WithView( + // custom views for http metrics with more buckets for histograms + DefaultHTTPServicerRequestDurationView, + DefaultHTTPServerRequestBodySizeView, + DefaultHTTPServerResponseBodySizeView, + ), + }, + }, + traces.Config{ + Collectors: []traces.CollectorConfig{ + { + Endpoint: defaultEndpoint, + PublishInterval: defaultPublishInterval, + }, + }, + Options: []sdktrace.TracerProviderOption{ + // Only sample when there is a parent trace; never start local roots. + sdktrace.WithSampler( + sdktrace.ParentBased(sdktrace.NeverSample()), + ), + }, + }, + ) +} + +var HTTPServerDurationBounds = []float64{ + (5 * time.Millisecond).Seconds(), + (10 * time.Millisecond).Seconds(), + (100 * time.Millisecond).Seconds(), + (time.Second).Seconds(), + (3 * time.Second).Seconds(), + (5 * time.Second).Seconds(), + (10 * time.Second).Seconds(), + (30 * time.Second).Seconds(), + (time.Minute).Seconds(), + (2 * time.Minute).Seconds(), + (3 * time.Minute).Seconds(), + (5 * time.Minute).Seconds(), + (6 * time.Minute).Seconds(), + (7 * time.Minute).Seconds(), + (8 * time.Minute).Seconds(), + (9 * time.Minute).Seconds(), + (10 * time.Minute).Seconds(), +} + +const ( + KiB float64 = 1024 + MiB = KiB * 1024 + GiB = MiB * 1024 +) + +var SizeBoundaries = []float64{ + // Explicit histogram buckets for request/response body sizes (bytes), up to 1 GiB. 
+ KiB, + 2 * KiB, + 4 * KiB, + 8 * KiB, + 16 * KiB, + 32 * KiB, + 64 * KiB, + 128 * KiB, + 256 * KiB, + 512 * KiB, + MiB, + 2 * MiB, + 4 * MiB, + 8 * MiB, + 16 * MiB, + 32 * MiB, + 64 * MiB, + 128 * MiB, + 256 * MiB, + 512 * MiB, + GiB, +} + +var ( + DefaultHTTPServicerRequestDurationView = sdkmetric.NewView( + sdkmetric.Instrument{ + Name: semconvhttp.ServerRequestDuration{}.Name(), + Description: semconvhttp.ServerRequestDuration{}.Description(), + Kind: sdkmetric.InstrumentKindHistogram, + Unit: semconvhttp.ServerRequestDuration{}.Unit(), + }, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: HTTPServerDurationBounds, + }, + }, + ) + DefaultHTTPServerRequestBodySizeView = sdkmetric.NewView( + sdkmetric.Instrument{ + Name: semconvhttp.ServerRequestBodySize{}.Name(), + Description: semconvhttp.ServerRequestBodySize{}.Description(), + Kind: sdkmetric.InstrumentKindHistogram, + Unit: semconvhttp.ServerRequestBodySize{}.Unit(), + }, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: SizeBoundaries, + }, + }, + ) + DefaultHTTPServerResponseBodySizeView = sdkmetric.NewView( + sdkmetric.Instrument{ + Name: semconvhttp.ServerResponseBodySize{}.Name(), + Description: semconvhttp.ServerResponseBodySize{}.Description(), + Kind: sdkmetric.InstrumentKindHistogram, + Unit: semconvhttp.ServerResponseBodySize{}.Unit(), + }, + sdkmetric.Stream{ + Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: SizeBoundaries, + }, + }, + ) +) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go deleted file mode 100644 index f6bda353..00000000 --- a/pkg/telemetry/telemetry.go +++ /dev/null @@ -1,181 +0,0 @@ -package telemetry - -import ( - "context" - "errors" - "fmt" - "os" - "time" - - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/sdk/trace" -) - -var log = 
logging.Logger("telemetry") - -const ( - defaultEndpoint = "telemetry.storacha.network:443" - defaultPublishInterval = 30 * time.Second -) - -type Telemetry struct { - provider *Provider - traceProvider *trace.TracerProvider - meter metric.Meter -} - -func New(ctx context.Context, cfg Config) (*Telemetry, error) { - // collector endpoint and environment will be hard-coded for now - cfg.endpoint = defaultEndpoint - if cfg.PublishInterval == 0 { - cfg.PublishInterval = defaultPublishInterval - } - - // tracing is off by default, only enable if set - if cfg.TracesEndpoint == "" { - if os.Getenv("PIRI_TRACING_ENABLED") != "" { - cfg.TracesEndpoint = cfg.endpoint - } - } - - res, err := newResource(ctx, cfg) - if err != nil { - return nil, fmt.Errorf("failed to create telemetry resource: %w", err) - } - - metricOpts := newOTLPHTTPOptions(cfg.endpoint, cfg.insecure, cfg.headers).metricOptions() - provider, err := newProvider(ctx, cfg, res, metricOpts) - if err != nil { - return nil, fmt.Errorf("failed to create telemetry provider: %w", err) - } - - t := &Telemetry{ - provider: provider, - meter: provider.Meter(), - } - - traceOpts := newOTLPHTTPOptions(cfg.TracesEndpoint, cfg.insecure, cfg.headers).traceOptions() - if len(traceOpts) > 0 { - traceProvider, err := newTraceProvider(ctx, res, traceOpts) - if err != nil { - return nil, fmt.Errorf("failed to create trace provider: %w", err) - } - t.traceProvider = traceProvider - } - return t, nil -} - -// NewWithMeter creates a new Telemetry instance with a custom meter. -// This is useful for testing with in-memory exporters or manual readers. 
-func NewWithMeter(meter metric.Meter) *Telemetry { - return &Telemetry{ - meter: meter, - } -} - -func (t *Telemetry) Meter() metric.Meter { - return t.meter -} - -func (t *Telemetry) NewCounter(cfg CounterConfig) (*Counter, error) { - return NewCounter(t.meter, cfg) -} - -func (t *Telemetry) NewFloatCounter(cfg FloatCounterConfig) (*FloatCounter, error) { - return NewFloatCounter(t.meter, cfg) -} - -func (t *Telemetry) NewGauge(cfg GaugeConfig) (*Gauge, error) { - return NewGauge(t.meter, cfg) -} - -func (t *Telemetry) NewFloatGauge(cfg FloatGaugeConfig) (*FloatGauge, error) { - return NewFloatGauge(t.meter, cfg) -} - -func (t *Telemetry) NewTimer(cfg TimerConfig) (*Timer, error) { - return NewTimer(t.meter, cfg) -} - -func (t *Telemetry) NewHistogram(cfg HistogramConfig) (*Histogram, error) { - return NewHistogram(t.meter, cfg) -} - -func (t *Telemetry) NewInfo(cfg InfoConfig) (*Info, error) { - return NewInfo(t.meter, cfg) -} - -func (t *Telemetry) Shutdown(ctx context.Context) error { - var errs []error - - if t.provider != nil { - if err := t.provider.Shutdown(ctx); err != nil { - errs = append(errs, err) - } - } - - if t.traceProvider != nil { - if err := t.traceProvider.Shutdown(ctx); err != nil { - errs = append(errs, err) - } - } - - if len(errs) > 0 { - return errors.Join(errs...) 
- } - - return nil -} - -func StringAttr(key, value string) attribute.KeyValue { - return attribute.String(key, value) -} - -func IntAttr(key string, value int) attribute.KeyValue { - return attribute.Int(key, value) -} - -func Int64Attr(key string, value int64) attribute.KeyValue { - return attribute.Int64(key, value) -} - -func FloatAttr(key string, value float64) attribute.KeyValue { - return attribute.Float64(key, value) -} - -func BoolAttr(key string, value bool) attribute.KeyValue { - return attribute.Bool(key, value) -} - -const ( - KiB float64 = 1024 - MiB = KiB * 1024 - GiB = MiB * 1024 -) - -var SizeBoundaries = []float64{ - // Explicit histogram buckets for request/response body sizes (bytes), up to 1 GiB. - KiB, - 2 * KiB, - 4 * KiB, - 8 * KiB, - 16 * KiB, - 32 * KiB, - 64 * KiB, - 128 * KiB, - 256 * KiB, - 512 * KiB, - MiB, - 2 * MiB, - 4 * MiB, - 8 * MiB, - 16 * MiB, - 32 * MiB, - 64 * MiB, - 128 * MiB, - 256 * MiB, - 512 * MiB, - GiB, -} diff --git a/pkg/telemetry/telemetry_test.go b/pkg/telemetry/telemetry_test.go deleted file mode 100644 index fafb326b..00000000 --- a/pkg/telemetry/telemetry_test.go +++ /dev/null @@ -1,541 +0,0 @@ -package telemetry_test - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/resource" - - "github.com/storacha/piri/pkg/telemetry" -) - -func TestMetricsWithManualReader(t *testing.T) { - ctx := context.Background() - reader := metric.NewManualReader() - - // Create a meter provider with manual reader - res, err := resource.New(ctx, - resource.WithAttributes( - attribute.String("service.name", "test-service"), - ), - ) - require.NoError(t, err) - - provider := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithResource(res), - ) - defer func() { - err := 
provider.Shutdown(ctx) - require.NoError(t, err) - }() - - // Create telemetry with custom meter - tel := telemetry.NewWithMeter(provider.Meter("test-service")) - defer tel.Shutdown(ctx) - - t.Run("Counter", func(t *testing.T) { - counter, err := tel.NewCounter(telemetry.CounterConfig{ - Name: "test_counter", - Description: "Test counter", - }) - require.NoError(t, err) - - // Record some values - counter.Add(ctx, 5) - counter.Inc(ctx) - counter.Add(ctx, 10, telemetry.StringAttr("method", "GET")) - - // Collect metrics - rm := metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Find our metric - var testMetric *metricdata.Metrics - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_counter" { - testMetric = &m - break - } - } - } - require.NotNil(t, testMetric) - assert.Equal(t, "Test counter", testMetric.Description) - - // Check the data - sum, ok := testMetric.Data.(metricdata.Sum[int64]) - assert.True(t, ok) - assert.True(t, sum.IsMonotonic) - assert.GreaterOrEqual(t, len(sum.DataPoints), 1) - - // Check total value - var total int64 - for _, dp := range sum.DataPoints { - total += dp.Value - } - assert.Equal(t, int64(16), total) // 5 + 1 + 10 - }) - - t.Run("Gauge", func(t *testing.T) { - gauge, err := tel.NewGauge(telemetry.GaugeConfig{ - Name: "test_gauge", - Description: "Test gauge", - Unit: "connections", - }) - require.NoError(t, err) - - // Record some values - gauge.Record(ctx, 100) - gauge.Record(ctx, 50) - gauge.Record(ctx, 75, telemetry.StringAttr("pool", "primary")) - - // Collect metrics - rm := metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Find our metric - var testMetric *metricdata.Metrics - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_gauge" { - testMetric = &m - break - } - } - } - require.NotNil(t, testMetric) - assert.Equal(t, "connections", testMetric.Unit) - - // 
Check the data - gaugeData, ok := testMetric.Data.(metricdata.Gauge[int64]) - assert.True(t, ok) - assert.GreaterOrEqual(t, len(gaugeData.DataPoints), 1) - - // Check that we have the expected values - hasDefault := false - hasPrimary := false - for _, dp := range gaugeData.DataPoints { - attrs := dp.Attributes.ToSlice() - if len(attrs) == 0 { - hasDefault = true - assert.Equal(t, int64(50), dp.Value) // Last value without attributes - } else { - for _, attr := range attrs { - if attr.Key == "pool" && attr.Value.AsString() == "primary" { - hasPrimary = true - assert.Equal(t, int64(75), dp.Value) - } - } - } - } - assert.True(t, hasDefault || hasPrimary) - }) - - t.Run("Timer", func(t *testing.T) { - timer, err := tel.NewTimer(telemetry.TimerConfig{ - Name: "test_timer", - Description: "Test timer", - Unit: "ms", - Boundaries: []float64{10, 50, 100, 500, 1000}, - }) - require.NoError(t, err) - - // Record some durations - timer.Record(ctx, 25*time.Millisecond) - timer.Record(ctx, 150*time.Millisecond) - timer.RecordFloat(ctx, 75.5) - - // Collect metrics - rm := metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Find our metric - var testMetric *metricdata.Metrics - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_timer" { - testMetric = &m - break - } - } - } - require.NotNil(t, testMetric) - assert.Equal(t, "ms", testMetric.Unit) - - // Check the data - hist, ok := testMetric.Data.(metricdata.Histogram[float64]) - assert.True(t, ok) - assert.GreaterOrEqual(t, len(hist.DataPoints), 1) - - // Check aggregated data - var totalCount uint64 - var totalSum float64 - for _, dp := range hist.DataPoints { - totalCount += dp.Count - totalSum += dp.Sum - } - assert.Equal(t, uint64(3), totalCount) // 3 recordings - assert.Equal(t, 250.5, totalSum) // 25 + 150 + 75.5 - }) - - t.Run("TimedContext", func(t *testing.T) { - timer, err := tel.NewTimer(telemetry.TimerConfig{ - Name: 
"test_timed_operation", - }) - require.NoError(t, err) - - // Time an operation - operation := timer.Start(ctx, telemetry.StringAttr("operation", "test")) - time.Sleep(10 * time.Millisecond) - operation.End(telemetry.BoolAttr("success", true)) - - // Collect metrics - rm := metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Find our metric - var testMetric *metricdata.Metrics - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_timed_operation" { - testMetric = &m - break - } - } - } - require.NotNil(t, testMetric) - - hist, ok := testMetric.Data.(metricdata.Histogram[float64]) - assert.True(t, ok) - assert.GreaterOrEqual(t, len(hist.DataPoints), 1) - - // Check that timing was recorded - var found bool - for _, dp := range hist.DataPoints { - if dp.Count > 0 { - found = true - assert.GreaterOrEqual(t, dp.Sum, float64(10)) // At least 10ms - - // Check attributes - attrs := dp.Attributes.ToSlice() - hasOperation := false - hasSuccess := false - for _, attr := range attrs { - if attr.Key == "operation" && attr.Value.AsString() == "test" { - hasOperation = true - } - if attr.Key == "success" && attr.Value.AsBool() == true { - hasSuccess = true - } - } - assert.True(t, hasOperation) - assert.True(t, hasSuccess) - } - } - assert.True(t, found) - }) -} - -func TestFloatMetrics(t *testing.T) { - ctx := context.Background() - reader := metric.NewManualReader() - - res, err := resource.New(ctx, - resource.WithAttributes( - attribute.String("service.name", "test-service"), - ), - ) - require.NoError(t, err) - - provider := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithResource(res), - ) - defer provider.Shutdown(ctx) - - tel := telemetry.NewWithMeter(provider.Meter("test-service")) - - t.Run("FloatCounter", func(t *testing.T) { - counter, err := tel.NewFloatCounter(telemetry.FloatCounterConfig{ - Name: "test_float_counter", - Description: "Test float counter", - Unit: 
"seconds", - }) - require.NoError(t, err) - - // Record some values - counter.Add(ctx, 3.14) - counter.Add(ctx, 2.86) - - // Collect metrics - rm := metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Verify the metric exists and has correct sum - found := false - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_float_counter" { - found = true - sum, ok := m.Data.(metricdata.Sum[float64]) - assert.True(t, ok) - assert.True(t, sum.IsMonotonic) - - var total float64 - for _, dp := range sum.DataPoints { - total += dp.Value - } - assert.InDelta(t, 6.0, total, 0.01) // 3.14 + 2.86 - } - } - } - assert.True(t, found) - }) - - t.Run("FloatGauge", func(t *testing.T) { - gauge, err := tel.NewFloatGauge(telemetry.FloatGaugeConfig{ - Name: "test_float_gauge", - Description: "Test float gauge", - Unit: "%", - }) - require.NoError(t, err) - - // Record some values - gauge.Record(ctx, 75.5) - gauge.Record(ctx, 82.3) - - // Collect metrics - rm := metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Verify the metric exists - found := false - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_float_gauge" { - found = true - gaugeData, ok := m.Data.(metricdata.Gauge[float64]) - assert.True(t, ok) - assert.GreaterOrEqual(t, len(gaugeData.DataPoints), 1) - - // Should have the last recorded value - for _, dp := range gaugeData.DataPoints { - assert.Equal(t, 82.3, dp.Value) - } - } - } - } - assert.True(t, found) - }) -} - -func TestHistogramMetric(t *testing.T) { - ctx := context.Background() - reader := metric.NewManualReader() - - res, err := resource.New(ctx, - resource.WithAttributes( - attribute.String("service.name", "test-service"), - ), - ) - require.NoError(t, err) - - provider := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithResource(res), - ) - defer provider.Shutdown(ctx) - - tel := 
telemetry.NewWithMeter(provider.Meter("test-service")) - - histogram, err := tel.NewHistogram(telemetry.HistogramConfig{ - Name: "test_histogram", - Description: "Test histogram", - Unit: "bytes", - Boundaries: []float64{1024, 2048, 4096, 8192}, - }) - require.NoError(t, err) - - // Record some values - histogram.Record(ctx, 512) // bucket 0 - histogram.Record(ctx, 1536) // bucket 1 - histogram.Record(ctx, 3072) // bucket 2 - histogram.Record(ctx, 10240) // bucket 4 (overflow) - - // Collect metrics - rm := metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Find and verify the metric - found := false - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_histogram" { - found = true - hist, ok := m.Data.(metricdata.Histogram[int64]) - assert.True(t, ok) - assert.GreaterOrEqual(t, len(hist.DataPoints), 1) - - var totalCount uint64 - var totalSum int64 - for _, dp := range hist.DataPoints { - totalCount += dp.Count - totalSum += dp.Sum - } - assert.Equal(t, uint64(4), totalCount) - assert.Equal(t, int64(15360), totalSum) // 512 + 1536 + 3072 + 10240 - } - } - } - assert.True(t, found) -} - -func TestInfo(t *testing.T) { - ctx := context.Background() - reader := metric.NewManualReader() - - res, err := resource.New(ctx, - resource.WithAttributes( - attribute.String("service.name", "test-service"), - ), - ) - require.NoError(t, err) - - provider := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithResource(res), - ) - defer provider.Shutdown(ctx) - - tel := telemetry.NewWithMeter(provider.Meter("test-service")) - - t.Run("Info metric always records 1", func(t *testing.T) { - info, err := tel.NewInfo(telemetry.InfoConfig{ - Name: "test_info", - Description: "Test info metric", - Labels: map[string]string{ - "version": "v1.0.0", - "commit": "abc123", - }, - }) - require.NoError(t, err) - - // Record the info - info.Record(ctx) - - // Collect metrics - rm := 
metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Find and verify the metric - found := false - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_info" { - found = true - gauge, ok := m.Data.(metricdata.Gauge[int64]) - assert.True(t, ok) - assert.Len(t, gauge.DataPoints, 1) - - dp := gauge.DataPoints[0] - assert.Equal(t, int64(1), dp.Value) - - // Check labels - attrs := dp.Attributes.ToSlice() - assert.Len(t, attrs, 2) - - hasVersion := false - hasCommit := false - for _, attr := range attrs { - if attr.Key == "version" && attr.Value.AsString() == "v1.0.0" { - hasVersion = true - } - if attr.Key == "commit" && attr.Value.AsString() == "abc123" { - hasCommit = true - } - } - assert.True(t, hasVersion) - assert.True(t, hasCommit) - } - } - } - assert.True(t, found) - }) - - t.Run("Info metric update", func(t *testing.T) { - info, err := tel.NewInfo(telemetry.InfoConfig{ - Name: "test_info_update", - Description: "Test info metric update", - Labels: map[string]string{ - "address": "0x1234", - "network": "mainnet", - }, - }) - require.NoError(t, err) - - // Record initial values - info.Record(ctx) - - // Update with new values - info.Record(ctx, telemetry.StringAttr("address", "0x5678")) - - // Collect metrics - rm := metricdata.ResourceMetrics{} - err = reader.Collect(ctx, &rm) - require.NoError(t, err) - - // Verify updated values - found := false - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - if m.Name == "test_info_update" { - found = true - gauge, ok := m.Data.(metricdata.Gauge[int64]) - assert.True(t, ok) - - // Find the data point with updated values - foundUpdated := false - for _, dp := range gauge.DataPoints { - assert.Equal(t, int64(1), dp.Value) - - // Check if this is the updated data point - attrs := dp.Attributes.ToSlice() - addressCorrect := false - networkCorrect := false - - for _, attr := range attrs { - if attr.Key == "address" && 
attr.Value.AsString() == "0x5678" { - addressCorrect = true - } - if attr.Key == "network" && attr.Value.AsString() == "mainnet" { - networkCorrect = true - } - } - - if addressCorrect && networkCorrect { - foundUpdated = true - break - } - } - assert.True(t, foundUpdated, "Updated data point not found") - } - } - } - assert.True(t, found) - }) -} diff --git a/pkg/telemetry/timer.go b/pkg/telemetry/timer.go deleted file mode 100644 index 60028e9b..00000000 --- a/pkg/telemetry/timer.go +++ /dev/null @@ -1,146 +0,0 @@ -package telemetry - -import ( - "context" - "fmt" - "time" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" -) - -type Timer struct { - histogram metric.Float64Histogram - attrs []attribute.KeyValue -} - -type TimerConfig struct { - Name string - Description string - Unit string - Attributes map[string]string - Boundaries []float64 -} - -func NewTimer(meter metric.Meter, cfg TimerConfig) (*Timer, error) { - opts := []metric.Float64HistogramOption{ - metric.WithDescription(cfg.Description), - } - - if cfg.Unit == "" { - cfg.Unit = "ms" - } - opts = append(opts, metric.WithUnit(cfg.Unit)) - - if len(cfg.Boundaries) > 0 { - opts = append(opts, metric.WithExplicitBucketBoundaries(cfg.Boundaries...)) - } - - histogram, err := meter.Float64Histogram(cfg.Name, opts...) - if err != nil { - return nil, fmt.Errorf("failed to create timer %s: %w", cfg.Name, err) - } - - attrs := make([]attribute.KeyValue, 0, len(cfg.Attributes)) - for k, v := range cfg.Attributes { - attrs = append(attrs, attribute.String(k, v)) - } - - return &Timer{ - histogram: histogram, - attrs: attrs, - }, nil -} - -func (t *Timer) Record(ctx context.Context, duration time.Duration, attrs ...attribute.KeyValue) { - allAttrs := append(t.attrs, attrs...) 
- t.histogram.Record(ctx, float64(duration.Milliseconds()), metric.WithAttributes(allAttrs...)) -} - -func (t *Timer) RecordFloat(ctx context.Context, value float64, attrs ...attribute.KeyValue) { - allAttrs := append(t.attrs, attrs...) - t.histogram.Record(ctx, value, metric.WithAttributes(allAttrs...)) -} - -func (t *Timer) WithAttributes(attrs ...attribute.KeyValue) *Timer { - return &Timer{ - histogram: t.histogram, - attrs: append(t.attrs, attrs...), - } -} - -type TimedContext struct { - ctx context.Context - timer *Timer - startTime time.Time - attrs []attribute.KeyValue -} - -func (t *Timer) Start(ctx context.Context, attrs ...attribute.KeyValue) *TimedContext { - return &TimedContext{ - ctx: ctx, - timer: t, - startTime: time.Now(), - attrs: attrs, - } -} - -func (tc *TimedContext) End(attrs ...attribute.KeyValue) { - duration := time.Since(tc.startTime) - allAttrs := append(tc.attrs, attrs...) - tc.timer.Record(tc.ctx, duration, allAttrs...) -} - -type Histogram struct { - histogram metric.Int64Histogram - attrs []attribute.KeyValue -} - -type HistogramConfig struct { - Name string - Description string - Unit string - Attributes map[string]string - Boundaries []float64 -} - -func NewHistogram(meter metric.Meter, cfg HistogramConfig) (*Histogram, error) { - opts := []metric.Int64HistogramOption{ - metric.WithDescription(cfg.Description), - } - - if cfg.Unit != "" { - opts = append(opts, metric.WithUnit(cfg.Unit)) - } - - if len(cfg.Boundaries) > 0 { - opts = append(opts, metric.WithExplicitBucketBoundaries(cfg.Boundaries...)) - } - - histogram, err := meter.Int64Histogram(cfg.Name, opts...) 
- if err != nil { - return nil, fmt.Errorf("failed to create histogram %s: %w", cfg.Name, err) - } - - attrs := make([]attribute.KeyValue, 0, len(cfg.Attributes)) - for k, v := range cfg.Attributes { - attrs = append(attrs, attribute.String(k, v)) - } - - return &Histogram{ - histogram: histogram, - attrs: attrs, - }, nil -} - -func (h *Histogram) Record(ctx context.Context, value int64, attrs ...attribute.KeyValue) { - allAttrs := append(h.attrs, attrs...) - h.histogram.Record(ctx, value, metric.WithAttributes(allAttrs...)) -} - -func (h *Histogram) WithAttributes(attrs ...attribute.KeyValue) *Histogram { - return &Histogram{ - histogram: h.histogram, - attrs: append(h.attrs, attrs...), - } -} diff --git a/pkg/telemetry/tracer.go b/pkg/telemetry/tracer.go deleted file mode 100644 index 6abcddef..00000000 --- a/pkg/telemetry/tracer.go +++ /dev/null @@ -1,39 +0,0 @@ -package telemetry - -import ( - "context" - "fmt" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/propagation" - sdkresource "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" -) - -func newTraceProvider(ctx context.Context, res *sdkresource.Resource, opts []otlptracehttp.Option) (*sdktrace.TracerProvider, error) { - if len(opts) == 0 { - return nil, fmt.Errorf("trace endpoint is required") - } - - exporter, err := otlptracehttp.New(ctx, opts...) - if err != nil { - return nil, fmt.Errorf("failed to create trace exporter: %w", err) - } - - tp := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(exporter), - sdktrace.WithResource(res), - sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.NeverSample())), - ) - - otel.SetTracerProvider(tp) - otel.SetTextMapPropagator( - propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, - propagation.Baggage{}, - ), - ) - - return tp, nil -}