Deploy issues #377

Open · wants to merge 8 commits into master
3 changes: 2 additions & 1 deletion .gitignore
@@ -7,4 +7,5 @@ yarn.lock
# Sentry Auth Token
.sentryclirc
.DS_Store
public/
public/
.qodo
164 changes: 150 additions & 14 deletions src/bot.ts
@@ -1,3 +1,7 @@
/* eslint-disable import/first */
import * as Events from 'events'
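// import/first is disabled above so this cap is raised before the remaining
// imports run; 30 listeners avoids MaxListenersExceededWarning from the many
// bot modules registered below.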
Events.EventEmitter.defaultMaxListeners = 30

import { Sentry } from './monitoring/instrument'
import express from 'express'
import asyncHandler from 'express-async-handler'
@@ -41,11 +45,11 @@ import prometheusRegister, { PrometheusMetrics } from './metrics/prometheus'
import { chatService, statsService } from './database/services'
import { AppDataSource } from './database/datasource'
import { autoRetry } from '@grammyjs/auto-retry'
import { run } from '@grammyjs/runner'
import { run, type RunnerHandle } from '@grammyjs/runner'
import { runBotHeartBit } from './monitoring/monitoring'
import { type BotPaymentLog } from './database/stats.service'
import { TelegramPayments } from './modules/telegram_payment'
import * as Events from 'events'

import { ES } from './es'
import { hydrateFiles } from '@grammyjs/files'
import { VoiceTranslateBot } from './modules/voice-translate'
@@ -60,8 +64,7 @@ import { llmModelManager } from './modules/llms/utils/llmModelsManager'
import { HmnyBot } from './modules/hmny'
import { LumaBot } from './modules/llms/lumaBot'
import { XaiBot } from './modules/llms/xaiBot'

Events.EventEmitter.defaultMaxListeners = 30
import { DeepSeekBot } from './modules/llms/deepSeekBot'

const logger = pino({
name: 'bot',
@@ -216,6 +219,7 @@ const lumaBot = new LumaBot(payments)
const claudeBot = new ClaudeBot(payments)
const vertexBot = new VertexBot(payments, [llamaAgent])
const xaiBot = new XaiBot(payments)
const deepSeekBot = new DeepSeekBot(payments)
const oneCountryBot = new OneCountryBot(payments)
const translateBot = new TranslateBot()
const telegramPayments = new TelegramPayments(payments)
@@ -347,6 +351,7 @@ const PayableBots: Record<string, PayableBotConfig> = {
vertexBot: { bot: vertexBot },
lumaBot: { bot: lumaBot },
aixBot: { bot: xaiBot },
deepSeekBot: { bot: deepSeekBot },
openAiBot: {
enabled: (ctx: OnMessageContext) => ctx.session.dalle.isEnabled,
bot: openAiBot
@@ -361,7 +366,7 @@ const UtilityBots: Record<string, UtilityBot> = {
}

const executeOrRefund = async (ctx: OnMessageContext, price: number, bot: PayableBot): Promise<void> => {
const refund = (reason?: string): void => {}
const refund = (reason?: string): void => { }
await bot.onEvent(ctx, refund).catch((ex: any) => {
Sentry.captureException(ex)
logger.error(ex?.message ?? 'Unknown error')
@@ -701,19 +706,114 @@ app.get('/metrics', asyncHandler(async (req, res): Promise<void> => {
async function bootstrap (): Promise<void> {
const httpServer = app.listen(config.port, () => {
logger.info(`Bot listening on port ${config.port}`)
// bot.start({
// allowed_updates: ["callback_query"], // Needs to be set for menu middleware, but bot doesn't work with current configuration.
// });
})

await AppDataSource.initialize()
payments.bootstrap()
// Database connection retry logic
const connectToDatabase = async (maxRetries = 5): Promise<boolean> => {
let retries = 0
let connected = false

const prometheusMetrics = new PrometheusMetrics()
await prometheusMetrics.bootstrap()
while (!connected && retries < maxRetries) {
try {
logger.info(`Database connection attempt ${retries + 1}/${maxRetries}...`)
// Check if already initialized
if (AppDataSource.isInitialized) {
logger.info('Database already initialized')
return true
}
await AppDataSource.initialize()
logger.info('Database initialized')
connected = true
return true
} catch (error) {
retries++
logger.error(`Database connection failed: ${(error as Error).message}`)
if (retries >= maxRetries) {
logger.error('Maximum database connection retry attempts reached')
return false
}
// Exponential backoff: 2s, 4s, 8s, ... capped at 30s
const delay = Math.min(1000 * (2 ** retries), 30000)
logger.info(`Waiting ${delay / 1000} seconds before retrying database connection...`)
await new Promise(resolve => setTimeout(resolve, delay))
}
}
return connected
}

// Connect to database with retries
const dbConnected = await connectToDatabase()
if (!dbConnected) {
logger.error('Failed to connect to database after multiple attempts')
// Continue running but bot functionality will be limited
}

try {
payments.bootstrap()
} catch (error) {
logger.error(`Payments bootstrap error: ${error}`)
// Continue despite payment initialization errors
}

// Only try to initialize Prometheus metrics if database is connected
let prometheusMetrics: PrometheusMetrics | null = null
if (dbConnected) {
try {
prometheusMetrics = new PrometheusMetrics()
await prometheusMetrics.bootstrap()
} catch (error) {
logger.error(`Prometheus metrics bootstrap error: ${error}`)
// Continue despite metrics initialization errors
}
} else {
logger.warn('Skipping Prometheus metrics initialization due to database connection failure')
}

const runner = run(bot)
// Telegram connection retry logic
const connectToTelegram = async (maxRetries = 5): Promise<RunnerHandle | undefined> => {
let retries = 0
let connected = false
let runner: RunnerHandle | undefined

logger.info('Starting Telegram bot connection attempt...')

while (!connected && retries < maxRetries) {
try {
logger.info(`Attempt ${retries + 1}/${maxRetries} to connect to Telegram API...`)
// First test the connection to Telegram's API
await bot.api.getMe()
logger.info('Successfully connected to Telegram API')
// If connection test succeeded, initialize the runner
runner = run(bot)
logger.info('Bot runner started successfully')
connected = true
return runner
} catch (error) {
retries++
logger.error(`Failed to connect to Telegram: ${(error as Error).message}`)
if (retries >= maxRetries) {
logger.error('Maximum Telegram connection retry attempts reached')
return undefined
}
// Exponential backoff, capped at 30s (same schedule as the database retries)
const delay = Math.min(1000 * (2 ** retries), 30000)
logger.info(`Waiting ${delay / 1000} seconds before retry ${retries + 1}...`)
await new Promise(resolve => setTimeout(resolve, delay))
}
}
return undefined
}

// Try to connect to Telegram with retry logic
let runner: RunnerHandle | undefined
try {
runner = await connectToTelegram()
} catch (error) {
logger.error(`Error in Telegram connection retry process: ${(error as Error).message}`)
// Don't exit yet, we'll handle this gracefully
}

// Set up application shutdown handler
const stopApplication = async (): Promise<void> => {
console.warn('Terminating the bot...')

@@ -739,10 +839,12 @@ async function bootstrap (): Promise<void> {
}
}

// Setup signal handlers
process.on('SIGINT', () => { stopApplication().catch(logger.error) })
process.on('SIGTERM', () => { stopApplication().catch(logger.error) })

if (config.betteruptime.botHeartBitId) {
// Setup heartbeat monitor if configured
if (config.betteruptime.botHeartBitId && runner) {
const task = await runBotHeartBit(runner, config.betteruptime.botHeartBitId)
const stopHeartBit = (): void => {
logger.info('heart bit stopping')
@@ -751,6 +853,40 @@ async function bootstrap (): Promise<void> {
process.once('SIGINT', stopHeartBit)
process.once('SIGTERM', stopHeartBit)
}

// If both database and Telegram had connection issues, schedule a restart
if (!dbConnected || !runner) {
logger.error('Critical services failed to initialize.')
// Schedule a restart after a delay to avoid rapid restart cycles
const restartDelay = 5 * 60 * 1000 // 5 minutes
logger.info(`Scheduling application restart in ${restartDelay / 1000} seconds...`)
setTimeout(() => {
logger.info('Executing scheduled restart after initialization failures')
process.exit(1) // This will trigger a restart by Fly.io
}, restartDelay)
}

if (dbConnected && AppDataSource.driver) {
try {
// Handle any driver-level errors that bubble up
const driver = AppDataSource.driver as any
// Only attach to events if they're available, don't force it
if (driver.eventEmitter && typeof driver.eventEmitter.on === 'function') {
driver.eventEmitter.on('error', (error: any) => {
logger.error(`Database error event: ${error.message}`)
})
}
} catch (error) {
logger.warn(`Could not set up database error listener: ${(error as Error).message}`)
}
}
}

bootstrap().catch((error) => {
131 changes: 131 additions & 0 deletions src/modules/llms/api/deepseek.ts
@@ -0,0 +1,131 @@
import axios, { type AxiosResponse } from 'axios'
import { type Readable } from 'stream'
import { GrammyError } from 'grammy'
import { pino } from 'pino'

import config from '../../../config'
import { type OnCallBackQueryData, type OnMessageContext, type ChatConversation } from '../../types'
import { type LlmCompletion } from './llmApi'
import { headersStream } from './helper'
import { LlmModelsEnum } from '../utils/llmModelsManager'
import { type ModelParameters } from '../utils/types'
import { prepareConversation } from './openai'

const logger = pino({
name: 'deepSeek - llmsBot',
transport: {
target: 'pino-pretty',
options: { colorize: true }
}
})

const hasValidContent = (text: string): boolean => {
const trimmed = text.trim()
return trimmed.length > 0 && trimmed !== '\n' && !/^\s+$/.test(trimmed)
}

const API_ENDPOINT = config.llms.apiEndpoint

export const deepSeekStreamCompletion = async (
conversation: ChatConversation[],
model = LlmModelsEnum.GPT_35_TURBO,
ctx: OnMessageContext | OnCallBackQueryData,
msgId: number,
limitTokens = true,
parameters?: ModelParameters
): Promise<LlmCompletion> => {
logger.info(`Handling ${model} stream completion`)
parameters = parameters ?? {
system: ctx.session.currentPrompt,
max_tokens: +config.openAi.chatGpt.maxTokens
}
const data = {
model,
stream: true,
max_tokens: limitTokens ? parameters.max_tokens : undefined,
messages: prepareConversation(conversation, model, ctx)
}
let wordCount = 0
let wordCountMinimum = 2
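// Throttle interim Telegram edits: flush roughly every `wordCountMinimum`
// chunks, doubling the batch size up to 64 so edits grow sparser as the
// completion lengthens.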
const url = `${API_ENDPOINT}/deepseek/completions`
if (!ctx.chat?.id) {
throw new Error('Context chat id should not be empty for deepSeek streaming')
}
const response: AxiosResponse = await axios.post(url, data, headersStream)

const completionStream: Readable = response.data
let completion = ''
let outputTokens = ''
let inputTokens = ''
let message = ''
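// Assumed wire format: the endpoint streams raw text chunks and appends
// usage markers ('Input Tokens: N' / 'Output Tokens: M') at the end of the
// stream, which are split out of the completion below.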
for await (const chunk of completionStream) {
const msg = chunk.toString()
if (msg) {
if (msg.includes('Input Tokens:')) {
const tokenMsg = msg.split('Input Tokens: ')[1]
inputTokens = tokenMsg.split('Output Tokens: ')[0]
outputTokens = tokenMsg.split('Output Tokens: ')[1]
completion += msg.split('Input Tokens: ')[0]
completion = completion.split('Input Tokens: ')[0]
} else if (msg.includes('Output Tokens: ')) {
outputTokens = msg.split('Output Tokens: ')[1]
completion = completion.split('Output Tokens: ')[0]
} else {
wordCount++
completion += msg
if (wordCount > wordCountMinimum) {
if (wordCountMinimum < 64) {
wordCountMinimum *= 2
}
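// Strip any earlier ellipsis and re-append a single trailing '...' as a
// typing indicator; it is removed again before the final message edit.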
completion = completion.replaceAll('...', '')
completion += '...'
wordCount = 0
if (ctx.chat?.id && message !== completion) {
message = completion
await ctx.api
.editMessageText(ctx.chat?.id, msgId, completion)
.catch(async (e: any) => {
if (e instanceof GrammyError) {
if (e.error_code !== 400) {
throw e
} else {
logger.error(e.message)
}
} else {
throw e
}
})
}
}
}
}
}
completion = completion.replaceAll('...', '')
hasValidContent(completion) && await ctx.api
.editMessageText(ctx.chat?.id, msgId, completion)
.catch((e: any) => {
if (e instanceof GrammyError) {
if (e.error_code !== 400) {
throw e
} else {
logger.error(e)
}
} else {
throw e
}
})
const totalOutputTokens = outputTokens
const totalInputTokens = inputTokens
return {
completion: {
content: completion,
role: 'assistant',
model,
timestamp: Date.now()
},
usage: parseInt(totalOutputTokens, 10) + parseInt(totalInputTokens, 10),
price: 0,
inputTokens: parseInt(totalInputTokens, 10),
outputTokens: parseInt(totalOutputTokens, 10)
}
}
4 changes: 2 additions & 2 deletions src/modules/llms/api/openai.ts
@@ -81,9 +81,9 @@ export async function alterGeneratedImg (

type ConversationOutput = Omit<ChatConversation, 'timestamp' | 'model' | 'id' | 'author' | 'numSubAgents'>

const prepareConversation = (conversation: ChatConversation[], model: string, ctx: OnMessageContext | OnCallBackQueryData): ConversationOutput[] => {
export const prepareConversation = (conversation: ChatConversation[], model: string, ctx: OnMessageContext | OnCallBackQueryData): ConversationOutput[] => {
const messages = conversation.filter(c => c.model === model).map(m => { return { content: m.content, role: m.role } })
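// Skip system-prompt injection for O3 and DeepSeek-style models and for
// ongoing chats; only a fresh single-message conversation gets the
// session's system prompt prepended below.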
if (messages.length !== 1 || model === LlmModelsEnum.O1) {
if (messages.length !== 1 || model === LlmModelsEnum.O3 || model.includes('deep')) {
return messages
}
const systemMessage = {
Expand Down
2 changes: 1 addition & 1 deletion src/modules/llms/api/vertex.ts
@@ -74,7 +74,7 @@ export const vertexStreamCompletion = async (
): Promise<LlmCompletion> => {
parameters = parameters ?? {
system: ctx.session.currentPrompt,
max_tokens: +config.openAi.chatGpt.maxTokens
max_tokens_to_sample: +config.openAi.chatGpt.maxTokens
}
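// 'max_tokens_to_sample' is the Anthropic-style parameter name; this assumes
// the vertex endpoint passes Claude-compatible sampling parameters through.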
const data = {
model,