Skip to content

Commit b63d998

Browse files
author
brucexc
committed
feat:rss Etag
1 parent dc80336 commit b63d998

5 files changed

Lines changed: 56 additions & 36 deletions

File tree

internal/service/hub/handler/dsl/distributor/distributor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (d *Distributor) DistributeRSSData(ctx context.Context, path, query string)
120120

121121
// generateRSSHubPath builds the path for RSSHub requests.
122122
func (d *Distributor) generateRSSHubPath(param, query string, nodes []*model.NodeEndpointCache) (map[common.Address]model.RequestMeta, error) {
123-
endpointMap, err := d.simpleRouter.BuildPath(http.MethodGet, fmt.Sprintf("/rss/%s?%s", param, query), nil, nodes, nil)
123+
endpointMap, err := d.simpleRouter.BuildPath(http.MethodGet, fmt.Sprintf("/rss/%s?%s", param, query), nil, nodes, nil, false)
124124
if err != nil {
125125
return nil, fmt.Errorf("build path: %w", err)
126126
}
@@ -130,7 +130,7 @@ func (d *Distributor) generateRSSHubPath(param, query string, nodes []*model.Nod
130130

131131
// generateRSSPath builds the path for RSS requests.
132132
func (d *Distributor) generateRSSPath(param, query string, nodes []*model.NodeEndpointCache) (map[common.Address]model.RequestMeta, error) {
133-
endpointMap, err := d.simpleRouter.BuildPath(http.MethodGet, fmt.Sprintf("/%s?%s", param, query), nil, nodes, nil)
133+
endpointMap, err := d.simpleRouter.BuildPath(http.MethodGet, fmt.Sprintf("/%s?%s", param, query), nil, nodes, nil, true)
134134
if err != nil {
135135
return nil, fmt.Errorf("build path: %w", err)
136136
}
@@ -140,7 +140,7 @@ func (d *Distributor) generateRSSPath(param, query string, nodes []*model.NodeEn
140140

141141
// generateAIPath builds the path for AI requests.
142142
func (d *Distributor) generateAIPath(param, query string, nodes []*model.NodeEndpointCache) (map[common.Address]model.RequestMeta, error) {
143-
endpointMap, err := d.simpleRouter.BuildPath(http.MethodGet, fmt.Sprintf("/agentdata/%s?%s", param, query), nil, nodes, nil)
143+
endpointMap, err := d.simpleRouter.BuildPath(http.MethodGet, fmt.Sprintf("/agentdata/%s?%s", param, query), nil, nodes, nil, false)
144144
//endpointMap, err := d.simpleRouter.BuildPath(http.MethodGet, fmt.Sprintf("/agentdata/%s%s", param, query), nil, nodes, nil)
145145
if err != nil {
146146
return nil, fmt.Errorf("build path: %w", err)
@@ -317,7 +317,7 @@ func (d *Distributor) generatePath(requestType, component string, request interf
317317
return nil, fmt.Errorf("invalid request type: %s", requestType)
318318
}
319319

320-
endpointMap, err := d.simpleRouter.BuildPath(method, path, params, nodes, body)
320+
endpointMap, err := d.simpleRouter.BuildPath(method, path, params, nodes, body, false)
321321
if err != nil {
322322
return nil, fmt.Errorf("build path: %w", err)
323323
}

internal/service/hub/handler/dsl/enforcer/handle_response.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,12 @@ func handleFullResponses(responses []*model.DataResponse, errResponseCount int)
568568

569569
// updateRequestBasedOnComparison updates the requests based on the comparison of the data.
570570
func updateRequestBasedOnComparison(responses []*model.DataResponse) {
571-
if isResponseIdentical(responses[0].Data, responses[1].Data) {
571+
// check if the responses are identical
572+
isMatch := responses[0].IsRssNode && responses[0].Etag == responses[1].Etag ||
573+
!responses[0].IsRssNode && isResponseIdentical(responses[0].Data, responses[1].Data)
574+
575+
// assign valid points
576+
if isMatch {
572577
responses[0].ValidPoint = 2 * validPointUnit
573578
responses[1].ValidPoint = validPointUnit
574579
} else {
@@ -590,7 +595,14 @@ func markErrorResponse(responses ...*model.DataResponse) {
590595
// compareAndAssignPoints compares the data for identity and assigns corresponding points.
591596
func compareAndAssignPoints(responses []*model.DataResponse, errResponseCount int) {
592597
d0, d1, d2 := responses[0].Data, responses[1].Data, responses[2].Data
593-
diff01, diff02, diff12 := isResponseIdentical(d0, d1), isResponseIdentical(d0, d2), isResponseIdentical(d1, d2)
598+
599+
var diff01, diff02, diff12 bool
600+
601+
if responses[0].IsRssNode || responses[1].IsRssNode || responses[2].IsRssNode {
602+
diff01, diff02, diff12 = responses[0].Etag == responses[1].Etag, responses[0].Etag == responses[2].Etag, responses[1].Etag == responses[2].Etag
603+
} else {
604+
diff01, diff02, diff12 = isResponseIdentical(d0, d1), isResponseIdentical(d0, d2), isResponseIdentical(d1, d2)
605+
}
594606

595607
switch errResponseCount {
596608
// responses contain 2 errors

internal/service/hub/handler/dsl/model/model.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,11 @@ type NodeEndpointCache struct {
9292
// DataResponse represents the response returned by a Node.
9393
// It is also used to store the verification result.
9494
type DataResponse struct {
95-
Address common.Address
96-
Endpoint string
97-
Data []byte
95+
Address common.Address
96+
Endpoint string
97+
IsRssNode bool
98+
Etag string
99+
Data []byte
98100
// A valid response must be non-null and non-error
99101
Valid bool
100102
Err error
@@ -109,6 +111,7 @@ type RequestMeta struct {
109111
Endpoint string
110112
AccessToken string
111113
Body []byte
114+
IsRssNode bool
112115
}
113116

114117
type ErrResponse struct {

internal/service/hub/handler/dsl/router/router.go

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type SimpleRouter struct {
2929
httpClient httputil.Client
3030
}
3131

32-
func (r *SimpleRouter) BuildPath(method, path string, query url.Values, nodes []*model.NodeEndpointCache, body []byte) (map[common.Address]model.RequestMeta, error) {
32+
func (r *SimpleRouter) BuildPath(method, path string, query url.Values, nodes []*model.NodeEndpointCache, body []byte, isRssNode bool) (map[common.Address]model.RequestMeta, error) {
3333
if method == http.MethodGet && query != nil {
3434
path = fmt.Sprintf("%s?%s", path, query.Encode())
3535
}
@@ -53,6 +53,7 @@ func (r *SimpleRouter) BuildPath(method, path string, query url.Values, nodes []
5353
Endpoint: fullURL,
5454
AccessToken: node.AccessToken,
5555
Body: body,
56+
IsRssNode: isRssNode,
5657
}
5758
}
5859

@@ -113,9 +114,7 @@ func (r *SimpleRouter) distribute(ctx context.Context, nodeMap map[common.Addres
113114

114115
response := &model.DataResponse{Address: address, Endpoint: requestMeta.Endpoint}
115116
// Fetch the data from the Node.
116-
body, _, err := r.httpClient.FetchWithMethod(ctx, requestMeta.Method, requestMeta.Endpoint, requestMeta.AccessToken, bytes.NewReader(requestMeta.Body))
117-
118-
// todo deal headers
117+
body, headers, err := r.httpClient.FetchWithMethod(ctx, requestMeta.Method, requestMeta.Endpoint, requestMeta.AccessToken, bytes.NewReader(requestMeta.Body))
119118

120119
if err != nil {
121120
zap.L().Error("failed to fetch request", zap.String("node", address.String()), zap.Error(err))
@@ -132,35 +131,41 @@ func (r *SimpleRouter) distribute(ctx context.Context, nodeMap map[common.Addres
132131

133132
response.Err = readErr
134133
} else {
135-
var v interface{}
136-
err = json.Unmarshal(data, &v)
137-
138-
if err != nil {
139-
zap.L().Error("failed to unmarshal response body", zap.String("node", address.String()), zap.Error(err))
140-
141-
response.Err = fmt.Errorf("invalid data")
142-
}
143-
144-
if _, ok := v.([]interface{}); ok {
145-
zap.L().Info("response is an array", zap.String("node", address.String()))
146-
134+
if requestMeta.IsRssNode {
147135
response.Data = data
136+
response.IsRssNode = requestMeta.IsRssNode
137+
response.Etag = headers.Get("Etag")
148138
} else {
149-
activity := &model.ActivityResponse{}
150-
activities := &model.ActivitiesResponse{}
139+
var v interface{}
140+
err = json.Unmarshal(data, &v)
151141

152-
// Check if the Node's data is valid.
153-
if !validateData(data, activity) && !validateData(data, activities) {
154-
zap.L().Error("failed to parse response", zap.String("node", address.String()))
142+
if err != nil {
143+
zap.L().Error("failed to unmarshal response body", zap.String("node", address.String()), zap.Error(err))
155144

156145
response.Err = fmt.Errorf("invalid data")
157-
} else {
158-
// If the data is non-null, set the result as valid.
159-
if activity.Data != nil || activities.Data != nil {
160-
response.Valid = true
161-
}
146+
}
147+
148+
if _, ok := v.([]interface{}); ok {
149+
zap.L().Info("response is an array", zap.String("node", address.String()))
162150

163151
response.Data = data
152+
} else {
153+
activity := &model.ActivityResponse{}
154+
activities := &model.ActivitiesResponse{}
155+
156+
// Check if the Node's data is valid.
157+
if !validateData(data, activity) && !validateData(data, activities) {
158+
zap.L().Error("failed to parse response", zap.String("node", address.String()))
159+
160+
response.Err = fmt.Errorf("invalid data")
161+
} else {
162+
// If the data is non-null, set the result as valid.
163+
if activity.Data != nil || activities.Data != nil {
164+
response.Valid = true
165+
}
166+
167+
response.Data = data
168+
}
164169
}
165170
}
166171
}

internal/service/hub/handler/dsl/rsshub.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ func (d *DSL) GetRSSHub(c echo.Context) error {
2525
return errorx.InternalError(c)
2626
}
2727

28-
return c.JSONBlob(http.StatusOK, data)
28+
return c.XMLBlob(http.StatusOK, data)
2929
}

0 commit comments

Comments
 (0)