Skip to content

Commit ecf00b2

Browse files
committed
fix: auto-batch full sync backlog
1 parent 45e847e commit ecf00b2

6 files changed

Lines changed: 350 additions & 39 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ All notable changes to `discrawl` will be documented in this file.
44

55
## 0.2.0 - Unreleased
66

7+
- `sync --full` now auto-batches incomplete message channels from the local archive instead of stalling on large forum/thread catalogs
78
- offline member-profile search via `member_fts`
89
- `members search` now matches archived profile fields in addition to names
910
- `members show` now accepts ids or queries and shows recent messages plus message stats when uniquely resolved

internal/store/query.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,46 @@ func (s *Store) Channels(ctx context.Context, guildID string) ([]ChannelRow, err
241241
return out, rows.Err()
242242
}
243243

244+
func (s *Store) IncompleteMessageChannelIDs(ctx context.Context, guildID string) ([]string, error) {
245+
args := []any{}
246+
query := `
247+
select c.id
248+
from channels c
249+
where c.kind in ('text', 'news', 'announcement', 'thread_public', 'thread_private', 'thread_news', 'thread_announcement')
250+
`
251+
if guildID != "" {
252+
query += ` and c.guild_id = ?`
253+
args = append(args, guildID)
254+
}
255+
query += `
256+
and not exists (
257+
select 1
258+
from sync_state s
259+
where s.scope = 'channel:' || c.id || ':history_complete'
260+
)
261+
and not exists (
262+
select 1
263+
from sync_state s
264+
where s.scope = 'channel:' || c.id || ':unavailable'
265+
)
266+
order by c.id
267+
`
268+
rows, err := s.db.QueryContext(ctx, query, args...)
269+
if err != nil {
270+
return nil, err
271+
}
272+
defer func() { _ = rows.Close() }()
273+
var out []string
274+
for rows.Next() {
275+
var id string
276+
if err := rows.Scan(&id); err != nil {
277+
return nil, err
278+
}
279+
out = append(out, id)
280+
}
281+
return out, rows.Err()
282+
}
283+
244284
func (s *Store) Status(ctx context.Context, dbPath, defaultGuildID string) (Status, error) {
245285
status := Status{DBPath: dbPath, DefaultGuildID: defaultGuildID}
246286
queries := map[string]*int{

internal/syncer/channel_catalog.go

Lines changed: 112 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,21 @@ func (s *Syncer) liveChannelList(ctx context.Context, guildID string) ([]*discor
7070
for _, channel := range channels {
7171
allChannels[channel.ID] = channel
7272
}
73-
if err := s.appendThreadCatalog(ctx, allChannels, threadParentIDs(channels)); err != nil {
73+
var storedRows []store.ChannelRow
74+
if s.store != nil {
75+
rows, err := s.store.Channels(ctx, guildID)
76+
if err != nil {
77+
return nil, err
78+
}
79+
storedRows = rows
80+
mergeStoredThreadChannels(allChannels, rows)
81+
}
82+
parentIDs := fullSyncThreadParentIDs(channels, storedRows)
83+
if len(storedThreadParentIDs(storedRows)) == 0 {
84+
if err := s.appendThreadCatalog(ctx, allChannels, parentIDs); err != nil {
85+
return nil, err
86+
}
87+
} else if err := s.appendActiveThreadCatalog(ctx, allChannels, parentIDs); err != nil {
7488
return nil, err
7589
}
7690
return mapsToSlice(allChannels), nil
@@ -82,11 +96,8 @@ func (s *Syncer) appendThreadCatalog(ctx context.Context, allChannels map[string
8296
if !isThreadParent(channel) {
8397
continue
8498
}
85-
active, err := s.client.ThreadsActive(ctx, channel.ID)
86-
if err == nil {
87-
for _, thread := range active {
88-
allChannels[thread.ID] = thread
89-
}
99+
if err := s.appendActiveThreads(ctx, allChannels, channel.ID); err != nil {
100+
return err
90101
}
91102
for _, private := range []bool{false, true} {
92103
archived, err := s.client.ThreadsArchived(ctx, channel.ID, private)
@@ -102,6 +113,30 @@ func (s *Syncer) appendThreadCatalog(ctx context.Context, allChannels map[string
102113
return nil
103114
}
104115

116+
func (s *Syncer) appendActiveThreadCatalog(ctx context.Context, allChannels map[string]*discordgo.Channel, parents []string) error {
117+
for _, parentID := range uniqueIDs(parents) {
118+
channel := allChannels[parentID]
119+
if !isThreadParent(channel) {
120+
continue
121+
}
122+
if err := s.appendActiveThreads(ctx, allChannels, channel.ID); err != nil {
123+
return err
124+
}
125+
}
126+
return nil
127+
}
128+
129+
func (s *Syncer) appendActiveThreads(ctx context.Context, allChannels map[string]*discordgo.Channel, channelID string) error {
130+
active, err := s.client.ThreadsActive(ctx, channelID)
131+
if err != nil {
132+
return err
133+
}
134+
for _, thread := range active {
135+
allChannels[thread.ID] = thread
136+
}
137+
return nil
138+
}
139+
105140
func mapsToSlice(in map[string]*discordgo.Channel) []*discordgo.Channel {
106141
out := make([]*discordgo.Channel, 0, len(in))
107142
for _, channel := range in {
@@ -111,6 +146,18 @@ func mapsToSlice(in map[string]*discordgo.Channel) []*discordgo.Channel {
111146
return out
112147
}
113148

149+
func mergeStoredThreadChannels(allChannels map[string]*discordgo.Channel, rows []store.ChannelRow) {
150+
for _, row := range rows {
151+
if !strings.HasPrefix(row.Kind, "thread_") {
152+
continue
153+
}
154+
if _, ok := allChannels[row.ID]; ok {
155+
continue
156+
}
157+
allChannels[row.ID] = channelFromRow(row)
158+
}
159+
}
160+
114161
func selectStoredChannels(rows []store.ChannelRow, requested map[string]struct{}) map[string]*discordgo.Channel {
115162
if len(rows) == 0 || len(requested) == 0 {
116163
return nil
@@ -120,30 +167,34 @@ func selectStoredChannels(rows []store.ChannelRow, requested map[string]struct{}
120167
if _, ok := requested[row.ID]; !ok {
121168
continue
122169
}
123-
channelType := channelTypeFromKind(row.Kind)
124-
var threadMeta *discordgo.ThreadMetadata
125-
if strings.HasPrefix(row.Kind, "thread_") {
126-
threadMeta = &discordgo.ThreadMetadata{
127-
Archived: row.IsArchived,
128-
Locked: row.IsLocked,
129-
ArchiveTimestamp: row.ArchiveTimestamp,
130-
}
131-
}
132-
out[row.ID] = &discordgo.Channel{
133-
ID: row.ID,
134-
GuildID: row.GuildID,
135-
ParentID: row.ParentID,
136-
Name: row.Name,
137-
Topic: row.Topic,
138-
Position: row.Position,
139-
NSFW: row.IsNSFW,
140-
Type: channelType,
141-
ThreadMetadata: threadMeta,
142-
}
170+
out[row.ID] = channelFromRow(row)
143171
}
144172
return out
145173
}
146174

175+
func channelFromRow(row store.ChannelRow) *discordgo.Channel {
176+
channelType := channelTypeFromKind(row.Kind)
177+
var threadMeta *discordgo.ThreadMetadata
178+
if strings.HasPrefix(row.Kind, "thread_") {
179+
threadMeta = &discordgo.ThreadMetadata{
180+
Archived: row.IsArchived,
181+
Locked: row.IsLocked,
182+
ArchiveTimestamp: row.ArchiveTimestamp,
183+
}
184+
}
185+
return &discordgo.Channel{
186+
ID: row.ID,
187+
GuildID: row.GuildID,
188+
ParentID: row.ParentID,
189+
Name: row.Name,
190+
Topic: row.Topic,
191+
Position: row.Position,
192+
NSFW: row.IsNSFW,
193+
Type: channelType,
194+
ThreadMetadata: threadMeta,
195+
}
196+
}
197+
147198
func canUseStoredTargets(storedByID map[string]*discordgo.Channel, requested map[string]struct{}) bool {
148199
if len(requested) == 0 || len(storedByID) != len(requested) {
149200
return false
@@ -232,6 +283,41 @@ func threadParentIDs(channels []*discordgo.Channel) []string {
232283
return parents
233284
}
234285

286+
func fullSyncThreadParentIDs(topLevel []*discordgo.Channel, storedRows []store.ChannelRow) []string {
287+
storedParents := storedThreadParentIDs(storedRows)
288+
if len(storedParents) == 0 {
289+
return threadParentIDs(topLevel)
290+
}
291+
parents := make([]string, 0, len(topLevel))
292+
for _, channel := range topLevel {
293+
if channel == nil {
294+
continue
295+
}
296+
if channel.Type == discordgo.ChannelTypeGuildForum {
297+
parents = append(parents, channel.ID)
298+
continue
299+
}
300+
if _, ok := storedParents[channel.ID]; ok && isThreadParent(channel) {
301+
parents = append(parents, channel.ID)
302+
}
303+
}
304+
return uniqueIDs(parents)
305+
}
306+
307+
func storedThreadParentIDs(rows []store.ChannelRow) map[string]struct{} {
308+
if len(rows) == 0 {
309+
return nil
310+
}
311+
parents := make(map[string]struct{})
312+
for _, row := range rows {
313+
if !strings.HasPrefix(row.Kind, "thread_") || row.ParentID == "" {
314+
continue
315+
}
316+
parents[row.ParentID] = struct{}{}
317+
}
318+
return parents
319+
}
320+
235321
func uniqueIDs(ids []string) []string {
236322
set := make(map[string]struct{}, len(ids))
237323
out := make([]string, 0, len(ids))

internal/syncer/channel_catalog_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,89 @@ func TestSyncSkipsUnchangedThreadsWhenHistoryComplete(t *testing.T) {
186186
require.Zero(t, stats.Messages)
187187
require.Zero(t, client.messageCalls["t1"])
188188
}
189+
190+
func TestFullSyncReusesStoredThreadParents(t *testing.T) {
191+
t.Parallel()
192+
193+
ctx := context.Background()
194+
s, err := store.Open(ctx, filepath.Join(t.TempDir(), "discrawl.db"))
195+
require.NoError(t, err)
196+
defer func() { _ = s.Close() }()
197+
198+
require.NoError(t, s.UpsertGuild(ctx, store.GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`}))
199+
require.NoError(t, s.UpsertChannel(ctx, store.ChannelRecord{
200+
ID: "c1",
201+
GuildID: "g1",
202+
Kind: "text",
203+
Name: "general",
204+
RawJSON: `{"id":"c1"}`,
205+
}))
206+
require.NoError(t, s.UpsertChannel(ctx, store.ChannelRecord{
207+
ID: "t1",
208+
GuildID: "g1",
209+
ParentID: "c1",
210+
Kind: "thread_public",
211+
Name: "bug-report",
212+
RawJSON: `{"id":"t1"}`,
213+
}))
214+
215+
client := &fakeClient{
216+
guilds: []*discordgo.UserGuild{{ID: "g1", Name: "Guild"}},
217+
guildByID: map[string]*discordgo.Guild{
218+
"g1": {ID: "g1", Name: "Guild"},
219+
},
220+
channels: map[string][]*discordgo.Channel{
221+
"g1": {
222+
{ID: "c1", GuildID: "g1", Name: "general", Type: discordgo.ChannelTypeGuildText},
223+
{ID: "c2", GuildID: "g1", Name: "random", Type: discordgo.ChannelTypeGuildText},
224+
},
225+
},
226+
messages: map[string][]*discordgo.Message{
227+
"t1": {{
228+
ID: "10",
229+
GuildID: "g1",
230+
ChannelID: "t1",
231+
Content: "stored thread body",
232+
Timestamp: time.Now().UTC(),
233+
Author: &discordgo.User{ID: "u1", Username: "user"},
234+
}},
235+
},
236+
}
237+
238+
svc := New(client, s, nil)
239+
stats, err := svc.Sync(ctx, SyncOptions{Full: true, GuildIDs: []string{"g1"}})
240+
require.NoError(t, err)
241+
require.Equal(t, 1, stats.Threads)
242+
require.Equal(t, 1, stats.Messages)
243+
require.Zero(t, client.threadCalls)
244+
require.Equal(t, 1, client.messageCalls["t1"])
245+
}
246+
247+
func TestFullSyncFallsBackToBroadThreadDiscoveryWithoutStoredThreads(t *testing.T) {
248+
t.Parallel()
249+
250+
ctx := context.Background()
251+
s, err := store.Open(ctx, filepath.Join(t.TempDir(), "discrawl.db"))
252+
require.NoError(t, err)
253+
defer func() { _ = s.Close() }()
254+
255+
require.NoError(t, s.UpsertGuild(ctx, store.GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`}))
256+
257+
client := &fakeClient{
258+
guilds: []*discordgo.UserGuild{{ID: "g1", Name: "Guild"}},
259+
guildByID: map[string]*discordgo.Guild{
260+
"g1": {ID: "g1", Name: "Guild"},
261+
},
262+
channels: map[string][]*discordgo.Channel{
263+
"g1": {
264+
{ID: "c1", GuildID: "g1", Name: "general", Type: discordgo.ChannelTypeGuildText},
265+
{ID: "c2", GuildID: "g1", Name: "random", Type: discordgo.ChannelTypeGuildText},
266+
},
267+
},
268+
}
269+
270+
svc := New(client, s, nil)
271+
_, err = svc.Sync(ctx, SyncOptions{Full: true, GuildIDs: []string{"g1"}})
272+
require.NoError(t, err)
273+
require.Equal(t, 6, client.threadCalls)
274+
}

0 commit comments

Comments
 (0)