Skip to content

Commit dcedde2

Browse files
authored
feat: producer support custom logger (#316)
1 parent 642c6a8 commit dcedde2

File tree

10 files changed

+195
-36
lines changed

10 files changed

+195
-36
lines changed

consumer/README.md

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ git clone [email protected]:aliyun/aliyun-log-go-sdk.git
3333

3434
## 使用步骤
3535

36-
1.**配置LogHubConfig**
36+
### 1.**配置LogHubConfig**
3737

3838
LogHubConfig是提供给用户的配置类,用于配置消费策略,您可以根据不同的需求设定不同的值,各参数含义如其中所示
3939
|参数|含义|详情|
@@ -54,7 +54,7 @@ LogHubConfig是提供给用户的配置类,用于配置消费策略,您可
5454
|MaxFetchLogGroupCount|数据一次拉取的log group数量|非必填,默认为1000|
5555
|CursorStartTime|数据点位的时间戳|非必填,CursorPosition为SPECIAL_TIME_CURSOR时需填写|
5656
|InOrder|shard分裂后是否in order消费|非必填,默认为false,当为true时,分裂shard会在老的read only shard消费完后再继续消费|
57-
|Logger|自定义日志Logger|非必填,此logger只用于记录消费者自身状态。如果为 nil,会使用默认的logger。若指定了自定义logger,会忽略 AllowLogLevelLogFileNameLogMaxSizeLogMaxBackups、LogCompass等参数|
57+
|Logger|自定义日志Logger|非必填,此logger只用于记录消费者自身状态。<ul><li>如果非 nil,会忽略 AllowLogLevel / LogFileName/ IsJsonType/ LogMaxSize/ LogMaxBackups/ LogCompass 参数。</li><li>如果为 nil,会根据 AllowLogLevel / LogFileName/ IsJsonType/ LogMaxSize/ LogMaxBackups/ LogCompass 参数自动创建一个 logger 用于记录本地运行日志。</li></ul>|
5858
|AllowLogLevel|允许的日志级别|非必填,默认为info,日志级别由低到高为debug, info, warn, error,仅高于此AllowLogLevel的才会被log出来|
5959
|LogFileName|程序运行日志文件名称|非必填,默认为stdout|
6060
|IsJsonType|是否为json类型|非必填,默认为logfmt格式,true时为json格式|
@@ -67,7 +67,14 @@ LogHubConfig是提供给用户的配置类,用于配置消费策略,您可
6767
|AutoCommitIntervalInMS|自动提交checkpoint的时间间隔|非必填,单位为MS,默认时间为60s|
6868
|Query|过滤规则 基于规则消费时必须设置对应规则 如 *| where a = 'xxx'|非必填|
6969

70-
2.**覆写消费逻辑**
70+
71+
**自定义 logger**
72+
73+
consumer 支持将 consumer 自身本地运行日志写入到自定义 logger 中,可参考 [demo](../example/consumer/custom_logger/with_custom_logger.go)
74+
75+
76+
77+
### 2.**覆写消费逻辑**
7178

7279
```
7380
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
@@ -99,7 +106,7 @@ type Processor interface {
99106
100107
```
101108

102-
3.**创建消费者并开始消费**
109+
### 3.**创建消费者并开始消费**
103110

104111
```
105112
// option是LogHubConfig的实例
@@ -113,7 +120,7 @@ consumerWorker.Start()
113120
114121
调用InitConsumerWorkwer方法,将配置实例对象和消费函数传递到参数中生成消费者实例对象,调用Start方法进行消费。
115122

116-
4.**关闭消费者**
123+
### 4.**关闭消费者**
117124

118125
```
119126
ch := make(chan os.Signal, 1) //将os信号值作为信道
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"os/signal"
7+
"syscall"
8+
9+
sls "github.com/aliyun/aliyun-log-go-sdk"
10+
consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
11+
"github.com/go-kit/kit/log"
12+
"github.com/go-kit/kit/log/level"
13+
"gopkg.in/natefinch/lumberjack.v2"
14+
)
15+
16+
// README :
17+
// This is a very simple example of creating a consumer worker with custom logger.
18+
19+
func main() {
20+
//
21+
writer := &lumberjack.Logger{
22+
Filename: "producer.log",
23+
MaxSize: 100,
24+
MaxBackups: 10,
25+
Compress: false,
26+
}
27+
logger := log.NewLogfmtLogger(writer) // or write to stdout by: logger := log.NewLogfmtLogger(os.Stdout)
28+
// set log level
29+
logger = level.NewFilter(logger, level.AllowInfo())
30+
// add log time and caller info
31+
logger = log.With(logger, "time", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
32+
33+
option := consumerLibrary.LogHubConfig{
34+
Endpoint: "",
35+
AccessKeyID: "",
36+
AccessKeySecret: "",
37+
Project: "",
38+
Logstore: "",
39+
ConsumerGroupName: "",
40+
ConsumerName: "",
41+
CursorPosition: consumerLibrary.END_CURSOR,
42+
Logger: logger, // set producer logger
43+
}
44+
45+
consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
46+
ch := make(chan os.Signal, 1)
47+
signal.Notify(ch, syscall.SIGTERM)
48+
consumerWorker.Start()
49+
if _, ok := <-ch; ok {
50+
level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
51+
consumerWorker.StopAndWait()
52+
}
53+
}
54+
55+
// Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value,
56+
// otherwise you will report errors.
57+
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
58+
fmt.Println(shardId, logGroupList)
59+
checkpointTracker.SaveCheckPoint(false)
60+
return "", nil
61+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"os"
5+
"time"
6+
7+
sls "github.com/aliyun/aliyun-log-go-sdk"
8+
"github.com/aliyun/aliyun-log-go-sdk/producer"
9+
"github.com/go-kit/kit/log"
10+
"github.com/go-kit/kit/log/level"
11+
"google.golang.org/protobuf/proto"
12+
"gopkg.in/natefinch/lumberjack.v2"
13+
)
14+
15+
// README :
16+
// This is a very simple example of creating a producer with custom logger
17+
18+
func main() {
19+
// write to file
20+
writer := &lumberjack.Logger{
21+
Filename: "producer.log",
22+
MaxSize: 100,
23+
MaxBackups: 10,
24+
Compress: false,
25+
}
26+
logger := log.NewLogfmtLogger(writer) // or write to stdout by: logger := log.NewLogfmtLogger(os.Stdout)
27+
// set log level
28+
logger = level.NewFilter(logger, level.AllowInfo())
29+
// add log time and caller info
30+
logger = log.With(logger, "time", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
31+
32+
producerConfig := producer.GetDefaultProducerConfig()
33+
producerConfig.Logger = logger // set producer logger
34+
producerConfig.Endpoint = os.Getenv("Endpoint")
35+
producerConfig.AccessKeyID = os.Getenv("AccessKeyID")
36+
producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
37+
producerInstance, err := producer.NewProducer(producerConfig)
38+
if err != nil {
39+
panic(err)
40+
}
41+
producerInstance.Start()
42+
43+
defer producerInstance.SafeClose()
44+
45+
for {
46+
log := &sls.Log{
47+
Time: proto.Uint32(uint32(time.Now().Unix())),
48+
Contents: []*sls.LogContent{{Key: proto.String("hello"), Value: proto.String("world")}},
49+
}
50+
producerInstance.SendLog("project", "logstore", "topic", "source", log)
51+
time.Sleep(time.Second)
52+
}
53+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
github.com/clbanning/mxj/v2 v2.5.5 // indirect
3434
github.com/davecgh/go-spew v1.1.1 // indirect
3535
github.com/frankban/quicktest v1.10.2 // indirect
36-
github.com/go-logfmt/logfmt v0.5.0 // indirect
36+
github.com/go-logfmt/logfmt v0.5.1 // indirect
3737
github.com/google/go-cmp v0.5.2 // indirect
3838
github.com/json-iterator/go v1.1.10 // indirect
3939
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect

go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,9 @@ github.com/go-kit/kit v0.10.0 h1:dXFJfIHVvUcpSgDOV+Ne6t7jXri8Tfv2uOLHUZ2XNuo=
9898
github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
9999
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
100100
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
101-
github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih4=
102101
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
102+
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
103+
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
103104
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
104105
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
105106
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=

producer/README.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,6 @@ func(callback *Callback)Fail(result *producer.Result){
127127
| MaxRetryBackoffMs | Int64 | 重试的最大退避时间,默认为 50 秒。 |
128128
| AdjustShargHash | Bool | 如果调用 send 方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为 true。 |
129129
| Buckets | Int | 当且仅当 adjustShardHash 为 true 时,该参数才生效。此时,producer 会自动将 shardHash 重新分组,分组数量为 buckets。<br/>如果两条数据的 shardHash 不同,它们是无法合并到一起发送的,会降低 producer 吞吐量。将 shardHash 重新分组后,能让数据有更多地机会被批量发送。该参数的取值范围是 [1, 256],且必须是 2 的整数次幂,默认为 64。 |
130-
| AllowLogLevel | String | 设置日志输出级别,默认值是Info,consumer中一共有4种日志输出级别,分别为debug,info,warn和error。 |
131-
| LogFileName | String | 日志文件输出路径,不设置的话默认输出到stdout。 |
132-
| IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 |
133-
| LogMaxSize | Int | 单个日志存储数量,默认为10M。 |
134-
| LogMaxBackups | Int | 日志轮转数量,默认为10。 |
135-
| LogCompass | Bool | 是否使用gzip 压缩日志,默认为false。 |
136130
| Endpoint | String | 服务入口,关于如何确定project对应的服务入口可参考文章[服务入口](https://help.aliyun.com/document_detail/29008.html?spm=a2c4e.11153940.blogcont682761.14.446e7720gs96LB)|
137131
| AccessKeyID | String | 账户的AK id。 |
138132
| AccessKeySecret | String | 账户的AK 密钥。 |
@@ -143,6 +137,18 @@ func(callback *Callback)Fail(result *producer.Result){
143137
| Region | String | 日志服务的区域,当签名版本使用 AuthV4 时必选。 例如cn-hangzhou。 |
144138
| AuthVersion | String | 使用的签名版本,可选枚举值为 AuthV1, AuthV4。AuthV4 签名示例可参考程序 [producer_test.go](producer_test.go)|
145139
| UseMetricStoreURL | bool | 使用 Metricstore地址进行发送日志,可以提升大基数时间线下的查询性能。 |
140+
| Logger | log.Logger | 自定义 logger,该 logger 用于记录 producer 运行时产生的本地日志,不会被上传到服务端。 <ul><li>如果非 nil,会忽略 AllowLogLevel / LogFileName/ IsJsonType/ LogMaxSize/ LogMaxBackups/ LogCompass 参数。</li><li>如果为 nil,producer 会根据 AllowLogLevel / LogFileName/ IsJsonType/ LogMaxSize/ LogMaxBackups/ LogCompass 参数自动创建一个 logger 用于记录本地运行日志。</li></ul> |
141+
| AllowLogLevel | String | 设置日志输出级别,默认值是Info,consumer中一共有4种日志输出级别,分别为debug,info,warn和error。 |
142+
| LogFileName | String | 日志文件输出路径,不设置的话默认输出到stdout。 |
143+
| IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 |
144+
| LogMaxSize | Int | 单个日志存储数量,默认为10M。 |
145+
| LogMaxBackups | Int | 日志轮转数量,默认为10。 |
146+
| LogCompass | Bool | 是否使用gzip 压缩日志,默认为false。 |
147+
148+
149+
### 自定义 logger
150+
producer 支持将 producer 自身本地运行日志写入到自定义 logger 中,可参考 [demo](../example/producer/custom_logger/with_custom_logger.go)
151+
146152

147153
## 关于性能
148154

producer/logger.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
package producer
22

33
import (
4+
"io"
5+
"os"
6+
47
"github.com/go-kit/kit/log"
58
"github.com/go-kit/kit/log/level"
69
"gopkg.in/natefinch/lumberjack.v2"
7-
"os"
8-
"io"
910
)
1011

11-
func logConfig(producerConfig *ProducerConfig) log.Logger {
12+
func getProducerLogger(producerConfig *ProducerConfig) log.Logger {
13+
// if producerConfig.Logger is not nil, use it as the logger
14+
// and ignore AllowLogLevel/LogFileName/LogMaxSize/LogMaxBackups/LogCompass
15+
if producerConfig.Logger != nil {
16+
return producerConfig.Logger
17+
}
1218
var writer io.Writer
1319
if producerConfig.LogFileName == "" {
1420
writer = log.NewSyncWriter(os.Stdout)

producer/producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type Producer struct {
3131
}
3232

3333
func NewProducer(producerConfig *ProducerConfig) (*Producer, error) {
34-
logger := logConfig(producerConfig)
34+
logger := getProducerLogger(producerConfig)
3535
finalProducerConfig := validateProducerConfig(producerConfig, logger)
3636

3737
client, err := createClient(finalProducerConfig, false, logger)
@@ -43,7 +43,7 @@ func NewProducer(producerConfig *ProducerConfig) (*Producer, error) {
4343

4444
// Deprecated: use NewProducer instead.
4545
func InitProducer(producerConfig *ProducerConfig) *Producer {
46-
logger := logConfig(producerConfig)
46+
logger := getProducerLogger(producerConfig)
4747
finalProducerConfig := validateProducerConfig(producerConfig, logger)
4848

4949
client, _ := createClient(finalProducerConfig, true, logger)

producer/producer_config.go

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,52 @@ import (
55
"time"
66

77
sls "github.com/aliyun/aliyun-log-go-sdk"
8+
"github.com/go-kit/kit/log"
89
)
910

1011
const Delimiter = "|"
1112

1213
type UpdateStsTokenFunc = func() (accessKeyID, accessKeySecret, securityToken string, expireTime time.Time, err error)
1314

1415
type ProducerConfig struct {
15-
TotalSizeLnBytes int64
16-
MaxIoWorkerCount int64
17-
MaxBlockSec int
18-
MaxBatchSize int64
19-
MaxBatchCount int
20-
LingerMs int64
21-
Retries int
22-
MaxReservedAttempts int
23-
BaseRetryBackoffMs int64
24-
MaxRetryBackoffMs int64
25-
AdjustShargHash bool
26-
Buckets int
27-
AllowLogLevel string
28-
LogFileName string
29-
IsJsonType bool
30-
LogMaxSize int
31-
LogMaxBackups int
32-
LogCompress bool
16+
TotalSizeLnBytes int64
17+
MaxIoWorkerCount int64
18+
MaxBlockSec int
19+
MaxBatchSize int64
20+
MaxBatchCount int
21+
LingerMs int64
22+
Retries int
23+
MaxReservedAttempts int
24+
BaseRetryBackoffMs int64
25+
MaxRetryBackoffMs int64
26+
AdjustShargHash bool
27+
Buckets int
28+
29+
// Optional, defaults to nil.
30+
// The logger is used to record the runtime status of the consumer.
31+
// The logs generated by the logger will only be stored locally.
32+
// The parameters AllowLogLevel/LogFileName/LogMaxSize/LogMaxBackups/LogCompass
33+
// are ignored if the Logger is not nil.
34+
Logger log.Logger
35+
// Optional, defaults to info.
36+
// AllowLogLevel can be debug/info/warn/error, set the minimum level of the log to be recorded.
37+
AllowLogLevel string
38+
// Optional.
39+
// Setting Log File Path,eg: "/root/log/log_file.log". if not set, the log will go to stdout.
40+
LogFileName string
41+
// Optional, defaults to false.
42+
// Set whether the log output type is JSON.
43+
IsJsonType bool
44+
// Optional, defaults to 100, in megabytes.
45+
// MaxSize is the maximum size in megabytes of the log file before it gets rotated.
46+
LogMaxSize int
47+
// Optional, defaults to 10.
48+
// MaxBackups is the maximum number of old log files to retain.
49+
LogMaxBackups int
50+
// Optional, defaults to false.
51+
// Compress determines if the rotated log files should be compressed using gzip.
52+
LogCompress bool
53+
3354
Endpoint string
3455
NoRetryStatusCodeList []int
3556
HTTPClient *http.Client

producer/producer_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"time"
99

1010
sls "github.com/aliyun/aliyun-log-go-sdk"
11+
"github.com/go-kit/kit/log"
12+
"github.com/go-kit/kit/log/level"
1113
"github.com/gogo/protobuf/proto"
1214
)
1315

@@ -48,6 +50,8 @@ func printShardId(shardId string) string {
4850

4951
func TestProducer(t *testing.T) {
5052
config := GetDefaultProducerConfig()
53+
config.Logger = log.NewLogfmtLogger(os.Stdout)
54+
config.Logger = level.NewFilter(config.Logger, level.AllowDebug())
5155
config.Endpoint = os.Getenv("LOG_TEST_ENDPOINT")
5256
provider := sls.NewStaticCredentialsProvider(os.Getenv("LOG_TEST_ACCESS_KEY_ID"), os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), "")
5357
config.CredentialsProvider = provider

0 commit comments

Comments
 (0)