add ai_content_security filter#919
Conversation
a77bbdd to
1339c1f
Compare
|
What a big PR! Please excuse me for taking a long time to review. BTW, could you make the lint pass? |
ebfbfeb to
0e2ffe0
Compare
Passed. |
| if conf.ModerationTimeout > 0 { | ||
| conf.moderationTimeout = time.Duration(conf.ModerationTimeout) * time.Millisecond | ||
| } else { | ||
| conf.moderationTimeout = time.Duration(3 * int(time.Second)) |
There was a problem hiding this comment.
| conf.moderationTimeout = time.Duration(3 * int(time.Second)) | |
| conf.moderationTimeout = 3 * time.Second |
is enough. No need to add extra conversion.
|
|
||
| func (conf *config) Init(cb api.ConfigCallbackHandler) error { | ||
| if conf.ModerationTimeout > 0 { | ||
| conf.moderationTimeout = time.Duration(conf.ModerationTimeout) * time.Millisecond |
There was a problem hiding this comment.
It is more go-style to use a string with unit like 30ms and parsing it with ParseDuration.
There was a problem hiding this comment.
Modified, and I plan to use regular expressions to validate the time format
string moderation_timeout = 1 [(validate.rules).string.pattern = "^\\d+(ms|s)$"];
| overlapCountDelayed: true, | ||
| initialCapacity: 2049, | ||
| shrinkFactor: 2, | ||
| resizeFactor: 1.3, |
There was a problem hiding this comment.
Why choose 2049 over 2048, and 1.3 over 2?
There was a problem hiding this comment.
The current approach doesn’t rely on these parameters.
| acc.accumulate() | ||
| acc.check(3, []string{"事件一然后是事件", "是事件二三"}) | ||
| }) | ||
| } |
There was a problem hiding this comment.
- Do we have a test for parsing non-utf8 chars?
- Could you add a test to fuzz the content buffer? Like generating random chars and checking the result many times.
| if !ok { | ||
| return false | ||
| } | ||
| return strings.HasPrefix(contentType, "text/event-stream") |
There was a problem hiding this comment.
"text/event-streamwhatever" could also pass this check?
| buffer := NewContentBuffer(WithMaxChars(10), WithOverlapCharNum(0)) | ||
| acc := newResultAccumulator(t, buffer) | ||
|
|
||
| buffer.Write([]byte("事件A1")) |
There was a problem hiding this comment.
It would be better to use English as the test input so more people can understand it
| assert.Equal(t, "line 1\nline 2", parsed.Data) | ||
| } | ||
|
|
||
| // TestBufferManagement_PruneAndShrink 旨在测试缓冲区的内部管理逻辑, |
There was a problem hiding this comment.
Let's use English as the comment
| n = len(p.eventBoundaries) | ||
| } | ||
|
|
||
| if n == 0 { |
| remainingSize := len(c.buffer) - c.currStart | ||
| copy(c.buffer, c.buffer[c.currStart:]) | ||
| c.buffer = c.buffer[:remainingSize] | ||
| c.boundaries = c.boundaries[:0] |
There was a problem hiding this comment.
Do we need to deal with boundaries's capacity?
There was a problem hiding this comment.
I ran some benchmarks under high load. boundaries only takes up about 1% of the memory, so I don’t think we need to worry about its capacity.
spacewander
left a comment
There was a problem hiding this comment.
It just comes to my mind that some providers will generate multiple contents in a response. Like reasoning_content in deepseek: https://api-docs.deepseek.com/guides/reasoning_model.
Feel free to add a note in doc and solve it in another PRs.
Added in the doc |
acd027f to
42d0ca4
Compare
|
It seems there is an issue with the e2e testing |
The dataplane failed to ready. Would you check if it is caused by the change? |
1307731 to
656e9dd
Compare
|
Ready for review. |
|
|
||
| type Extractor interface { | ||
|
|
||
| // SetData parse the raw data and prepare the internal state for subsequent extraction calls. |
There was a problem hiding this comment.
| // SetData parse the raw data and prepare the internal state for subsequent extraction calls. | |
| // SetData parses the raw data and prepares the internal state for subsequent extraction calls. |
| } | ||
|
|
||
| func (f *filter) EncodeResponse(headers api.ResponseHeaderMap, data api.BufferInstance, trailers api.ResponseTrailerMap) api.ResultAction { | ||
| return f.streamDataHandler(data, true) |
There was a problem hiding this comment.
I think we should rename streamDataHandler to sseDataHandler, since it will be used to parse buffered SSE messages.
| ) | ||
|
|
||
| func TestRequestModeration_live(t *testing.T) { | ||
| t.Skip("Skipping live test to avoid dependency on external services") |
There was a problem hiding this comment.
Why do we need the live test here? We already have the integration test.
There was a problem hiding this comment.
I think this can help users debug the locally built reasoning API.
There was a problem hiding this comment.
Hasn't the integration test already done the same thing? And the live test is not run in the CI at all.
There was a problem hiding this comment.
That's true, i will remove it.
| - AUDIT_PORT=10902 | ||
| restart: unless-stopped | ||
| volumes: | ||
| - ./ai_content_security/app.py:/app/app.py # 挂载本地文件到容器 |
There was a problem hiding this comment.
| - ./ai_content_security/app.py:/app/app.py # 挂载本地文件到容器 | |
| - ./ai_content_security/app.py:/app/app.py # Mount local files into the container |
f9b1a81 to
8fcdd15
Compare
…newline variations,optimized newline handling logic
This commit introduces a new filter plugin,
ai_content_security, designed to perform content moderation on both request and response bodies in HTTP streams.Key features include:
text/event-stream) and non-streamed content.moderatorandextractorcomponents.ContentBuffer.The plugin ensures that all moderated content is validated before being forwarded to the client or upstream service.
Unit and integration tests are included to ensure correctness and reliability.
Note: This is the MVP implementation.