Skip to content

Commit a92baf0

Browse files
authored
Merge pull request #162 from APIParkLab/feature/ai-balance
Feature/ai balance
2 parents d7e28c9 + 46e2edb commit a92baf0

File tree

46 files changed

+1101
-346
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1101
-346
lines changed

.gitlab-ci.yml

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
variables:
2+
PATH: /opt/go-1.21/go/bin/:/opt/node/node/bin/:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
3+
GOROOT: /opt/go-1.21/go
4+
GOPROXY: https://goproxy.cn
5+
VERSION: $CI_COMMIT_SHORT_SHA
6+
APP: apipark
7+
APP_PRE: ${APP}_${VERSION}
8+
BUILD_DIR: ${APP}-build
9+
DEPLOY_DESC: "DEV 环境"
10+
VIEW_ADDR: http://172.18.166.219:8288
11+
SAVE_DIR: /opt/${APP}
12+
NODE_OPTIONS: --max_old_space_size=8192
13+
14+
stages:
15+
- notice
16+
- prefix
17+
- build
18+
- deploy
19+
- webhook
20+
21+
feishu-informer: # 飞书回调
22+
stage: notice
23+
variables:
24+
DIFF_URL: "$CI_MERGE_REQUEST_PROJECT_URL/-/merge_requests/$CI_MERGE_REQUEST_IID/diffs"
25+
rules:
26+
- if: $CI_PIPELINE_SOURCE=="merge_request_event" && $CI_COMMIT_BRANCH =~ "main"
27+
script:
28+
- echo "merge request"
29+
- |
30+
curl -X POST -H "Content-Type: application/json" \
31+
-d "{\"msg_type\":\"text\",\"content\":{\"text\":\"项目:${CI_PROJECT_NAME}\\n提交人:${GITLAB_USER_NAME}\\n提交信息:${CI_MERGE_REQUEST_TITLE}\\n合并分支信息:${CI_MERGE_REQUEST_SOURCE_BRANCH_NAME} -> ${CI_MERGE_REQUEST_TARGET_BRANCH_NAME}\\n差异性地址:${DIFF_URL}\\n请及时review代码\"}}" \
32+
https://open.feishu.cn/open-apis/bot/v2/hook/1c334752-2874-41a1-8f1b-3060f2d46b6c
33+
34+
prebuild:
35+
stage: prefix
36+
rules:
37+
- if: $CI_COMMIT_BRANCH == "main"
38+
script:
39+
- echo "prebuild"
40+
- chmod +x ./scripts/prefix.sh
41+
- ./scripts/prefix.sh
42+
43+
builder:
44+
stage: build
45+
rules:
46+
- if: $CI_COMMIT_BRANCH == "main"
47+
script:
48+
- set -e
49+
- |
50+
if [ ! -d "../artifacts" ]; then
51+
mkdir -p ../artifacts
52+
fi
53+
if [ -d "../artifacts/dist" ]; then
54+
cp -r ../artifacts/dist frontend/dist
55+
fi
56+
- |
57+
if [ -n "$(git diff --name-status HEAD~1 HEAD -- frontend)" ]; then
58+
./scripts/build.sh $BUILD_DIR ${VERSION} all ""
59+
else
60+
./scripts/build.sh $BUILD_DIR ${VERSION}
61+
fi
62+
if [ -d "frontend/dist" ]; then
63+
echo "copy frontend/dist to artifacts/dist"
64+
rm -fr ../artifacts/dist
65+
cp -r frontend/dist ../artifacts/dist
66+
fi
67+
cp $BUILD_DIR/${APP_PRE}_linux_amd64.tar.gz ${SAVE_DIR}
68+
69+
deployer:
70+
stage: deploy
71+
rules:
72+
- if: $CI_COMMIT_BRANCH == "main"
73+
variables:
74+
APIPARK_GUEST_MODE: allow
75+
APIPARK_GUEST_ID: dklejrfbhjqwdh
76+
script:
77+
- cd ${SAVE_DIR};mkdir -p ${APP_PRE};tar -zxvf ${APP_PRE}_linux_amd64.tar.gz -C ${APP_PRE};cd ${APP_PRE};./install.sh ${SAVE_DIR};./run.sh restart;cd ${SAVE_DIR} && ./clean.sh ${APP_PRE}
78+
when: on_success
79+
success:
80+
stage: webhook
81+
rules:
82+
- if: $CI_COMMIT_BRANCH == "main"
83+
script:
84+
- |
85+
curl -X POST -H "Content-Type: application/json" \
86+
-d "{\"msg_type\":\"text\",\"content\":{\"text\":\"最近一次提交:${CI_COMMIT_TITLE}\\n提交人:${GITLAB_USER_NAME}\\n项目:${CI_PROJECT_NAME}\\n环境:${DEPLOY_DESC}\\n更新部署完成.\\n访问地址:${VIEW_ADDR}\\n工作流地址:${CI_PIPELINE_URL}\"}}" \
87+
https://open.feishu.cn/open-apis/bot/v2/hook/c3672932-4dfa-4989-8023-0128bae59338
88+
when: on_success
89+
failure:
90+
stage: webhook
91+
rules:
92+
- if: $CI_COMMIT_BRANCH == "main"
93+
script:
94+
- |
95+
curl -X POST -H "Content-Type: application/json" \
96+
-d "{\"msg_type\":\"text\",\"content\":{\"text\":\"最近一次提交:${CI_COMMIT_TITLE}\\n提交人:${GITLAB_USER_NAME}\\n项目:${CI_PROJECT_NAME}\\n环境:${DEPLOY_DESC}\\n更新部署失败,请及时到gitlab上查看\\n工作流地址:${CI_PIPELINE_URL}\"}}" \
97+
https://open.feishu.cn/open-apis/bot/v2/hook/c3672932-4dfa-4989-8023-0128bae59338
98+
when: on_failure

app/ai-event-handler/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/config.yml

app/ai-event-handler/main.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"log"
7+
"os"
8+
"os/signal"
9+
"syscall"
10+
11+
"github.com/eolinker/go-common/autowire"
12+
nsq "github.com/nsqio/go-nsq"
13+
14+
"github.com/eolinker/go-common/cftool"
15+
16+
_ "github.com/eolinker/go-common/store/store_mysql"
17+
_ "github.com/go-sql-driver/mysql"
18+
)
19+
20+
var (
21+
version string
22+
confPath string
23+
)
24+
25+
func init() {
26+
flag.StringVar(&confPath, "c", "config.yml", "`config` file path for server ")
27+
}
28+
29+
type ServerConfig struct {
30+
Port int `yaml:"port"`
31+
}
32+
33+
func main() {
34+
// 1. 连接 MySQL 数据库
35+
cftool.Register[ServerConfig](fmt.Sprintf("root:%s", confPath))
36+
cftool.ReadFile(confPath)
37+
38+
handler := &NSQHandler{}
39+
autowire.Autowired(handler)
40+
err := autowire.CheckComplete()
41+
if err != nil {
42+
log.Fatal("check autowired:", err)
43+
return
44+
}
45+
// 2. 创建 NSQ 消费者
46+
config := nsq.NewConfig()
47+
hostname, err := os.Hostname()
48+
if err != nil {
49+
log.Fatalf("Failed to get hostname: %v", err)
50+
return
51+
}
52+
nsqConfig := handler.nsqConfig
53+
consumer, err := nsq.NewConsumer(fmt.Sprintf("%s_ai_event", nsqConfig.TopicPrefix), hostname, config)
54+
if err != nil {
55+
log.Fatalf("Failed to create NSQ consumer: %v", err)
56+
}
57+
58+
consumer.AddHandler(handler)
59+
60+
// 4. 连接到 NSQ
61+
//nsqAddress := "172.18.166.219:9150" // NSQ 地址
62+
err = consumer.ConnectToNSQD(nsqConfig.Addr)
63+
if err != nil {
64+
log.Fatalf("Failed to connect to NSQ: %v", err)
65+
}
66+
log.Println("Connected to NSQ")
67+
68+
// 5. 捕获系统信号,优雅关闭
69+
sigChan := make(chan os.Signal, 1)
70+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
71+
<-sigChan
72+
73+
// 优雅停止消费者
74+
consumer.Stop()
75+
<-consumer.StopChan
76+
log.Println("NSQ Consumer stopped")
77+
}

app/ai-event-handler/nsq.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
"strings"
8+
"time"
9+
10+
"github.com/eolinker/go-common/cftool"
11+
12+
ai_dto "github.com/APIParkLab/APIPark/module/ai/dto"
13+
14+
"github.com/eolinker/go-common/store"
15+
16+
"github.com/APIParkLab/APIPark/service/ai"
17+
18+
ai_key "github.com/APIParkLab/APIPark/service/ai-key"
19+
20+
nsq "github.com/nsqio/go-nsq"
21+
22+
ai_api "github.com/APIParkLab/APIPark/service/ai-api"
23+
)
24+
25+
func init() {
26+
cftool.Register[NSQConfig]("nsq")
27+
}
28+
29+
type NSQConfig struct {
30+
Addr string `json:"addr"`
31+
TopicPrefix string `json:"topic_prefix"`
32+
}
33+
34+
// 定义 NSQ 消息结构
35+
type AIProviderStatus struct {
36+
Provider string `json:"provider"`
37+
Model string `json:"model"`
38+
Key string `json:"key"`
39+
Status string `json:"status"`
40+
}
41+
42+
type AIInfo struct {
43+
Model string `json:"ai_model"`
44+
Cost interface{} `json:"ai_model_cost"`
45+
InputToken interface{} `json:"ai_model_input_token"`
46+
OutputToken interface{} `json:"ai_model_output_token"`
47+
TotalToken interface{} `json:"ai_model_total_token"`
48+
Provider string `json:"ai_provider"`
49+
ProviderStats []AIProviderStatus `json:"ai_provider_statuses"`
50+
}
51+
52+
type NSQMessage struct {
53+
AI AIInfo `json:"ai"`
54+
API string `json:"api"`
55+
Provider string `json:"provider"`
56+
RequestID string `json:"request_id"`
57+
TimeISO8601 string `json:"time_iso8601"`
58+
}
59+
60+
// NSQHandler 处理 NSQ 消息并写入 MySQL
61+
type NSQHandler struct {
62+
apiUseService ai_api.IAPIUseService `autowired:""`
63+
aiKeyService ai_key.IKeyService `autowired:""`
64+
aiService ai.IProviderService `autowired:""`
65+
transaction store.ITransaction `autowired:""`
66+
nsqConfig *NSQConfig `autowired:""`
67+
ctx context.Context
68+
}
69+
70+
func convertInt(value interface{}) int {
71+
switch v := value.(type) {
72+
case int:
73+
return v
74+
case float64:
75+
return int(v)
76+
default:
77+
return 0
78+
}
79+
}
80+
81+
// HandleMessage 处理从 NSQ 读取的消息
82+
func (h *NSQHandler) HandleMessage(message *nsq.Message) error {
83+
log.Printf("Received message: %s", string(message.Body))
84+
85+
// 解析消息为结构体
86+
var data NSQMessage
87+
err := json.Unmarshal(message.Body, &data)
88+
if err != nil {
89+
log.Printf("Failed to unmarshal message: %v", err)
90+
return err
91+
}
92+
93+
// 将时间字符串转换为 time.Time
94+
timestamp, err := time.Parse(time.RFC3339, data.TimeISO8601)
95+
if err != nil {
96+
log.Printf("Failed to parse timestamp: %v", err)
97+
return err
98+
}
99+
100+
day := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), 0, 0, 0, 0, timestamp.Location())
101+
hour := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), 0, 0, 0, timestamp.Location())
102+
minute := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), timestamp.Minute(), 0, 0, timestamp.Location())
103+
return h.transaction.Transaction(context.Background(), func(ctx context.Context) error {
104+
finalStatus := &AIProviderStatus{}
105+
for _, s := range data.AI.ProviderStats {
106+
status := ToKeyStatus(s.Status).Int()
107+
keys := strings.Split(s.Key, "@")
108+
key := keys[0]
109+
err = h.aiKeyService.Save(ctx, key, &ai_key.Edit{
110+
Status: &status,
111+
})
112+
if err != nil {
113+
log.Printf("Failed to save AI key: %v", err)
114+
return err
115+
}
116+
if s.Provider != data.AI.Provider {
117+
118+
pStatus := ai_dto.ProviderAbnormal.Int()
119+
err = h.aiService.Save(ctx, s.Provider, &ai.SetProvider{
120+
Status: &pStatus,
121+
})
122+
} else {
123+
pStatus := ai_dto.ProviderEnabled.Int()
124+
err = h.aiService.Save(ctx, s.Provider, &ai.SetProvider{
125+
Status: &pStatus,
126+
})
127+
}
128+
finalStatus = &s
129+
}
130+
if finalStatus != nil {
131+
keys := strings.Split(finalStatus.Key, "@")
132+
err = h.aiKeyService.IncrUseToken(ctx, keys[0], convertInt(data.AI.TotalToken))
133+
if err != nil {
134+
log.Printf("Failed to increment AI key token: %v", err)
135+
return err
136+
}
137+
}
138+
139+
// 调用 AI API 接口
140+
err = h.apiUseService.Incr(context.Background(), &ai_api.IncrAPIUse{
141+
API: data.API,
142+
Service: data.Provider,
143+
Provider: data.AI.Provider,
144+
Model: data.AI.Model,
145+
Day: day.Unix(),
146+
Hour: hour.Unix(),
147+
Minute: minute.Unix(),
148+
InputToken: convertInt(data.AI.InputToken),
149+
OutputToken: convertInt(data.AI.OutputToken),
150+
TotalToken: convertInt(data.AI.TotalToken),
151+
})
152+
if err != nil {
153+
log.Printf("Failed to call AI API: %v", err)
154+
return err
155+
}
156+
157+
log.Printf("Message processed and saved to MySQL: %+v", data)
158+
return nil
159+
})
160+
161+
}

app/ai-event-handler/status.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package main
2+
3+
import ai_key_dto "github.com/APIParkLab/APIPark/module/ai-key/dto"
4+
5+
var (
6+
StatusNormal = "normal"
7+
StatusInvalidRequest = "invalid request"
8+
StatusQuotaExhausted = "quota exhausted"
9+
StatusExpired = "expired"
10+
StatusExceeded = "exceeded"
11+
StatusInvalid = "invalid"
12+
StatusTimeout = "timeout"
13+
)
14+
15+
func ToKeyStatus(status string) ai_key_dto.KeyStatus {
16+
switch status {
17+
case StatusNormal:
18+
return ai_key_dto.KeyNormal
19+
case StatusInvalidRequest:
20+
return ai_key_dto.KeyNormal
21+
case StatusQuotaExhausted:
22+
return ai_key_dto.KeyExceed
23+
case StatusExpired:
24+
return ai_key_dto.KeyExpired
25+
case StatusExceeded:
26+
return ai_key_dto.KeyNormal
27+
case StatusInvalid:
28+
return ai_key_dto.KeyError
29+
case StatusTimeout:
30+
return ai_key_dto.KeyError
31+
default:
32+
return ai_key_dto.KeyNormal
33+
}
34+
}

0 commit comments

Comments
 (0)