
relay tidy #991

Merged: 10 commits, Mar 19, 2025
5 changes: 3 additions & 2 deletions HACKING.md
@@ -1,9 +1,10 @@

## git repo contents

Run with, eg, `go run ./cmd/bigsky`):
Run with, eg, `go run ./cmd/relay`):

- `cmd/bigsky`: Relay+indexer daemon
- `cmd/bigsky`: relay daemon
- `cmd/relay`: new (sync v1.1) relay daemon
- `cmd/palomar`: search indexer and query service (OpenSearch)
- `cmd/gosky`: client CLI for talking to a PDS
- `cmd/lexgen`: codegen tool for lexicons (Lexicon JSON to Go package)
1 change: 1 addition & 0 deletions Makefile
@@ -18,6 +18,7 @@ help: ## Print info about all commands
build: ## Build all executables
go build ./cmd/gosky
go build ./cmd/bigsky
go build ./cmd/relay
go build ./cmd/beemo
go build ./cmd/lexgen
go build ./cmd/stress
2 changes: 1 addition & 1 deletion README.md
@@ -8,7 +8,7 @@ Some Bluesky software is developed in Typescript, and lives in the [bluesky-soci

**Go Services:**

- **bigsky** ([README](./cmd/bigsky/README.md)): "Big Graph Service" (BGS) reference implementation, running at `bsky.network`
- **bigsky** ([README](./cmd/bigsky/README.md)): relay reference implementation, running at `bsky.network`
- **palomar** ([README](./cmd/palomar/README.md)): fulltext search service for <https://bsky.app>
- **hepa** ([README](./cmd/hepa/README.md)): auto-moderation bot for [Ozone](https://ozone.tools)

99 changes: 16 additions & 83 deletions cmd/relay/README.md
@@ -2,13 +2,13 @@
atproto Relay Service
===============================

*NOTE: "Relays" used to be called "Big Graph Servers", or "BGS", or "bigsky". Many variables and packages still reference "bgs"*
*NOTE: "relays" used to be called "Big Graph Servers", or "BGS", or "bigsky". Many variables and packages still reference "bgs"*

This is the implementation of an atproto Relay which is running in the production network, written and operated by Bluesky.
This is the implementation of an atproto relay which is running in the production network, written and operated by Bluesky.

In atproto, a Relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose a get all relevant events for the entire network, or a specific sub-graph of the network. The Relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.
In atproto, a relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose and get all relevant events for the entire network, or a specific sub-graph of the network. The relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.
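
As an illustration of that consumer pattern (not part of this diff), here is a minimal sketch that subscribes to a relay's firehose using the gorilla/websocket package already used elsewhere in this repo. The localhost URL is an assumption matching the default dev port; it only counts raw frames, where a real consumer would decode the DAG-CBOR header and event body (e.g. via the events package).

```go
package main

import (
	"log"

	"github.com/gorilla/websocket"
)

func main() {
	// Hypothetical local relay on the default port; production relays use wss://.
	url := "ws://localhost:2470/xrpc/com.atproto.sync.subscribeRepos"

	conn, _, err := websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()

	// Each frame is a DAG-CBOR header followed by an event body; a real
	// consumer would decode these rather than just measuring them.
	for i := 0; ; i++ {
		_, frame, err := conn.ReadMessage()
		if err != nil {
			log.Fatalf("read failed: %v", err)
		}
		log.Printf("frame %d: %d bytes", i, len(frame))
	}
}
```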

This Relay implementation is designed to subscribe to the entire global network. The current state of the codebase is informally expected to scale to around 50 million accounts in the network, and thousands of repo events per second (peak).
This relay implementation is designed to subscribe to the entire global network. The current state of the codebase is informally expected to scale to around 100 million accounts in the network, and tens of thousands of repo events per second (peak).

Features and design decisions:

@@ -20,35 +20,35 @@ Features and design decisions:
- observability: logging, prometheus metrics, OTEL traces
- admin web interface: configure limits, add upstream PDS instances, etc

This software is not as packaged, documented, and supported for self-hosting as our PDS distribution or Ozone service. But it is relatively simple and inexpensive to get running.
This software is not yet as packaged, documented, and supported for self-hosting as our PDS distribution or Ozone service. But it is relatively simple and inexpensive to get running.

A note and reminder about Relays in general are that they are more of a convenience in the protocol than a hard requirement. The "firehose" API is the exact same on the PDS and on a Relay. Any service which subscribes to the Relay could instead connect to one or more PDS instances directly.
A note and reminder about relays in general is that they are more of a convenience in the protocol than a hard requirement. The "firehose" API is exactly the same on the PDS and on a relay. Any service which subscribes to the relay could instead connect to one or more PDS instances directly.


## Development Tips

The README and Makefile at the top level of this git repo have some generic helpers for testing, linting, formatting code, etc.

To re-build and run the Relay locally:
To re-build and run the relay locally:

make run-dev-relay

You can re-build and run the command directly to get a list of configuration flags and env vars; env vars will be loaded from `.env` if that file exists:

RELAY_ADMIN_KEY=localdev go run ./cmd/relay/ --help

By default, the daemon will use sqlite for databases (in the directory `./data/bigsky/`), CAR data will be stored as individual shard files in `./data/bigsky/carstore/`), and the HTTP API will be bound to localhost port 2470.
By default, the daemon will use sqlite for databases (in the directory `./data/relay/`) and the HTTP API will be bound to localhost port 2470.

When the daemon isn't running, sqlite database files can be inspected with:

sqlite3 data/bigsky/bgs.sqlite
sqlite3 data/relay/relay.sqlite
[...]
sqlite> .schema

Wipe all local data:

# careful! double-check this destructive command
rm -rf ./data/bigsky/*
rm -rf ./data/relay/*

There is a basic web dashboard, though it will not be included unless built and copied to a local directory `./public/`. Run `make build-relay-ui`, and then when running the daemon the dashboard will be available at: <http://localhost:2470/dash/>. Paste in the admin key, eg `localdev`.

Expand All @@ -63,58 +63,42 @@ Request crawl of an individual PDS instance like:

## Docker Containers

One way to deploy is running a docker image. You can pull and/or run a specific version of bigsky, referenced by git commit, from the Bluesky Github container registry. For example:
One way to deploy is running a docker image. You can pull and/or run a specific version of the relay, referenced by git commit, from the Bluesky GitHub container registry. For example:

docker pull ghcr.io/bluesky-social/indigo:relay-fd66f93ce1412a3678a1dd3e6d53320b725978a6
docker run ghcr.io/bluesky-social/indigo:relay-fd66f93ce1412a3678a1dd3e6d53320b725978a6

There is a Dockerfile in this directory, which can be used to build customized/patched versions of the Relay as a container, republish them, run locally, deploy to servers, deploy to an orchestrated cluster, etc. See docs and guides for docker and cluster management systems for details.
There is a Dockerfile in this directory, which can be used to build customized/patched versions of the relay as a container, republish them, run locally, deploy to servers, deploy to an orchestrated cluster, etc. See docs and guides for docker and cluster management systems for details.


## Database Setup

PostgreSQL and Sqlite are both supported. When using Sqlite, separate files are used for Relay metadata and CarStore metadata. With PostgreSQL a single database server, user, and logical database can all be reused: table names will not conflict.

Database configuration is passed via the `DATABASE_URL` and `CARSTORE_DATABASE_URL` environment variables, or the corresponding CLI args.
PostgreSQL and Sqlite are both supported. Database configuration is passed via the `DATABASE_URL` environment variable, or the corresponding CLI arg.

For PostgreSQL, the user and database must already be configured. Some example SQL commands are:

CREATE DATABASE bgs;
CREATE DATABASE carstore;
CREATE DATABASE relay;

CREATE USER ${username} WITH PASSWORD '${password}';
GRANT ALL PRIVILEGES ON DATABASE bgs TO ${username};
GRANT ALL PRIVILEGES ON DATABASE carstore TO ${username};
GRANT ALL PRIVILEGES ON DATABASE relay TO ${username};

This service currently uses `gorm` to automatically run database migrations as the regular user. There is no concept of running a separate set of migrations under a more privileged database user.
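
As a rough illustration of that migration pattern (not part of this diff), the sketch below uses gorm's `AutoMigrate` with a hypothetical `ExampleHost` model and the standard gorm PostgreSQL driver; the relay's actual models live in `cmd/relay`.

```go
package main

import (
	"os"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// ExampleHost is a stand-in model; the relay's real tables are defined in
// cmd/relay and migrated the same way.
type ExampleHost struct {
	ID       uint   `gorm:"primaryKey"`
	Hostname string `gorm:"uniqueIndex"`
}

func main() {
	// e.g. postgresql://user:password@localhost:5432/relay
	dsn := os.Getenv("DATABASE_URL")

	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	// AutoMigrate creates or alters tables as the regular application user,
	// which is why that user needs full privileges on the database.
	if err := db.AutoMigrate(&ExampleHost{}); err != nil {
		panic(err)
	}
}
```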


## Deployment

*NOTE: this is not a complete guide to operating a Relay. There are decisions to be made and communicated about policies, bandwidth use, PDS crawling and rate-limits, financial sustainability, etc, which are not covered here. This is just a quick overview of how to technically get a relay up and running.*
*NOTE: this is not a complete guide to operating a relay. There are decisions to be made and communicated about policies, bandwidth use, PDS crawling and rate-limits, financial sustainability, etc, which are not covered here. This is just a quick overview of how to technically get a relay up and running.*

In a real-world system, you will probably want to use PostgreSQL.

Some notable configuration env vars to set:

- `ENVIRONMENT`: eg, `production`
- `DATABASE_URL`: see section below
- `DATA_DIR`: misc data will go in a subdirectory
- `GOLOG_LOG_LEVEL`: log verbosity
- `RESOLVE_ADDRESS`: DNS server to use
- `FORCE_DNS_UDP`: recommend "true"

There is a health check endpoint at `/xrpc/_health`. Prometheus metrics are exposed by default on port 2471, path `/metrics`. The service logs fairly verbosely to stderr; use `GOLOG_LOG_LEVEL` to control log volume.
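
A minimal external probe against those default local endpoints might look like the following sketch (illustrative only, not part of this diff):

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Default local ports from the paragraph above; adjust for your deployment.
	endpoints := []string{
		"http://localhost:2470/xrpc/_health", // service health check
		"http://localhost:2471/metrics",      // Prometheus metrics
	}
	for _, url := range endpoints {
		resp, err := http.Get(url)
		if err != nil {
			fmt.Printf("%s unreachable: %v\n", url, err)
			continue
		}
		resp.Body.Close()
		fmt.Printf("%s -> %s\n", url, resp.Status)
	}
}
```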

As a rough guideline for the compute resources needed to run a full-network Relay, in June 2024 an example Relay for over 5 million repositories used:

- roughly 1 TByte of disk for PostgreSQL
- roughly 1 TByte of disk for event playback buffer
- roughly 5k disk I/O operations per second (all combined)
- roughly 100% of one CPU core (quite low CPU utilization)
- roughly 5GB of RAM for `relay`, and as much RAM as available for PostgreSQL and page cache
- on the order of 1 megabit inbound bandwidth (crawling PDS instances) and 1 megabit outbound per connected client. 1 mbit continuous is approximately 350 GByte/month

Be sure to double-check bandwidth usage and pricing if running a public relay! Bandwidth prices can vary widely between providers, and popular cloud services (AWS, Google Cloud, Azure) are very expensive compared to alternatives like OVH or Hetzner.


@@ -202,26 +186,6 @@ POST `{"did": "did:..."}` to take-down a bad repo; deletes all local data for the repo

POST `?did={did:...}` to reverse a repo take-down

### /admin/repo/compact

POST `?did={did:...}` to compact a repo. Optionally `&fast=true`. HTTP blocks until the compaction finishes.

### /admin/repo/compactAll

POST to begin compaction of all repos. Optional query params:

* `fast=true`
* `limit={int}` maximum number of repos to compact (biggest first) (default 50)
* `threhsold={int}` minimum number of shard files a repo must have on disk to merit compaction (default 20)

### /admin/repo/reset

POST `?did={did:...}` deletes all local data for the repo

### /admin/repo/verify

POST `?did={did:...}` checks that all repo data is accessible. HTTP blocks until done.

### /admin/pds/requestCrawl

POST `{"hostname":"pds host"}` to start crawling a PDS
@@ -254,37 +218,6 @@ GET returns JSON list of records
}, ...]
```

### /admin/pds/resync

POST `?host={host}` to start a resync of a PDS

GET `?host={host}` to get status of a PDS resync, return

```json
{"resync": {
"pds": {
"Host": string,
"Did": string,
"SSL": bool,
"Cursor": int,
"Registered": bool,
"Blocked": bool,
"RateLimit": float,
"CrawlRateLimit": float,
"RepoCount": int,
"RepoLimit": int,
"HourlyEventLimit": int,
"DailyEventLimit": int,
},
"numRepoPages": int,
"numRepos": int,
"numReposChecked": int,
"numReposToResync": int,
"status": string,
"statusChangedAt": time,
}}
```

### /admin/pds/changeLimits

POST to set the limits for a PDS. body:
11 changes: 1 addition & 10 deletions cmd/relay/bgs/bgs.go
@@ -21,7 +21,6 @@ import (
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/cmd/relay/events"
"github.com/bluesky-social/indigo/cmd/relay/models"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/xrpc"

"github.com/gorilla/websocket"
@@ -330,7 +329,7 @@ var homeMessage string = `
.##....##..##.......##.......##.....##....##...
.##.....##.########.########.##.....##....##...

This is an atproto [https://atproto.com] relay instance, running the 'bigsky' codebase [https://github.com/bluesky-social/indigo]
This is an atproto [https://atproto.com] relay instance, running the 'relay' codebase [https://github.com/bluesky-social/indigo]

The firehose WebSocket path is at: /xrpc/com.atproto.sync.subscribeRepos
`
@@ -713,14 +712,6 @@ func (bgs *BGS) lookupUserByUID(ctx context.Context, uid models.Uid) (*Account,
return &u, nil
}

func stringLink(lnk *lexutil.LexLink) string {
if lnk == nil {
return "<nil>"
}

return lnk.String()
}

// handleFedEvent() is the callback passed to Slurper called from Slurper.handleConnection()
func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error {
ctx, span := tracer.Start(ctx, "handleFedEvent")
4 changes: 2 additions & 2 deletions cmd/relay/events/cbor_gen.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions cmd/relay/events/consumer.go
@@ -8,11 +8,11 @@ import (
"net"
"time"

"github.com/RussellLuo/slidingwindow"
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/prometheus/client_golang/prometheus"

"github.com/RussellLuo/slidingwindow"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
)

type RepoStreamCallbacks struct {
2 changes: 1 addition & 1 deletion cmd/relay/events/events.go
@@ -13,8 +13,8 @@ import (
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/cmd/relay/models"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/client_golang/prometheus"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel"
)
81 changes: 0 additions & 81 deletions cmd/relay/events/persist.go
@@ -2,8 +2,6 @@ package events

import (
"context"
"fmt"
"sync"

"github.com/bluesky-social/indigo/cmd/relay/models"
)
@@ -18,82 +16,3 @@ type EventPersistence interface {

SetEventBroadcaster(func(*XRPCStreamEvent))
}

// MemPersister is the most naive implementation of event persistence
// This EventPersistence option works fine with all event types
// ill do better later
type MemPersister struct {
buf []*XRPCStreamEvent
lk sync.Mutex
seq int64

broadcast func(*XRPCStreamEvent)
}

func NewMemPersister() *MemPersister {
return &MemPersister{}
}

func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error {
mp.lk.Lock()
defer mp.lk.Unlock()
mp.seq++
switch {
case e.RepoCommit != nil:
e.RepoCommit.Seq = mp.seq
case e.RepoHandle != nil:
e.RepoHandle.Seq = mp.seq
case e.RepoIdentity != nil:
e.RepoIdentity.Seq = mp.seq
case e.RepoAccount != nil:
e.RepoAccount.Seq = mp.seq
case e.RepoMigrate != nil:
e.RepoMigrate.Seq = mp.seq
case e.RepoTombstone != nil:
e.RepoTombstone.Seq = mp.seq
case e.LabelLabels != nil:
e.LabelLabels.Seq = mp.seq
default:
panic("no event in persist call")
}
mp.buf = append(mp.buf, e)

mp.broadcast(e)

return nil
}

func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
mp.lk.Lock()
l := len(mp.buf)
mp.lk.Unlock()

if since >= int64(l) {
return nil
}

// TODO: abusing the fact that buf[0].seq is currently always 1
for _, e := range mp.buf[since:l] {
if err := cb(e); err != nil {
return err
}
}

return nil
}

func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error {
return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only")
}

func (mp *MemPersister) Flush(ctx context.Context) error {
return nil
}

func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
mp.broadcast = brc
}

func (mp *MemPersister) Shutdown(context.Context) error {
return nil
}