Skip to content

feat: enhance message retrieval logic in MessageManager and TakeoutMa… #106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions packages/core/src/adapter/client/message-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,33 @@ export class MessageManager {
let offsetId = 0
let hasMore = true
let processedCount = 0
const batchSize = 100 // 每次请求的固定数量
let lastBatchSize = 0

try {
while (hasMore) {
// 计算本次请求的实际限制数量
const requestLimit = Math.min(
batchSize,
limit ? limit - processedCount : batchSize,
)

this.logger.debug('获取普通消息', {
offsetId,
minId: options?.minId,
maxId: options?.maxId,
processedCount,
limit,
requestLimit,
lastBatchSize,
})

// Get messages using normal API with retry
const apiResponse = await this.errorHandler.withRetry(
() => this.client.getMessages(chatId, {
limit: limit || 100,
limit: requestLimit,
offsetId,
addOffset: 0, // 添加 addOffset 参数,与 takeout 模式保持一致
minId: options?.minId || 0,
maxId: options?.maxId || 0,
}),
Expand All @@ -82,11 +101,27 @@ export class MessageManager {
}

const messages = apiResponse.data
lastBatchSize = messages.length

// If we got fewer messages than requested, there are no more
hasMore = messages.length === (limit || 100)
// 如果没有获取到消息,或者获取到的消息数量小于请求数量,说明没有更多消息了
if (messages.length === 0 || messages.length < requestLimit) {
hasMore = false
}

let batchProcessedCount = 0 // 记录这一批次处理的消息数量
for (const message of messages) {
// 更新下一次请求的 offsetId
offsetId = message.id

// 检查是否达到边界条件
if (options?.maxId && message.id >= options.maxId) {
continue
}
if (options?.minId && message.id <= options.minId) {
hasMore = false
break
}

// Skip empty messages
if (message instanceof Api.MessageEmpty) {
continue
Expand All @@ -95,7 +130,8 @@ export class MessageManager {
// Check time range
const messageTime = new Date(message.date * 1000)
if (options?.startTime && messageTime < options.startTime) {
continue
hasMore = false
break
}
if (options?.endTime && messageTime > options.endTime) {
continue
Expand All @@ -120,20 +156,29 @@ export class MessageManager {

yield converted
processedCount++

// Update offsetId to current message ID
offsetId = message.id
batchProcessedCount++

// Check if we've reached the limit
if (options?.limit && processedCount >= options.limit) {
return
if (limit && processedCount >= limit) {
hasMore = false
break
}
}
catch (error) {
// Log error but continue with next message
this.errorHandler.handleError(this.toError(error), '转换消息', `处理消息 ${message.id} 时出错,跳过该消息`)
}
}

// 如果这批次的消息都被过滤掉了,但还有更多消息,继续获取
if (batchProcessedCount === 0 && hasMore && lastBatchSize > 0) {
this.logger.debug('当前批次所有消息都被过滤,继续获取下一批', {
offsetId,
lastBatchSize,
processedCount,
})
continue
}
}
}
catch (error) {
Expand Down
78 changes: 59 additions & 19 deletions packages/core/src/adapter/client/takeout-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ export class TakeoutManager {
let offsetId = 0
let hasMore = true
let processedCount = 0
let hash: bigint = BigInt(0)
let hash: number = 0
let lastBatchSize = 0
const batchSize = 100 // 每次请求的消息数量

try {
// Initialize takeout session
Expand All @@ -202,24 +204,36 @@ export class TakeoutManager {
}

while (hasMore) {
// 计算本次请求的实际限制数量
const requestLimit = Math.min(batchSize, limit - processedCount)

// https://core.telegram.org/api/offsets
const id = BigInt(chatId)
hash = hash ^ (hash >> 21n)
hash = hash ^ (hash << 35n)
hash = hash ^ (hash >> 4n)
const id = (chatId)
hash = hash ^ (hash >> 21)
hash = hash ^ (hash << 35)
hash = hash ^ (hash >> 4)
hash = hash + id
this.logger.debug(`get takeout message ${options?.minId}-${options?.maxId}`)

this.logger.debug('获取takeout消息', {
offsetId,
minId: options?.minId,
maxId: options?.maxId,
processedCount,
limit,
requestLimit,
lastBatchSize,
})

// Get messages using takeout
const query = new Api.messages.GetHistory({
peer: await this.client.getInputEntity(chatId),
offsetId,
addOffset: 0,
limit,
maxId: options?.maxId || 0, // 支持到特定ID结束
minId: options?.minId || 0, // 支持增量导出从特定ID开始
hash: bigInt(hash.toString()),
limit: requestLimit,
maxId: options?.maxId || 0,
minId: options?.minId || 0,
hash: bigInt(hash),
})
this.logger.debug(`message query ${JSON.stringify(query)}`)

// Use error handler for API requests
const apiResponse = await this.errorHandler.withRetry(
Expand Down Expand Up @@ -251,11 +265,27 @@ export class TakeoutManager {
}

const messages = result.messages as Api.Message[]
lastBatchSize = messages.length

// If we got fewer messages than requested, there are no more
hasMore = messages.length === limit
// 如果没有获取到消息,或者消息数量小于请求数量,说明没有更多消息了
if (messages.length === 0 || messages.length < requestLimit) {
hasMore = false
}

let batchProcessedCount = 0 // 记录这一批次处理的消息数量
for (const message of messages) {
// 更新下一次请求的 offsetId
offsetId = message.id

// 检查是否达到边界条件
if (options?.maxId && message.id >= options.maxId) {
continue
}
if (options?.minId && message.id <= options.minId) {
hasMore = false
break
}

// Skip empty messages
if (message instanceof Api.MessageEmpty) {
continue
Expand All @@ -264,7 +294,8 @@ export class TakeoutManager {
// Check time range
const messageTime = new Date(message.date * 1000)
if (options?.startTime && messageTime < options.startTime) {
continue
hasMore = false
break
}
if (options?.endTime && messageTime > options.endTime) {
continue
Expand All @@ -283,15 +314,24 @@ export class TakeoutManager {

yield converted
processedCount++

// Update offsetId to current message ID
offsetId = message.id
batchProcessedCount++

// Check if we've reached the limit
if (options?.limit && processedCount >= options.limit) {
return
if (processedCount >= limit) {
hasMore = false
break
}
}

// 如果这批次的消息都被过滤掉了,但还有更多消息,继续获取
if (batchProcessedCount === 0 && hasMore && lastBatchSize > 0) {
this.logger.debug('当前批次所有消息都被过滤,继续获取下一批', {
offsetId,
lastBatchSize,
processedCount,
})
continue
}
}
}
catch (error: any) {
Expand Down
58 changes: 46 additions & 12 deletions packages/core/src/services/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,45 @@ export class ExportService {
let count = 0
let failedCount = 0
let messages: TelegramMessage[] = []
logger.debug(`history ${JSON.stringify(history)}`)

// 计算预期的消息总数
let expectedTotal: number
if (limit) {
expectedTotal = limit
}
else if (incremental && startId && exportMaxId) {
// 增量导出时,计算ID范围内的消息数量
expectedTotal = exportMaxId - startId
logger.debug('增量导出预期消息数', {
startId,
exportMaxId,
expectedTotal,
method,
})
}
else {
// 全量导出时使用历史记录总数
expectedTotal = history.count || 100
logger.debug('全量导出预期消息数', {
historyCount: history.count,
expectedTotal,
method,
})
}

const total = limit || history.count - 1 || 100
function isSkipMedia(type: DatabaseMessageType) {
return !messageTypes.includes(type)
}
if (incremental && exportMaxId && (exportMaxId - 1) === startId) {
onProgress?.(100, '无需导出', {

// 检查是否需要导出
if (incremental && exportMaxId && startId && (exportMaxId - 1) <= startId) {
logger.debug('无需导出,当前消息已是最新', {
startId,
exportMaxId,
method,
})
onProgress?.(100, '无需导出,当前消息已是最新', {
chatId,
format,
path: exportPath,
Expand All @@ -227,17 +259,18 @@ export class ExportService {
})
return { count: 0, failedCount: 0 }
}

try {
// Try to export messages
for await (const message of this.client.getMessages(chatId, undefined, {
for await (const message of this.client.getMessages(chatId, expectedTotal, {
skipMedia: isSkipMedia('photo') || isSkipMedia('video') || isSkipMedia('document') || isSkipMedia('sticker'),
startTime,
endTime,
limit,
messageTypes,
method,
minId: startId, // 使用增量导出的起始ID
maxId: exportMaxId, // 使用传入的最大ID限制
minId: startId,
maxId: exportMaxId,
})) {
// 在获取第一条消息时记录日志
if (count === 0) {
Expand All @@ -246,6 +279,7 @@ export class ExportService {
messageType: message.type,
createdAt: message.createdAt,
minIdUsed: startId,
method,
})
}

Expand All @@ -259,7 +293,7 @@ export class ExportService {
messages = []

// Report progress
const progress = Math.min(95, Math.floor((count / total) * 90) + 5)
const progress = Math.min(95, Math.floor((count / expectedTotal) * 90) + 5)
onProgress?.(progress, `已处理 ${count} 条消息`, {
chatId,
format,
Expand All @@ -273,7 +307,7 @@ export class ExportService {
limit,
batchSize,
method,
totalMessages: incremental ? count : total,
totalMessages: expectedTotal,
processedMessages: count,
failedMessages: failedCount > 0 ? failedCount : undefined,
})
Expand Down Expand Up @@ -337,7 +371,7 @@ export class ExportService {
limit,
batchSize,
method,
totalMessages: incremental ? count : total,
totalMessages: expectedTotal,
processedMessages: count,
failedMessages: failedCount > 0 ? failedCount : undefined,
})
Expand Down Expand Up @@ -365,7 +399,7 @@ export class ExportService {
limit,
batchSize,
method,
totalMessages: incremental ? count : total,
totalMessages: expectedTotal,
processedMessages: count,
failedMessages: failedCount > 0 ? failedCount : undefined,
})
Expand All @@ -381,8 +415,8 @@ export class ExportService {
type: 'waiting',
waitSeconds,
resumeTime: new Date(Date.now() + waitSeconds * 1000).toISOString(),
remainingCount: total - count,
totalMessages: incremental ? count : total,
remainingCount: expectedTotal - count,
totalMessages: expectedTotal,
processedMessages: count,
failedMessages: failedCount > 0 ? failedCount : undefined,
})
Expand Down
Empty file added [email protected]
Empty file.