Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 47 additions & 24 deletions cmd/cli/serve/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/storacha/go-ucanto/did"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"

"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"go.uber.org/zap/zapcore"

"github.com/storacha/piri/cmd/cli/setup"
"github.com/storacha/piri/pkg/build"

"github.com/storacha/piri/cmd/cliutil"
"github.com/storacha/piri/pkg/config"
"github.com/storacha/piri/pkg/fx/app"
appconfig "github.com/storacha/piri/pkg/config/app"
fxapp "github.com/storacha/piri/pkg/fx/app"
"github.com/storacha/piri/pkg/presets"
"github.com/storacha/piri/pkg/telemetry"
)
Expand Down Expand Up @@ -320,14 +321,14 @@ func fullServer(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("parsing config: %w", err)
}

if err := initTelemetry(cmd.Context(), telemetry.Config{
ServiceName: "piri",
ServiceVersion: build.Version,
Environment: userCfg.Network,
InstanceID: appCfg.Identity.Signer.DID().String(),
Endpoint: appCfg.Server.PublicURL.String(),
}); err != nil {
log.Warnf("failed to initialize telemetry: %s", err)
if err := initTelemetry(
cmd.Context(),
appCfg.Identity.Signer.DID().String(),
userCfg.Network,
appCfg.Storage.DataDir,
appCfg.Telemetry,
); err != nil {
return fmt.Errorf("initializing telemetry: %w", err)
}
// build our beloved Piri node
piri := fx.New(
Expand All @@ -348,13 +349,13 @@ func fullServer(cmd *cobra.Command, _ []string) error {
// - identity
// - http server
// - databases & datastores
app.CommonModules(appCfg),
fxapp.CommonModules(appCfg),

// ucan service dependencies:
// - http handlers
// - ucan specific handlers, blob allocate and accept, replicate, etc.
// - blob, claim, publisher, replicator, and storage services
app.UCANModule,
fxapp.UCANModule,

// pdp service dependencies:
// - lotus, eth, and contract clients
Expand All @@ -363,7 +364,7 @@ func fullServer(cmd *cobra.Command, _ []string) error {
// - http handlers
// - create proof set, add root, upload piece, etc.
// - address wallet
app.PDPModule,
fxapp.PDPModule,

// Post-startup operations: print server info and record telemetry
fx.Invoke(func(lc fx.Lifecycle) {
Expand All @@ -374,13 +375,18 @@ func fullServer(cmd *cobra.Command, _ []string) error {
cmd.Println("Piri Running on: " + appCfg.Server.Host + ":" + strconv.Itoa(int(appCfg.Server.Port)))
cmd.Println("Piri Public Endpoint: " + appCfg.Server.PublicURL.String())

// Record server telemetry
telemetry.RecordServerInfo(ctx, "full",
telemetry.StringAttr("did", appCfg.Identity.Signer.DID().String()),
telemetry.StringAttr("owner_address", appCfg.PDPService.OwnerAddress.String()),
telemetry.StringAttr("public_url", appCfg.Server.PublicURL.String()),
telemetry.Int64Attr("proof_set", int64(appCfg.UCANService.ProofSetID)),
)
// Record server metadata
if err := telemetry.RecordServerInfo(otel.GetMeterProvider().Meter("github."+
"com/storacha/piri/cli/serve"),
ctx,
"full",
attribute.String("did", appCfg.Identity.Signer.DID().String()),
attribute.String("owner_address", appCfg.PDPService.OwnerAddress.String()),
attribute.String("public_url", appCfg.Server.PublicURL.String()),
attribute.Int64("proof_set", int64(appCfg.UCANService.ProofSetID)),
); err != nil {
log.Warnw("Failed to record server info", "error", err)
}
return nil
},
OnStop: func(ctx context.Context) error {
Expand All @@ -403,10 +409,27 @@ func fullServer(cmd *cobra.Command, _ []string) error {
return nil
}

func initTelemetry(ctx context.Context, cfg telemetry.Config) error {
// bail if this has been disabled.
func initTelemetry(ctx context.Context, instanceID, network string, dataDir string, cfg appconfig.TelemetryConfig) error {
// bail if this has been disabled globally.
// backwards compatible env var
if os.Getenv("PIRI_DISABLE_ANALYTICS") != "" {
return nil
}
return telemetry.Initialize(ctx, cfg)
if cfg.DisableStorachaAnalytics {
return nil
}

t, err := telemetry.Setup(ctx, network, instanceID)
if err != nil {
return fmt.Errorf("setting up telemetry: %w", err)
}

if err := telemetry.StartHostMetrics(
ctx,
t.Metrics.Meter("github.com/storacha/piri/cli/serve"),
dataDir,
); err != nil {
return fmt.Errorf("setting up telemetry host metrics: %w", err)
}
return nil
}
10 changes: 0 additions & 10 deletions cmd/cli/serve/ucan.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/storacha/piri/pkg/service/retrieval"
"github.com/storacha/piri/pkg/service/storage"
"github.com/storacha/piri/pkg/store/blobstore"
"github.com/storacha/piri/pkg/telemetry"
)

var (
Expand Down Expand Up @@ -310,15 +309,6 @@ func startServer(cmd *cobra.Command, _ []string) error {

defer storageSvc.Close(ctx)

telemetry.RecordServerInfo(ctx, "ucan",
telemetry.StringAttr("did", id.DID().String()),
telemetry.StringAttr("indexing_did", indexingServiceDID.String()),
telemetry.StringAttr("indexing_url", indexingServiceURL.String()),
telemetry.StringAttr("upload_did", uploadServiceDID.String()),
telemetry.StringAttr("upload_url", uploadServiceURL.String()),
telemetry.Int64Attr("proof_set", int64(cfg.UCANService.ProofSetID)),
)

errHandler := func(err ucanserver.HandlerExecutionError[any]) {
l := log.With("error", err.Error())
if s := err.Stack(); s != "" {
Expand Down
2 changes: 1 addition & 1 deletion deploy/full-node/deployment-types/multi-instance/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ module "piri_instances" {
service_pem_content = lookup(each.value, "service_pem_content", "")
wallet_hex_content = lookup(each.value, "wallet_hex_content", "")
operator_email = each.value.operator_email
use_letsencrypt_staging = var.environment != "production" && var.environment != "prod" && var.environment != "forge-prod"
use_letsencrypt_staging = false #var.environment != "production" && var.environment != "prod" && var.environment !="forge-prod"

tags = {
Owner = var.owner
Expand Down
10 changes: 9 additions & 1 deletion lib/jobqueue/jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,15 @@ func New[T any](name string, db *sql.DB, ser serializer.Serializer[T], opts ...O
}

// instantiate worker which consumes from queue
w := worker.New[T](q, ser, worker.WithLog(c.Logger), worker.WithLimit(int(c.MaxWorkers)), worker.WithExtend(c.ExtendDelay))
w, err := worker.New[T](q, ser,
worker.WithLog(c.Logger),
worker.WithLimit(int(c.MaxWorkers)),
worker.WithExtend(c.ExtendDelay),
worker.WithQueueName(name),
)
if err != nil {
return nil, fmt.Errorf("failed to create worker: %w", err)
}

return &JobQueue[T]{
queue: q,
Expand Down
130 changes: 130 additions & 0 deletions lib/jobqueue/worker/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package worker

import (
"context"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"

"github.com/storacha/piri/lib/telemetry"
)

// jobDurationBounds holds the boundaries, expressed in seconds, that are
// handed to the job_duration timer. They run from 5ms up to 30 minutes.
var jobDurationBounds = func() []float64 {
	steps := []time.Duration{
		5 * time.Millisecond,
		10 * time.Millisecond,
		100 * time.Millisecond,
		time.Second,
		3 * time.Second,
		5 * time.Second,
		10 * time.Second,
		30 * time.Second,
		time.Minute,
		2 * time.Minute,
		3 * time.Minute,
		5 * time.Minute,
		6 * time.Minute,
		7 * time.Minute,
		8 * time.Minute,
		9 * time.Minute,
		10 * time.Minute,
		30 * time.Minute,
	}

	// Convert each duration to fractional seconds, preserving order.
	bounds := make([]float64, len(steps))
	for i, step := range steps {
		bounds[i] = step.Seconds()
	}
	return bounds
}()

// metricsRecorder groups the telemetry instruments the worker records to.
// The record* methods tolerate a nil recorder or a nil instrument and
// simply do nothing in that case.
type metricsRecorder struct {
// activeJobs is the "active_jobs" up/down counter: number of jobs running.
activeJobs *telemetry.UpDownCounter
// queuedJobs is the "queued_jobs" up/down counter: number of jobs queued
// (includes active).
queuedJobs *telemetry.UpDownCounter
// failedJobsCounter is the "failed_jobs" counter: number of jobs that
// failed permanently.
failedJobsCounter *telemetry.Counter
// jobDurationTimer is the "job_duration" timer, created with
// jobDurationBounds.
jobDurationTimer *telemetry.Timer
}

// newMetrics builds the metricsRecorder used by the worker. Every instrument
// is created on a meter named "lib/jobqueue/worker" obtained from the global
// OpenTelemetry meter provider.
//
// It returns the first error reported while creating an instrument. On error
// the returned recorder is nil, so callers never see a partially populated
// recorder.
func newMetrics() (*metricsRecorder, error) {
	meter := otel.GetMeterProvider().Meter("lib/jobqueue/worker")

	activeJobs, err := telemetry.NewUpDownCounter(
		meter,
		"active_jobs",
		"number of jobs running",
		"1",
	)
	if err != nil {
		return nil, err
	}

	queuedJobs, err := telemetry.NewUpDownCounter(
		meter,
		"queued_jobs",
		"number of jobs queued (includes active)",
		"1",
	)
	if err != nil {
		return nil, err
	}

	failedJobs, err := telemetry.NewCounter(
		meter,
		"failed_jobs",
		"number of jobs that failed permanently",
		"1",
	)
	// Check err here: the next constructor reuses the variable, so an
	// unchecked failure would otherwise be silently overwritten.
	if err != nil {
		return nil, err
	}

	jobDuration, err := telemetry.NewTimer(
		meter,
		"job_duration",
		"records duration of a jobs runtime",
		jobDurationBounds,
	)
	// Without this check a failed timer would be returned with a nil error.
	if err != nil {
		return nil, err
	}

	return &metricsRecorder{
		activeJobs:        activeJobs,
		queuedJobs:        queuedJobs,
		failedJobsCounter: failedJobs,
		jobDurationTimer:  jobDuration,
	}, nil
}

// recordQueuedDelta adds delta to the queued-jobs counter, tagging the
// measurement with the "queue" and "job" attributes. It does nothing when
// the recorder or its queuedJobs instrument is nil.
func (m *metricsRecorder) recordQueuedDelta(ctx context.Context, queueName, jobName string, delta int64) {
	if m != nil && m.queuedJobs != nil {
		queueAttr := attribute.String("queue", queueName)
		jobAttr := attribute.String("job", jobName)
		m.queuedJobs.Add(ctx, delta, queueAttr, jobAttr)
	}
}

// recordActiveDelta adds delta to the active-jobs counter, tagging the
// measurement with the "queue" and "job" attributes. It does nothing when
// the recorder or its activeJobs instrument is nil.
func (m *metricsRecorder) recordActiveDelta(ctx context.Context, queueName, jobName string, delta int64) {
	if m != nil && m.activeJobs != nil {
		queueAttr := attribute.String("queue", queueName)
		jobAttr := attribute.String("job", jobName)
		m.activeJobs.Add(ctx, delta, queueAttr, jobAttr)
	}
}

// recordJobFailure increments the failed-jobs counter once, tagged with the
// "queue" and "job" attributes. The "failure_reason" attribute is attached
// only when reason is non-empty, and "attempt" only when attempt is positive.
//
// The call is a no-op when the recorder or its counter is nil, or when
// either queueName or jobName is empty.
func (m *metricsRecorder) recordJobFailure(ctx context.Context, queueName, jobName, reason string, attempt int) {
	if m == nil || m.failedJobsCounter == nil {
		return
	}
	if queueName == "" || jobName == "" {
		return
	}

	// Two mandatory attributes plus up to two optional ones.
	labels := make([]attribute.KeyValue, 0, 4)
	labels = append(labels,
		attribute.String("queue", queueName),
		attribute.String("job", jobName),
	)
	if reason != "" {
		labels = append(labels, attribute.String("failure_reason", reason))
	}
	if attempt > 0 {
		labels = append(labels, attribute.Int("attempt", attempt))
	}

	m.failedJobsCounter.Inc(ctx, labels...)
}

// recordJobDuration records duration on the job-duration timer, tagged with
// the "queue", "job" and "status" attributes. The "attempt" attribute is
// attached only when attempt is positive.
//
// The call is a no-op when the recorder or its timer is nil, or when either
// queueName or jobName is empty.
func (m *metricsRecorder) recordJobDuration(ctx context.Context, queueName, jobName, status string, attempt int, duration time.Duration) {
	if m == nil || m.jobDurationTimer == nil {
		return
	}
	if queueName == "" || jobName == "" {
		return
	}

	// Three mandatory attributes plus the optional attempt number.
	labels := make([]attribute.KeyValue, 0, 4)
	labels = append(labels,
		attribute.String("queue", queueName),
		attribute.String("job", jobName),
		attribute.String("status", status),
	)
	if attempt > 0 {
		labels = append(labels, attribute.Int("attempt", attempt))
	}

	m.jobDurationTimer.Record(ctx, duration, labels...)
}
Loading