Skip to content

Commit 5e41899

Browse files
committed
cleanup upstream url handling
--splitter-host should work for any http/https/ws/wss url
1 parent d9a74f6 commit 5e41899

File tree

1 file changed

+38
-11
lines changed

1 file changed

+38
-11
lines changed

splitter/splitter.go

+38-11
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ func (sc *SplitterConfig) XrpcRootUrl() string {
7575
return "https://" + sc.UpstreamHost
7676
}
7777

78+
func (sc *SplitterConfig) UpstreamUrl() string {
79+
if strings.HasPrefix(sc.UpstreamHost, "http://") {
80+
return "http://" + sc.UpstreamHost[7:]
81+
}
82+
if strings.HasPrefix(sc.UpstreamHost, "https://") {
83+
return "https://" + sc.UpstreamHost[8:]
84+
}
85+
if strings.HasPrefix(sc.UpstreamHost, "ws://") {
86+
return sc.UpstreamHost
87+
}
88+
if strings.HasPrefix(sc.UpstreamHost, "wss://") {
89+
return sc.UpstreamHost
90+
}
91+
return "wss://" + sc.UpstreamHost
92+
}
93+
7894
func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) {
7995
var nextCrawlerURLs []*url.URL
8096
log := slog.Default().With("system", "splitter")
@@ -90,6 +106,11 @@ func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error)
90106
}
91107
}
92108

109+
_, err := url.Parse(conf.UpstreamUrl())
110+
if err != nil {
111+
return nil, fmt.Errorf("failed to parse upstream url %#v: %w", conf.UpstreamUrl(), err)
112+
}
113+
93114
s := &Splitter{
94115
conf: conf,
95116
consumers: make(map[uint64]*SocketConsumer),
@@ -153,7 +174,7 @@ func (s *Splitter) Start(addr string) error {
153174
return fmt.Errorf("loading cursor failed: %w", err)
154175
}
155176

156-
go s.subscribeWithRedialer(context.Background(), s.conf.UpstreamHost, curs)
177+
go s.subscribeWithRedialer(context.Background(), curs)
157178

158179
li, err := lc.Listen(ctx, "tcp", addr)
159180
if err != nil {
@@ -570,10 +591,14 @@ func sleepForBackoff(b int) time.Duration {
570591
return time.Second * 5
571592
}
572593

573-
func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, cursor int64) {
594+
func (s *Splitter) subscribeWithRedialer(ctx context.Context, cursor int64) {
574595
d := websocket.Dialer{}
575596

576-
protocol := "wss"
597+
upstreamUrl, err := url.Parse(s.conf.UpstreamUrl())
598+
if err != nil {
599+
panic(err) // this should have been checked in NewSplitter
600+
}
601+
upstreamUrl = upstreamUrl.JoinPath("/xrpc/com.atproto.sync.subscribeRepos")
577602

578603
var backoff int
579604
for {
@@ -587,15 +612,17 @@ func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, curso
587612
"User-Agent": []string{"bgs-rainbow-v0"},
588613
}
589614

590-
var url string
615+
var uurl string
591616
if cursor < 0 {
592-
url = fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos", protocol, host)
617+
upstreamUrl.RawQuery = ""
618+
uurl = upstreamUrl.String()
593619
} else {
594-
url = fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host, cursor)
620+
upstreamUrl.RawQuery = fmt.Sprintf("cursor=%d", cursor)
621+
uurl = upstreamUrl.String()
595622
}
596-
con, res, err := d.DialContext(ctx, url, header)
623+
con, res, err := d.DialContext(ctx, uurl, header)
597624
if err != nil {
598-
s.log.Warn("dialing failed", "host", host, "err", err, "backoff", backoff)
625+
s.log.Warn("dialing failed", "host", uurl, "err", err, "backoff", backoff)
599626
time.Sleep(sleepForBackoff(backoff))
600627
backoff++
601628

@@ -604,13 +631,13 @@ func (s *Splitter) subscribeWithRedialer(ctx context.Context, host string, curso
604631

605632
s.log.Info("event subscription response", "code", res.StatusCode)
606633

607-
if err := s.handleConnection(ctx, host, con, &cursor); err != nil {
608-
s.log.Warn("connection failed", "host", host, "err", err)
634+
if err := s.handleConnection(ctx, con, &cursor); err != nil {
635+
s.log.Warn("connection failed", "host", uurl, "err", err)
609636
}
610637
}
611638
}
612639

613-
func (s *Splitter) handleConnection(ctx context.Context, host string, con *websocket.Conn, lastCursor *int64) error {
640+
func (s *Splitter) handleConnection(ctx context.Context, con *websocket.Conn, lastCursor *int64) error {
614641
ctx, cancel := context.WithCancel(ctx)
615642
defer cancel()
616643

0 commit comments

Comments
 (0)