Skip to content

Commit a29448e

Browse files
alxtkr77pavius
authored andcommitted
Add ngx binary response handling for GetItems (#21)
1 parent 9441f7c commit a29448e

16 files changed

+463
-41
lines changed

go.mod

100644100755
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ require (
1515
github.com/valyala/fasthttp v1.2.0
1616
go.uber.org/atomic v1.3.2 // indirect
1717
go.uber.org/multierr v1.1.0 // indirect
18+
zombiezen.com/go/capnproto2 v2.17.0+incompatible
1819
)

go.sum

100644100755
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/capnproto/go-capnproto2 v2.17.0+incompatible h1:vPbYlc2CBNdjzOMzHfwo7TbFNRBDaRKitlWiRs1riTw=
2+
github.com/capnproto/go-capnproto2 v2.17.0+incompatible/go.mod h1:T3/pxeK0qevFRlAASYZe90Ozs+JmlQTNY+VLc6+lJHw=
13
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
24
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
35
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -36,6 +38,9 @@ go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
3638
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
3739
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
3840
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
41+
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
3942
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
4043
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
4144
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
45+
zombiezen.com/go/capnproto2 v2.17.0+incompatible h1:sIoKPFGNlM38Qh+PBLa9Wzg1j99oInS/Qlk+5N/CHa4=
46+
zombiezen.com/go/capnproto2 v2.17.0+incompatible/go.mod h1:XO5Pr2SbXgqZwn0m0Ru54QBqpOf4K5AYBO+8LAOBQEQ=

pkg/dataplane/http/context.go

100644100755
Lines changed: 219 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ import (
1616
"strings"
1717
"sync/atomic"
1818
"time"
19+
"io"
1920

2021
"github.com/v3io/v3io-go/pkg/dataplane"
2122
"github.com/v3io/v3io-go/pkg/errors"
22-
23+
"github.com/v3io/v3io-go/pkg/dataplane/schemas/node/common"
2324
"github.com/nuclio/errors"
2425
"github.com/nuclio/logger"
2526
"github.com/valyala/fasthttp"
27+
"zombiezen.com/go/capnproto2"
2628
)
2729

2830
// TODO: Request should have a global pool
@@ -206,6 +208,11 @@ func (c *context) GetItem(getItemInput *v3io.GetItemInput,
206208
return c.sendRequestToWorker(getItemInput, context, responseChan)
207209
}
208210

211+
type attributeValuesSection struct {
212+
accumulatedPreviousSectionsLength int
213+
data node_common_capnp.VnObjectAttributeValuePtr_List
214+
}
215+
209216
// GetItemSync
210217
func (c *context) GetItemSync(getItemInput *v3io.GetItemInput) (*v3io.Response, error) {
211218

@@ -256,6 +263,7 @@ func (c *context) GetItems(getItemsInput *v3io.GetItemsInput,
256263
return c.sendRequestToWorker(getItemsInput, context, responseChan)
257264
}
258265

266+
259267
// GetItemSync
260268
func (c *context) GetItemsSync(getItemsInput *v3io.GetItemsInput) (*v3io.Response, error) {
261269

@@ -308,55 +316,23 @@ func (c *context) GetItemsSync(getItemsInput *v3io.GetItemsInput) (*v3io.Respons
308316
"PUT",
309317
getItemsInput.Path,
310318
"",
311-
getItemsHeaders,
319+
getItemsHeadersCapnp,
312320
marshalledBody,
313321
false)
314322

315323
if err != nil {
316324
return nil, err
317325
}
318326

319-
c.logger.DebugWithCtx(getItemsInput.Ctx, "Body", "body", string(response.Body()))
320-
321-
getItemsResponse := struct {
322-
Items []map[string]map[string]interface{}
323-
NextMarker string
324-
LastItemIncluded string
325-
}{}
326-
327-
// unmarshal the body into an ad hoc structure
328-
err = json.Unmarshal(response.Body(), &getItemsResponse)
329-
if err != nil {
330-
return nil, err
331-
}
332-
333-
//validate getItems response to avoid infinite loop
334-
if getItemsResponse.LastItemIncluded != "TRUE" && (getItemsResponse.NextMarker == "" || getItemsResponse.NextMarker == getItemsInput.Marker) {
335-
errMsg := fmt.Sprintf("Invalid getItems response: lastItemIncluded=false and nextMarker='%s', "+
336-
"startMarker='%s', probably due to object size bigger than 2M. Query is: %+v", getItemsResponse.NextMarker, getItemsInput.Marker, getItemsInput)
337-
c.logger.Warn(errMsg)
338-
}
339-
340-
getItemsOutput := v3io.GetItemsOutput{
341-
NextMarker: getItemsResponse.NextMarker,
342-
Last: getItemsResponse.LastItemIncluded == "TRUE",
343-
}
344-
345-
// iterate through the items and decode them
346-
for _, typedItem := range getItemsResponse.Items {
347-
348-
item, err := c.decodeTypedAttributes(typedItem)
349-
if err != nil {
350-
return nil, err
351-
}
327+
contentType := string(response.HeaderPeek("Content-Type"))
352328

353-
getItemsOutput.Items = append(getItemsOutput.Items, item)
329+
if contentType != "application/octet-capnp" {
330+
c.logger.DebugWithCtx(getItemsInput.Ctx, "Body", "body", string(response.Body()))
331+
response.Output, err = c.getItemsParseJSONResponse(response, getItemsInput)
332+
} else {
333+
response.Output, err = c.getItemsParseCAPNPResponse(response)
354334
}
355-
356-
// attach the output to the response
357-
response.Output = &getItemsOutput
358-
359-
return response, nil
335+
return response, err
360336
}
361337

362338
// PutItem
@@ -1141,3 +1117,205 @@ func (c *context) workerEntry(workerIndex int) {
11411117
request.ResponseChan <- &request.RequestResponse.Response
11421118
}
11431119
}
1120+
1121+
func readAllCapnpMessages(reader io.Reader) ([]*capnp.Message){
1122+
var capnpMessages []*capnp.Message
1123+
for {
1124+
msg, err := capnp.NewDecoder(reader).Decode()
1125+
if err != nil {
1126+
break
1127+
}
1128+
capnpMessages = append(capnpMessages, msg)
1129+
}
1130+
return capnpMessages
1131+
}
1132+
1133+
func getSectionAndIndex(values []attributeValuesSection, idx int) (section int, resIdx int){
1134+
if len(values) == 1 {
1135+
return 0, idx
1136+
}
1137+
for i := 1; i<len(values); i++ {
1138+
if values[i].accumulatedPreviousSectionsLength > idx {
1139+
return i,idx - values[i-1].accumulatedPreviousSectionsLength
1140+
}
1141+
}
1142+
return 0, idx
1143+
}
1144+
1145+
func decodeCapnpAttributes(keyValues node_common_capnp.VnObjectItemsGetMappedKeyValuePair_List, values []attributeValuesSection, attributeNames []string) (map[string]interface{}, error) {
1146+
attributes := map[string]interface{}{}
1147+
for j := 0; j < keyValues.Len(); j++ {
1148+
attrPtr := keyValues.At(j)
1149+
valIdx := int(attrPtr.ValueMapIndex())
1150+
attrIdx := int(attrPtr.KeyMapIndex())
1151+
1152+
attributeName := attributeNames[attrIdx]
1153+
sectIdx, valIdx := getSectionAndIndex(values, valIdx)
1154+
value, err := values[sectIdx].data.At(valIdx).Value()
1155+
if err != nil {
1156+
return attributes, errors.Wrapf(err,"values[%d].data.At(%d).Value",sectIdx, valIdx )
1157+
}
1158+
switch value.Which() {
1159+
case node_common_capnp.ExtAttrValue_Which_qword:
1160+
attributes[attributeName] = int(value.Qword())
1161+
case node_common_capnp.ExtAttrValue_Which_uqword:
1162+
attributes[attributeName] = int(value.Uqword())
1163+
case node_common_capnp.ExtAttrValue_Which_blob:
1164+
attributes[attributeName], err = value.Blob()
1165+
case node_common_capnp.ExtAttrValue_Which_str:
1166+
attributes[attributeName], err = value.Str()
1167+
case node_common_capnp.ExtAttrValue_Which_dfloat:
1168+
attributes[attributeName] = value.Dfloat()
1169+
case node_common_capnp.ExtAttrValue_Which_boolean:
1170+
attributes[attributeName] = value.Boolean()
1171+
case node_common_capnp.ExtAttrValue_Which_notExists:
1172+
{}
1173+
default:
1174+
return attributes, errors.Errorf("getItemsCapnp: %s type for %s attribute is not expected", value.Which().String(), attributeName)
1175+
}
1176+
}
1177+
return attributes, nil
1178+
}
1179+
1180+
func (c *context) getItemsParseJSONResponse(response *v3io.Response, getItemsInput *v3io.GetItemsInput) (*v3io.GetItemsOutput, error){
1181+
1182+
getItemsResponse := struct {
1183+
Items []map[string]map[string]interface{}
1184+
NextMarker string
1185+
LastItemIncluded string
1186+
}{}
1187+
1188+
// unmarshal the body into an ad hoc structure
1189+
err := json.Unmarshal(response.Body(), &getItemsResponse)
1190+
if err != nil {
1191+
return nil, err
1192+
}
1193+
1194+
//validate getItems response to avoid infinite loop
1195+
if getItemsResponse.LastItemIncluded != "TRUE" && (getItemsResponse.NextMarker == "" || getItemsResponse.NextMarker == getItemsInput.Marker) {
1196+
errMsg := fmt.Sprintf("Invalid getItems response: lastItemIncluded=false and nextMarker='%s', "+
1197+
"startMarker='%s', probably due to object size bigger than 2M. Query is: %+v", getItemsResponse.NextMarker, getItemsInput.Marker, getItemsInput)
1198+
c.logger.Warn(errMsg)
1199+
}
1200+
1201+
getItemsOutput := v3io.GetItemsOutput{
1202+
NextMarker: getItemsResponse.NextMarker,
1203+
Last: getItemsResponse.LastItemIncluded == "TRUE",
1204+
}
1205+
1206+
// iterate through the items and decode them
1207+
for _, typedItem := range getItemsResponse.Items {
1208+
1209+
item, err := c.decodeTypedAttributes(typedItem)
1210+
if err != nil {
1211+
return nil, err
1212+
}
1213+
1214+
getItemsOutput.Items = append(getItemsOutput.Items, item)
1215+
}
1216+
// attach the output to the response
1217+
return &getItemsOutput, nil
1218+
}
1219+
1220+
func (c *context) getItemsParseCAPNPResponse(response *v3io.Response) (*v3io.GetItemsOutput, error){
1221+
responseBodyReader := bytes.NewReader(response.Body())
1222+
capnpSections := readAllCapnpMessages(responseBodyReader )
1223+
if len(capnpSections) < 2 {
1224+
return nil, errors.Errorf("getItemsCapnp: Got only %v capnp sections. Expecting at least 2", len(capnpSections))
1225+
}
1226+
cookie := string(response.HeaderPeek("X-v3io-cookie"))
1227+
getItemsOutput := v3io.GetItemsOutput{
1228+
NextMarker: cookie,
1229+
Last: len(cookie) == 0,
1230+
}
1231+
if len(capnpSections) < 2 {
1232+
return nil, errors.Errorf("getItemsCapnp: Got only %v capnp sections. Expecting at least 2", len(capnpSections))
1233+
}
1234+
1235+
metadataPayload, err := node_common_capnp.ReadRootVnObjectItemsGetResponseMetadataPayload(capnpSections[len(capnpSections) - 1])
1236+
if err != nil {
1237+
return nil, errors.Wrap(err,"ReadRootVnObjectItemsGetResponseMetadataPayload")
1238+
}
1239+
//Keys
1240+
attributeMap, err := metadataPayload.KeyMap()
1241+
if err != nil {
1242+
return nil, errors.Wrap(err,"metadataPayload.KeyMap")
1243+
}
1244+
attributeMapNames,err := attributeMap.Names()
1245+
if err != nil {
1246+
return nil, errors.Wrap(err,"attributeMap.Names")
1247+
}
1248+
attributeNamesPtr,err := attributeMapNames.Arr()
1249+
if err != nil {
1250+
return nil, errors.Wrap(err,"attributeMapNames.Arr")
1251+
}
1252+
//Values
1253+
valueMap, err := metadataPayload.ValueMap()
1254+
if err != nil {
1255+
return nil, errors.Wrap(err,"metadataPayload.ValueMap")
1256+
}
1257+
values, err := valueMap.Values()
1258+
if err != nil {
1259+
return nil, errors.Wrap(err,"valueMap.Values")
1260+
}
1261+
1262+
// Items
1263+
items, err := metadataPayload.Items()
1264+
if err != nil {
1265+
return nil, errors.Wrap(err,"metadataPayload.Items")
1266+
}
1267+
valuesSections := make([]attributeValuesSection , len(capnpSections) - 1)
1268+
1269+
accLength := 0
1270+
//Additional data sections "in between"
1271+
for capnpSectionIndex:= 1;capnpSectionIndex < len(capnpSections) - 1; capnpSectionIndex++ {
1272+
data, err := node_common_capnp.ReadRootVnObjectAttributeValueMap(capnpSections[capnpSectionIndex])
1273+
if err != nil {
1274+
return nil, errors.Wrap(err,"node_common_capnp.ReadRootVnObjectAttributeValueMap")
1275+
}
1276+
dv, err := data.Values()
1277+
if err != nil {
1278+
return nil, errors.Wrap(err,"data.Values")
1279+
}
1280+
accLength = accLength + dv.Len()
1281+
valuesSections[capnpSectionIndex-1].data = dv
1282+
valuesSections[capnpSectionIndex-1].accumulatedPreviousSectionsLength = accLength
1283+
}
1284+
accLength = accLength + values.Len()
1285+
valuesSections[len(capnpSections) - 2].data = values
1286+
valuesSections[len(capnpSections) - 2].accumulatedPreviousSectionsLength = accLength
1287+
1288+
//Read in all the attribute names
1289+
attributeNamesNumber := attributeNamesPtr.Len()
1290+
attributeNames := make([]string,attributeNamesNumber)
1291+
for attributeIndex := 0; attributeIndex < attributeNamesNumber; attributeIndex++ {
1292+
attributeNames[attributeIndex], err = attributeNamesPtr.At(attributeIndex).Str()
1293+
if err != nil {
1294+
return nil, errors.Wrapf(err,"attributeNamesPtr.At(%d) size %d", attributeIndex, attributeNamesNumber)
1295+
}
1296+
}
1297+
1298+
// iterate through the items and decode them
1299+
for itemIndex := 0; itemIndex < items.Len(); itemIndex++ {
1300+
itemPtr := items.At(itemIndex)
1301+
item, err := itemPtr.Item()
1302+
if err != nil {
1303+
return nil, errors.Wrap(err,"itemPtr.Item")
1304+
}
1305+
name, err := item.Name()
1306+
if err != nil {
1307+
return nil, errors.Wrap(err,"item.Name")
1308+
}
1309+
itemAttributes, err := item.Attrs()
1310+
if err != nil {
1311+
return nil, errors.Wrap(err,"item.Attrs")
1312+
}
1313+
ditem, err := decodeCapnpAttributes(itemAttributes, valuesSections, attributeNames)
1314+
if err != nil {
1315+
return nil, errors.Wrap(err,"decodeCapnpAttributes")
1316+
}
1317+
ditem["__name"] = name
1318+
getItemsOutput.Items = append(getItemsOutput.Items, ditem)
1319+
}
1320+
return &getItemsOutput, nil
1321+
}

pkg/dataplane/http/headers.go

100644100755
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ var getItemsHeaders = map[string]string{
3636
"X-v3io-function": getItemsFunctionName,
3737
}
3838

39+
// headers for update item
40+
var getItemsHeadersCapnp = map[string]string{
41+
"Content-Type": "application/json",
42+
"X-v3io-response-content-type": "capnp",
43+
"X-v3io-function": getItemsFunctionName,
44+
}
45+
3946
// headers for create stream
4047
var createStreamHeaders = map[string]string{
4148
"Content-Type": "application/json",

pkg/dataplane/requestresponse.go

100644100755
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ func (r *Response) Body() []byte {
6868
return r.HTTPResponse.Body()
6969
}
7070

71+
func (r *Response) HeaderPeek(key string) []byte {
72+
return r.HTTPResponse.Header.Peek(key)
73+
}
74+
75+
7176
func (r *Response) Request() *Request {
7277
return &r.RequestResponse.Request
7378
}

pkg/dataplane/schemas/build

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/bin/bash
2+
# put the capnpc-go binary in /bin
3+
export PATH=$PATH:${GOPATH}/bin
4+
5+
capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/ExtAttrValue.capnp
6+
capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/StringWrapper.capnp
7+
capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/TimeSpec.capnp
8+
capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/VnObjectAttributeKeyMap.capnp
9+
capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/VnObjectAttributeValueMap.capnp
10+
capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/VnObjectItemsGetResponse.capnp
11+
capnp compile -I$GOPATH/src/github.com/iguazio/go-capnproto2/std -I$(pwd) -ogo node/common/VnObjectItemsScanCookie.capnp
12+

pkg/dataplane/schemas/clean

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
3+
find . -name *.go -delete

pkg/dataplane/schemas/go.capnp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
@0xd12a1c51fedd6c88;
2+
3+
annotation package(file) :Text;
4+
# The Go package name for the generated file.
5+
6+
annotation import(file) :Text;
7+
# The Go import path that the generated file is accessible from.
8+
# Used to generate import statements and check if two types are in the
9+
# same package.
10+
11+
annotation doc(struct, field, enum) :Text;
12+
# Adds a doc comment to the generated code.
13+
14+
# annotation tag(enumerant) :Text;
15+
# Changes the string representation of the enum in the generated code.
16+
17+
# annotation notag(enumerant) :Void;
18+
# Removes the string representation of the enum in the generated code.
19+
20+
# annotation customtype(field) :Text;
21+
# OBSOLETE, not used by code generator.
22+
23+
# annotation name(struct, field, union, enum, enumerant, interface, method, param, annotation, const, group) :Text;
24+
# Used to rename the element in the generated code.
25+
26+
$package("capnp");
27+

0 commit comments

Comments
 (0)