-
Notifications
You must be signed in to change notification settings - Fork 389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add flusher plugin for datahub & odps. (#2144) #2151
base: main
Are you sure you want to change the base?
Conversation
Change-Id: I4af3daf5a4b63006c08f8431855b0e0348299798
Change-Id: I49dd13b25472e911f8920e2323d9a19333264d8a
Change-Id: I90e59b0397711ee711c972f9884a31af9ca98ddb
Change-Id: Iaa57c25ce273506567a0b9a14925864c29953be6
Change-Id: Ia8338c73fb48c1dc7b325efe1a126215fbcaf4fd
Change-Id: I5fcf0ed41b6aa5113305e158e44488dbe155eefe
Change-Id: I5fcf0ed41b6aa5113305e158e44488dbe155eefe
Change-Id: If172c4b4c5431088f7a26fc6824a4b561dae7626
Change-Id: Id72f925374e6ffcbf189f8b37f85b077f93122b2
Change-Id: I9234b65194bae32231b5a6e4cd317c554bbd9e20
Change-Id: I140153d7ff5d328a58ed75cb7c238adfd3f1c59a
Change-Id: I00c037a637c3fb0df1f036833af56a315aaa7b93
Change-Id: I64932a9b5bb8dc8f0c17077628977c3c0f2a8e51
var res *datahub.PutRecordsByShardResult | ||
for retry := 0; retry < maxRetryCount; retry++ { | ||
start := time.Now() | ||
shardID, res, err = pi.DoSend(records, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
外层有重试maxRetryCount次,DoSend内部失败的时候又在递归。到底哪个是主要的?
另外,DoSend递归的合理性?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
外层的重试考虑的是可能由于网络抖动或者服务端短暂的异常,是client一般都会有的公共重试部分。
内部递归只有在partition变化的场景下才会触发,这种场景下刷新一下partition并内部递归一般是能解决的,不应该走到外面的重试逻辑,并且这个最多递归一次,是合理的。
} | ||
|
||
func (rb *RecordBuilderImpl) addLevelExtraInfo(logGroup *protocol.LogGroup, log *protocol.Log, record datahub.IRecord, level int) { | ||
if level == 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
level 是什么含义?什么时候是1,什么时候是2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个原本是设置了参数,用户可以配置extraInfo的属性等级,但是后来想着先让用户以尽可能简单的方式能够成功flush,就把这个参数写死而不加到配置参数里了。
但是我觉的这个逻辑可以先保留了,给后面灵活性留个口子。
func (rb *RecordBuilderImpl) addLevelExtraInfo(logGroup *protocol.LogGroup, log *protocol.Log, record datahub.IRecord, level int) { | ||
if level == 1 { | ||
record.SetAttribute(HostIPKey, rb.hostIP) | ||
if val, ok := findLogTag(logGroup.LogTags, LogtailPath); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里是在查tag,已经有通用的机制了Convert convertConfig,不需要在自定义一份实现。
} | ||
|
||
func (ts *TunnelSenderImpl) Flush() error { | ||
partitions, writers := ts.sessionCache.GetAllWriter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
相对于datahub没有任何的异常处理?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
94行这里是本地的操作,只是构建Writer,下面真正写数据的逻辑里是有异常处理的。
config of flusher_datahub
flushers:
AccessKeyId:
AccessKeySecret:
Endpoint:
ProjectName:
TopicName:
config of flusher_odps
flushers:
AccessKeyId:
AccessKeySecret:
Endpoint:
PartitionConfig:
ProjectName:
TableName: