diff --git a/HACKING.md b/HACKING.md
index 333b9234c..0632da2c2 100644
--- a/HACKING.md
+++ b/HACKING.md
@@ -1,9 +1,10 @@
 ## git repo contents
 
-Run with, eg, `go run ./cmd/bigsky`):
+Run with, eg, `go run ./cmd/relay`:
 
-- `cmd/bigsky`: Relay+indexer daemon
+- `cmd/bigsky`: relay daemon
+- `cmd/relay`: new (sync v1.1) relay daemon
 - `cmd/palomar`: search indexer and query service (OpenSearch)
 - `cmd/gosky`: client CLI for talking to a PDS
 - `cmd/lexgen`: codegen tool for lexicons (Lexicon JSON to Go package)
diff --git a/Makefile b/Makefile
index 2dfbb79dd..bff6fd451 100644
--- a/Makefile
+++ b/Makefile
@@ -18,6 +18,7 @@ help: ## Print info about all commands
 build: ## Build all executables
 	go build ./cmd/gosky
 	go build ./cmd/bigsky
+	go build ./cmd/relay
 	go build ./cmd/beemo
 	go build ./cmd/lexgen
 	go build ./cmd/stress
diff --git a/README.md b/README.md
index ff0d8ed3c..8836a45ec 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ Some Bluesky software is developed in Typescript, and lives in the [bluesky-soci
 
 **Go Services:**
 
-- **bigsky** ([README](./cmd/bigsky/README.md)): "Big Graph Service" (BGS) reference implementation, running at `bsky.network`
+- **bigsky** ([README](./cmd/bigsky/README.md)): relay reference implementation, running at `bsky.network`
 - **palomar** ([README](./cmd/palomar/README.md)): fulltext search service for
 - **hepa** ([README](./cmd/hepa/README.md)): auto-moderation bot for [Ozone](https://ozone.tools)
diff --git a/cmd/relay/README.md b/cmd/relay/README.md
index ae6f3cb83..fc5a28338 100644
--- a/cmd/relay/README.md
+++ b/cmd/relay/README.md
@@ -2,13 +2,13 @@ atproto Relay Service
 ===============================
 
-*NOTE: "Relays" used to be called "Big Graph Servers", or "BGS", or "bigsky". Many variables and packages still reference "bgs"*
+*NOTE: "relays" used to be called "Big Graph Servers", or "BGS", or "bigsky". Many variables and packages still reference "bgs"*
 
-This is the implementation of an atproto Relay which is running in the production network, written and operated by Bluesky.
+This is the implementation of an atproto relay that runs in the production network, written and operated by Bluesky.
 
-In atproto, a Relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose a get all relevant events for the entire network, or a specific sub-graph of the network. The Relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.
+In atproto, a relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose and get all relevant events for the entire network, or a specific sub-graph of the network. The relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.
 
-This Relay implementation is designed to subscribe to the entire global network. The current state of the codebase is informally expected to scale to around 50 million accounts in the network, and thousands of repo events per second (peak).
+This relay implementation is designed to subscribe to the entire global network. The current state of the codebase is informally expected to scale to around 100 million accounts in the network, and tens of thousands of repo events per second (peak).
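For context, a minimal sketch of what a downstream firehose consumer can look like in Go, using the `github.com/gorilla/websocket` package this codebase already depends on. The host is just an example, and the DAG-CBOR frame decoding that a real consumer would perform is elided:

```go
package main

import (
	"log"

	"github.com/gorilla/websocket"
)

func main() {
	// The firehose endpoint path is identical on a PDS and on a relay.
	uri := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
	conn, _, err := websocket.DefaultDialer.Dial(uri, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	for {
		// Each message is a binary frame: a DAG-CBOR header followed by an
		// event body. Real consumers decode these into typed stream events.
		_, frame, err := conn.ReadMessage()
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("received event frame: %d bytes", len(frame))
	}
}
```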
 
 Features and design decisions:
 
@@ -20,16 +20,16 @@ Features and design decisions:
 - observability: logging, prometheus metrics, OTEL traces
 - admin web interface: configure limits, add upstream PDS instances, etc
 
-This software is not as packaged, documented, and supported for self-hosting as our PDS distribution or Ozone service. But it is relatively simple and inexpensive to get running.
+This software is not yet as packaged, documented, and supported for self-hosting as our PDS distribution or Ozone service. But it is relatively simple and inexpensive to get running.
 
-A note and reminder about Relays in general are that they are more of a convenience in the protocol than a hard requirement. The "firehose" API is the exact same on the PDS and on a Relay. Any service which subscribes to the Relay could instead connect to one or more PDS instances directly.
+A note and reminder about relays in general is that they are more of a convenience in the protocol than a hard requirement. The "firehose" API is exactly the same on the PDS and on a relay. Any service which subscribes to the relay could instead connect to one or more PDS instances directly.
 
 
 ## Development Tips
 
 The README and Makefile at the top level of this git repo have some generic helpers for testing, linting, formatting code, etc.
 
-To re-build and run the Relay locally:
+To re-build and run the relay locally:
 
     make run-dev-relay
 
@@ -37,18 +37,18 @@ You can re-build and run the command directly to get a list of configuration flags:
 
     RELAY_ADMIN_KEY=localdev go run ./cmd/relay/ --help
 
-By default, the daemon will use sqlite for databases (in the directory `./data/bigsky/`), CAR data will be stored as individual shard files in `./data/bigsky/carstore/`), and the HTTP API will be bound to localhost port 2470.
+By default, the daemon will use sqlite for databases (in the directory `./data/relay/`) and the HTTP API will be bound to localhost port 2470.
 
 When the daemon isn't running, sqlite database files can be inspected with:
 
-    sqlite3 data/bigsky/bgs.sqlite
+    sqlite3 data/relay/relay.sqlite
     [...]
     sqlite> .schema
 
 Wipe all local data:
 
     # careful! double-check this destructive command
-    rm -rf ./data/bigsky/*
+    rm -rf ./data/relay/*
 
 There is a basic web dashboard, though it will not be included unless built and copied to a local directory `./public/`. Run `make build-relay-ui`, and then when running the daemon the dashboard will be available at: . Paste in the admin key, eg `localdev`.
 
@@ -63,35 +63,31 @@ Request crawl of an individual PDS instance like:
 
 ## Docker Containers
 
-One way to deploy is running a docker image. You can pull and/or run a specific version of bigsky, referenced by git commit, from the Bluesky Github container registry. For example:
+One way to deploy is running a docker image. You can pull and/or run a specific version of the relay, referenced by git commit, from the Bluesky GitHub container registry. For example:
 
     docker pull ghcr.io/bluesky-social/indigo:relay-fd66f93ce1412a3678a1dd3e6d53320b725978a6
     docker run ghcr.io/bluesky-social/indigo:relay-fd66f93ce1412a3678a1dd3e6d53320b725978a6
 
-There is a Dockerfile in this directory, which can be used to build customized/patched versions of the Relay as a container, republish them, run locally, deploy to servers, deploy to an orchestrated cluster, etc. See docs and guides for docker and cluster management systems for details.
+There is a Dockerfile in this directory, which can be used to build customized/patched versions of the relay as a container, republish them, run locally, deploy to servers, deploy to an orchestrated cluster, etc. See docs and guides for docker and cluster management systems for details.
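As a fuller sketch of running the published container with configuration, using the env vars and ports documented in this README (the PostgreSQL connection string format is an assumption here; check `--help` output for the exact scheme your build accepts):

    docker run -d \
      -e DATABASE_URL="postgresql://relay:${password}@db.example.internal:5432/relay" \
      -e RELAY_ADMIN_KEY="${admin_key}" \
      -e ENVIRONMENT="production" \
      -p 2470:2470 \
      -p 2471:2471 \
      ghcr.io/bluesky-social/indigo:relay-fd66f93ce1412a3678a1dd3e6d53320b725978a6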
 
 
 ## Database Setup
 
-PostgreSQL and Sqlite are both supported. When using Sqlite, separate files are used for Relay metadata and CarStore metadata. With PostgreSQL a single database server, user, and logical database can all be reused: table names will not conflict.
-
-Database configuration is passed via the `DATABASE_URL` and `CARSTORE_DATABASE_URL` environment variables, or the corresponding CLI args.
+PostgreSQL and SQLite are both supported. Database configuration is passed via the `DATABASE_URL` environment variable, or the corresponding CLI arg.
 
 For PostgreSQL, the user and database must already be configured. Some example SQL commands are:
 
-    CREATE DATABASE bgs;
-    CREATE DATABASE carstore;
+    CREATE DATABASE relay;
 
     CREATE USER ${username} WITH PASSWORD '${password}';
-    GRANT ALL PRIVILEGES ON DATABASE bgs TO ${username};
-    GRANT ALL PRIVILEGES ON DATABASE carstore TO ${username};
+    GRANT ALL PRIVILEGES ON DATABASE relay TO ${username};
 
 This service currently uses `gorm` to automatically run database migrations as the regular user. There is no concept of running a separate set of migrations under a more privileged database user.
 
 
 ## Deployment
 
-*NOTE: this is not a complete guide to operating a Relay. There are decisions to be made and communicated about policies, bandwidth use, PDS crawling and rate-limits, financial sustainability, etc, which are not covered here. This is just a quick overview of how to technically get a relay up and running.*
+*NOTE: this is not a complete guide to operating a relay. There are decisions to be made and communicated about policies, bandwidth use, PDS crawling and rate-limits, financial sustainability, etc, which are not covered here. This is just a quick overview of how to technically get a relay up and running.*
 
 In a real-world system, you will probably want to use PostgreSQL.
 
@@ -99,22 +95,10 @@ Some notable configuration env vars to set:
 
 - `ENVIRONMENT`: eg, `production`
 - `DATABASE_URL`: see section above
-- `DATA_DIR`: misc data will go in a subdirectory
 - `GOLOG_LOG_LEVEL`: log verbosity
-- `RESOLVE_ADDRESS`: DNS server to use
-- `FORCE_DNS_UDP`: recommend "true"
 
 There is a health check endpoint at `/xrpc/_health`. Prometheus metrics are exposed by default on port 2471, path `/metrics`. The service logs fairly verbosely to stderr; use `GOLOG_LOG_LEVEL` to control log volume.
 
-As a rough guideline for the compute resources needed to run a full-network Relay, in June 2024 an example Relay for over 5 million repositories used:
-
-- roughly 1 TByte of disk for PostgreSQL
-- roughly 1 TByte of disk for event playback buffer
-- roughly 5k disk I/O operations per second (all combined)
-- roughly 100% of one CPU core (quite low CPU utilization)
-- roughly 5GB of RAM for `relay`, and as much RAM as available for PostgreSQL and page cache
-- on the order of 1 megabit inbound bandwidth (crawling PDS instances) and 1 megabit outbound per connected client. 1 mbit continuous is approximately 350 GByte/month
-
 Be sure to double-check bandwidth usage and pricing if running a public relay!
Bandwidth prices can vary widely between providers, and popular cloud services (AWS, Google Cloud, Azure) are very expensive compared to alternatives like OVH or Hetzner. @@ -202,26 +186,6 @@ POST `{"did": "did:..."}` to take-down a bad repo; deletes all local data for th POST `?did={did:...}` to reverse a repo take-down -### /admin/repo/compact - -POST `?did={did:...}` to compact a repo. Optionally `&fast=true`. HTTP blocks until the compaction finishes. - -### /admin/repo/compactAll - -POST to begin compaction of all repos. Optional query params: - - * `fast=true` - * `limit={int}` maximum number of repos to compact (biggest first) (default 50) - * `threhsold={int}` minimum number of shard files a repo must have on disk to merit compaction (default 20) - -### /admin/repo/reset - -POST `?did={did:...}` deletes all local data for the repo - -### /admin/repo/verify - -POST `?did={did:...}` checks that all repo data is accessible. HTTP blocks until done. - ### /admin/pds/requestCrawl POST `{"hostname":"pds host"}` to start crawling a PDS @@ -254,37 +218,6 @@ GET returns JSON list of records }, ...] ``` -### /admin/pds/resync - -POST `?host={host}` to start a resync of a PDS - -GET `?host={host}` to get status of a PDS resync, return - -```json -{"resync": { - "pds": { - "Host": string, - "Did": string, - "SSL": bool, - "Cursor": int, - "Registered": bool, - "Blocked": bool, - "RateLimit": float, - "CrawlRateLimit": float, - "RepoCount": int, - "RepoLimit": int, - "HourlyEventLimit": int, - "DailyEventLimit": int, - }, - "numRepoPages": int, - "numRepos": int, - "numReposChecked": int, - "numReposToResync": int, - "status": string, - "statusChangedAt": time, -}} -``` - ### /admin/pds/changeLimits POST to set the limits for a PDS. body: diff --git a/cmd/relay/bgs/bgs.go b/cmd/relay/bgs/bgs.go index 0b5444bb0..f1cb94593 100644 --- a/cmd/relay/bgs/bgs.go +++ b/cmd/relay/bgs/bgs.go @@ -21,7 +21,6 @@ import ( comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/cmd/relay/events" "github.com/bluesky-social/indigo/cmd/relay/models" - lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/xrpc" "github.com/gorilla/websocket" @@ -330,7 +329,7 @@ var homeMessage string = ` .##....##..##.......##.......##.....##....##... .##.....##.########.########.##.....##....##... 
-This is an atproto [https://atproto.com] relay instance, running the 'bigsky' codebase [https://github.com/bluesky-social/indigo] +This is an atproto [https://atproto.com] relay instance, running the 'relay' codebase [https://github.com/bluesky-social/indigo] The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos ` @@ -713,14 +712,6 @@ func (bgs *BGS) lookupUserByUID(ctx context.Context, uid models.Uid) (*Account, return &u, nil } -func stringLink(lnk *lexutil.LexLink) string { - if lnk == nil { - return "" - } - - return lnk.String() -} - // handleFedEvent() is the callback passed to Slurper called from Slurper.handleConnection() func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error { ctx, span := tracer.Start(ctx, "handleFedEvent") diff --git a/cmd/relay/events/cbor_gen.go b/cmd/relay/events/cbor_gen.go index 8e13f8339..01ff8ec6b 100644 --- a/cmd/relay/events/cbor_gen.go +++ b/cmd/relay/events/cbor_gen.go @@ -8,9 +8,9 @@ import ( "math" "sort" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" - xerrors "golang.org/x/xerrors" + "golang.org/x/xerrors" ) var _ = xerrors.Errorf diff --git a/cmd/relay/events/consumer.go b/cmd/relay/events/consumer.go index 6c832afc6..08a119c7f 100644 --- a/cmd/relay/events/consumer.go +++ b/cmd/relay/events/consumer.go @@ -8,11 +8,11 @@ import ( "net" "time" - "github.com/RussellLuo/slidingwindow" comatproto "github.com/bluesky-social/indigo/api/atproto" - "github.com/prometheus/client_golang/prometheus" + "github.com/RussellLuo/slidingwindow" "github.com/gorilla/websocket" + "github.com/prometheus/client_golang/prometheus" ) type RepoStreamCallbacks struct { diff --git a/cmd/relay/events/events.go b/cmd/relay/events/events.go index d447d68f9..ab608878c 100644 --- a/cmd/relay/events/events.go +++ b/cmd/relay/events/events.go @@ -13,8 +13,8 @@ import ( comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/cmd/relay/models" lexutil "github.com/bluesky-social/indigo/lex/util" - "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus" cbg "github.com/whyrusleeping/cbor-gen" "go.opentelemetry.io/otel" ) diff --git a/cmd/relay/events/persist.go b/cmd/relay/events/persist.go index 82d57f8fe..05edda309 100644 --- a/cmd/relay/events/persist.go +++ b/cmd/relay/events/persist.go @@ -2,8 +2,6 @@ package events import ( "context" - "fmt" - "sync" "github.com/bluesky-social/indigo/cmd/relay/models" ) @@ -18,82 +16,3 @@ type EventPersistence interface { SetEventBroadcaster(func(*XRPCStreamEvent)) } - -// MemPersister is the most naive implementation of event persistence -// This EventPersistence option works fine with all event types -// ill do better later -type MemPersister struct { - buf []*XRPCStreamEvent - lk sync.Mutex - seq int64 - - broadcast func(*XRPCStreamEvent) -} - -func NewMemPersister() *MemPersister { - return &MemPersister{} -} - -func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error { - mp.lk.Lock() - defer mp.lk.Unlock() - mp.seq++ - switch { - case e.RepoCommit != nil: - e.RepoCommit.Seq = mp.seq - case e.RepoHandle != nil: - e.RepoHandle.Seq = mp.seq - case e.RepoIdentity != nil: - e.RepoIdentity.Seq = mp.seq - case e.RepoAccount != nil: - e.RepoAccount.Seq = mp.seq - case e.RepoMigrate != nil: - e.RepoMigrate.Seq = mp.seq - case e.RepoTombstone != nil: - e.RepoTombstone.Seq = mp.seq - case e.LabelLabels != nil: - e.LabelLabels.Seq = 
mp.seq - default: - panic("no event in persist call") - } - mp.buf = append(mp.buf, e) - - mp.broadcast(e) - - return nil -} - -func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { - mp.lk.Lock() - l := len(mp.buf) - mp.lk.Unlock() - - if since >= int64(l) { - return nil - } - - // TODO: abusing the fact that buf[0].seq is currently always 1 - for _, e := range mp.buf[since:l] { - if err := cb(e); err != nil { - return err - } - } - - return nil -} - -func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error { - return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only") -} - -func (mp *MemPersister) Flush(ctx context.Context) error { - return nil -} - -func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { - mp.broadcast = brc -} - -func (mp *MemPersister) Shutdown(context.Context) error { - return nil -} diff --git a/cmd/relay/main.go b/cmd/relay/main.go index 299affd26..1a623d356 100644 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -1,44 +1,33 @@ package main import ( - "context" "crypto/rand" "encoding/base64" - "errors" "fmt" - "github.com/bluesky-social/indigo/atproto/identity" - "github.com/bluesky-social/indigo/cmd/relay/events/diskpersist" - "gorm.io/gorm" "io" "log/slog" - _ "net/http/pprof" "net/url" "os" "os/signal" "path/filepath" - "strconv" "strings" "syscall" "time" + _ "github.com/joho/godotenv/autoload" + _ "go.uber.org/automaxprocs" + _ "net/http/pprof" + + "github.com/bluesky-social/indigo/atproto/identity" libbgs "github.com/bluesky-social/indigo/cmd/relay/bgs" "github.com/bluesky-social/indigo/cmd/relay/events" + "github.com/bluesky-social/indigo/cmd/relay/events/diskpersist" "github.com/bluesky-social/indigo/util" "github.com/bluesky-social/indigo/util/cliutil" "github.com/bluesky-social/indigo/xrpc" - _ "github.com/joho/godotenv/autoload" - _ "go.uber.org/automaxprocs" - "github.com/carlmjohnson/versioninfo" "github.com/urfave/cli/v2" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/jaeger" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/sdk/resource" - tracesdk "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "gorm.io/plugin/opentelemetry/tracing" ) @@ -63,8 +52,8 @@ func run(args []string) error { }, &cli.StringFlag{ Name: "db-url", - Usage: "database connection string for BGS database", - Value: "sqlite://./data/bigsky/bgs.sqlite", + Usage: "database connection string for relay database", + Value: "sqlite://./data/relay/relay.sqlite", EnvVars: []string{"DATABASE_URL"}, }, &cli.BoolFlag{ @@ -168,73 +157,6 @@ func run(args []string) error { return app.Run(os.Args) } -func setupOTEL(cctx *cli.Context) error { - - env := cctx.String("env") - if env == "" { - env = "dev" - } - if cctx.Bool("jaeger") { - jaegerUrl := "http://localhost:14268/api/traces" - exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl))) - if err != nil { - return err - } - tp := tracesdk.NewTracerProvider( - // Always be sure to batch in production. - tracesdk.WithBatcher(exp), - // Record information about this application in a Resource. 
- tracesdk.WithResource(resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String("bgs"), - attribute.String("env", env), // DataDog - attribute.String("environment", env), // Others - attribute.Int64("ID", 1), - )), - ) - - otel.SetTracerProvider(tp) - } - - // Enable OTLP HTTP exporter - // For relevant environment variables: - // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables - // At a minimum, you need to set - // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 - if ep := cctx.String("otel-exporter-otlp-endpoint"); ep != "" { - slog.Info("setting up trace exporter", "endpoint", ep) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - exp, err := otlptracehttp.New(ctx) - if err != nil { - slog.Error("failed to create trace exporter", "error", err) - os.Exit(1) - } - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - if err := exp.Shutdown(ctx); err != nil { - slog.Error("failed to shutdown trace exporter", "error", err) - } - }() - - tp := tracesdk.NewTracerProvider( - tracesdk.WithBatcher(exp), - tracesdk.WithResource(resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String("bgs"), - attribute.String("env", env), // DataDog - attribute.String("environment", env), // Others - attribute.Int64("ID", 1), - )), - ) - otel.SetTracerProvider(tp) - } - - return nil -} - func runRelay(cctx *cli.Context) error { // Trap SIGINT to trigger a shutdown. signals := make(chan os.Signal, 1) @@ -334,7 +256,7 @@ func runRelay(cctx *cli.Context) error { ratelimitBypass := cctx.String("bsky-social-rate-limit-skip") - logger.Info("constructing bgs") + logger.Info("constructing relay service") bgsConfig := libbgs.DefaultBGSConfig() bgsConfig.SSL = !cctx.Bool("crawl-insecure-ws") bgsConfig.ConcurrencyPerPDS = cctx.Int64("concurrency-per-pds") @@ -390,16 +312,16 @@ func runRelay(cctx *cli.Context) error { logger.Info("received shutdown signal") errs := bgs.Shutdown() for err := range errs { - logger.Error("error during BGS shutdown", "err", err) + logger.Error("error during shutdown", "err", err) } case err := <-bgsErr: if err != nil { - logger.Error("error during BGS startup", "err", err) + logger.Error("error during startup", "err", err) } logger.Info("shutting down") errs := bgs.Shutdown() for err := range errs { - logger.Error("error during BGS shutdown", "err", err) + logger.Error("error during shutdown", "err", err) } } @@ -426,53 +348,3 @@ func makePdsClientSetup(ratelimitBypass string) func(c *xrpc.Client) { } } } - -// RelaySetting is a gorm model -type RelaySetting struct { - Name string `gorm:"primarykey"` - Value string -} - -func getRelaySetting(db *gorm.DB, name string) (value string, found bool, err error) { - var setting RelaySetting - dbResult := db.First(&setting, "name = ?", name) - if errors.Is(dbResult.Error, gorm.ErrRecordNotFound) { - return "", false, nil - } - if dbResult.Error != nil { - return "", false, dbResult.Error - } - return setting.Value, true, nil -} - -func setRelaySetting(db *gorm.DB, name string, value string) error { - return db.Transaction(func(tx *gorm.DB) error { - var setting RelaySetting - found := tx.First(&setting, "name = ?", name) - if errors.Is(found.Error, gorm.ErrRecordNotFound) { - // ok! 
create it - setting.Name = name - setting.Value = value - return tx.Create(&setting).Error - } else if found.Error != nil { - return found.Error - } - setting.Value = value - return tx.Save(&setting).Error - }) -} - -func getRelaySettingBool(db *gorm.DB, name string) (value bool, found bool, err error) { - strval, found, err := getRelaySetting(db, name) - if err != nil || !found { - return false, found, err - } - value, err = strconv.ParseBool(strval) - if err != nil { - return false, false, err - } - return value, true, nil -} -func setRelaySettingBool(db *gorm.DB, name string, value bool) error { - return setRelaySetting(db, name, strconv.FormatBool(value)) -} diff --git a/cmd/relay/models/models.go b/cmd/relay/models/models.go index f38107c63..2788d7ece 100644 --- a/cmd/relay/models/models.go +++ b/cmd/relay/models/models.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "github.com/ipfs/go-cid" "gorm.io/gorm" ) diff --git a/cmd/relay/otel.go b/cmd/relay/otel.go new file mode 100644 index 000000000..810e55e5f --- /dev/null +++ b/cmd/relay/otel.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + "log/slog" + "os" + "time" + + "github.com/urfave/cli/v2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" +) + +func setupOTEL(cctx *cli.Context) error { + + env := cctx.String("env") + if env == "" { + env = "dev" + } + if cctx.Bool("jaeger") { + jaegerUrl := "http://localhost:14268/api/traces" + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl))) + if err != nil { + return err + } + tp := tracesdk.NewTracerProvider( + // Always be sure to batch in production. + tracesdk.WithBatcher(exp), + // Record information about this application in a Resource. 
+ tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("bgs"), + attribute.String("env", env), // DataDog + attribute.String("environment", env), // Others + attribute.Int64("ID", 1), + )), + ) + + otel.SetTracerProvider(tp) + } + + // Enable OTLP HTTP exporter + // For relevant environment variables: + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables + // At a minimum, you need to set + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 + if ep := cctx.String("otel-exporter-otlp-endpoint"); ep != "" { + slog.Info("setting up trace exporter", "endpoint", ep) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exp, err := otlptracehttp.New(ctx) + if err != nil { + slog.Error("failed to create trace exporter", "error", err) + os.Exit(1) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + slog.Error("failed to shutdown trace exporter", "error", err) + } + }() + + tp := tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exp), + tracesdk.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String("bgs"), + attribute.String("env", env), // DataDog + attribute.String("environment", env), // Others + attribute.Int64("ID", 1), + )), + ) + otel.SetTracerProvider(tp) + } + + return nil +} diff --git a/cmd/relay/settings.go b/cmd/relay/settings.go new file mode 100644 index 000000000..c550be2cf --- /dev/null +++ b/cmd/relay/settings.go @@ -0,0 +1,58 @@ +package main + +import ( + "errors" + "strconv" + + "gorm.io/gorm" +) + +// RelaySetting is a gorm model +type RelaySetting struct { + Name string `gorm:"primarykey"` + Value string +} + +func getRelaySetting(db *gorm.DB, name string) (value string, found bool, err error) { + var setting RelaySetting + dbResult := db.First(&setting, "name = ?", name) + if errors.Is(dbResult.Error, gorm.ErrRecordNotFound) { + return "", false, nil + } + if dbResult.Error != nil { + return "", false, dbResult.Error + } + return setting.Value, true, nil +} + +func setRelaySetting(db *gorm.DB, name string, value string) error { + return db.Transaction(func(tx *gorm.DB) error { + var setting RelaySetting + found := tx.First(&setting, "name = ?", name) + if errors.Is(found.Error, gorm.ErrRecordNotFound) { + // ok! create it + setting.Name = name + setting.Value = value + return tx.Create(&setting).Error + } else if found.Error != nil { + return found.Error + } + setting.Value = value + return tx.Save(&setting).Error + }) +} + +func getRelaySettingBool(db *gorm.DB, name string) (value bool, found bool, err error) { + strval, found, err := getRelaySetting(db, name) + if err != nil || !found { + return false, found, err + } + value, err = strconv.ParseBool(strval) + if err != nil { + return false, false, err + } + return value, true, nil +} +func setRelaySettingBool(db *gorm.DB, name string, value bool) error { + return setRelaySetting(db, name, strconv.FormatBool(value)) +}
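As a usage sketch for the helpers in the new `settings.go`: the relay persists simple name/value pairs in its database, and a caller in the same package can round-trip a boolean flag as below. The `trickle-mode` setting name and the wrapper function are hypothetical, purely for illustration; in the daemon these helpers are called with the relay's own gorm handle:

```go
package main

import (
	"log"

	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// exampleSettingsUsage round-trips a boolean setting through the database.
func exampleSettingsUsage() {
	// Open the same default sqlite database the daemon uses.
	db, err := gorm.Open(sqlite.Open("./data/relay/relay.sqlite"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}
	// gorm auto-migration creates the relay_settings table if missing.
	if err := db.AutoMigrate(&RelaySetting{}); err != nil {
		log.Fatal(err)
	}
	if err := setRelaySettingBool(db, "trickle-mode", true); err != nil {
		log.Fatal(err)
	}
	enabled, found, err := getRelaySettingBool(db, "trickle-mode")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("trickle-mode: found=%v enabled=%v", found, enabled)
}
```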