Skip to content

Commit 2086ddb

Browse files
authored
💥 Use ExternalStorageReference proto for payload references (#2311)
1 parent 9d32461 commit 2086ddb

12 files changed

Lines changed: 575 additions & 155 deletions

‎converter/payload_handler_test.go‎

Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,14 @@ type appendCodec struct {
3131
func (c *appendCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
3232
result := make([]*commonpb.Payload, len(payloads))
3333
for i, p := range payloads {
34-
enc := string(p.GetMetadata()[converter.MetadataEncoding]) + c.encodingSuffix
35-
data := append(append([]byte(nil), p.GetData()...), c.marker)
34+
meta := make(map[string][]byte, len(p.GetMetadata()))
35+
for k, v := range p.GetMetadata() {
36+
meta[k] = v
37+
}
38+
meta[converter.MetadataEncoding] = []byte(string(p.GetMetadata()[converter.MetadataEncoding]) + c.encodingSuffix)
3639
result[i] = &commonpb.Payload{
37-
Metadata: map[string][]byte{converter.MetadataEncoding: []byte(enc)},
38-
Data: data,
40+
Metadata: meta,
41+
Data: append(append([]byte(nil), p.GetData()...), c.marker),
3942
}
4043
}
4144
return result, nil
@@ -52,8 +55,13 @@ func (c *appendCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload,
5255
if len(data) == 0 || data[len(data)-1] != c.marker {
5356
return nil, fmt.Errorf("appendCodec.Decode: expected trailing marker byte %d", c.marker)
5457
}
58+
meta := make(map[string][]byte, len(p.GetMetadata()))
59+
for k, v := range p.GetMetadata() {
60+
meta[k] = v
61+
}
62+
meta[converter.MetadataEncoding] = []byte(strings.TrimSuffix(enc, c.encodingSuffix))
5563
result[i] = &commonpb.Payload{
56-
Metadata: map[string][]byte{converter.MetadataEncoding: []byte(strings.TrimSuffix(enc, c.encodingSuffix))},
64+
Metadata: meta,
5765
Data: data[:len(data)-1],
5866
}
5967
}
@@ -234,52 +242,25 @@ func TestDecode_AppliesPostThenPreCodecs(t *testing.T) {
234242
require.Equal(t, originalEncoding, encoding(result[0]))
235243
}
236244

237-
func TestDecode_StorageRefReturnedAsIs(t *testing.T) {
238-
// appendCodec.Decode would error on a storage reference because it requires
239-
// the ".pre" encoding suffix — confirms pre-storage codecs are skipped.
240-
preCodec := &appendCodec{encodingSuffix: ".pre", marker: 'P'}
241-
postCodec := &appendCodec{encodingSuffix: ".post", marker: 'O'}
242-
h, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{
243-
PreStorageCodecs: []converter.PayloadCodec{preCodec},
244-
PostStorageCodecs: []converter.PayloadCodec{postCodec},
245-
})
245+
func TestDecode_NoDrivers_StorageRefFails(t *testing.T) {
246+
h, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{})
246247
require.NoError(t, err)
247248

248249
ref := makeStorageRef(t, "drv", "k1")
249-
postEncodedRef, err := postCodec.Encode([]*commonpb.Payload{ref})
250-
require.NoError(t, err)
251-
252-
rr := servePost(t, h, "/decode", createRequest(t, postEncodedRef[0]))
253-
result := getPayloads(t, rr)
254-
require.Len(t, result, 1)
255-
require.True(t, extstore.IsStorageReference(result[0]))
250+
rr := servePost(t, h, "/decode", createRequest(t, ref))
251+
require.Equal(t, http.StatusBadRequest, rr.Code)
252+
require.Contains(t, rr.Body.String(), "no storage driver is configured")
256253
}
257254

258-
func TestDecode_MixedBatch(t *testing.T) {
259-
preCodec := &appendCodec{encodingSuffix: ".pre", marker: 'P'}
260-
postCodec := &appendCodec{encodingSuffix: ".post", marker: 'O'}
261-
h, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{
262-
PreStorageCodecs: []converter.PayloadCodec{preCodec},
263-
PostStorageCodecs: []converter.PayloadCodec{postCodec},
264-
})
255+
func TestDecode_NoDrivers_MixedBatch_StorageRefFails(t *testing.T) {
256+
h, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{})
265257
require.NoError(t, err)
266258

267259
regular := makePayload(t, "data")
268-
originalEncoding := encoding(regular)
269-
preEncoded, err := preCodec.Encode([]*commonpb.Payload{regular})
270-
require.NoError(t, err)
271-
postEncoded, err := postCodec.Encode(preEncoded)
272-
require.NoError(t, err)
273-
274260
ref := makeStorageRef(t, "drv", "k1")
275-
postEncodedRef, err := postCodec.Encode([]*commonpb.Payload{ref})
276-
require.NoError(t, err)
277-
278-
rr := servePost(t, h, "/decode", createRequest(t, postEncoded[0], postEncodedRef[0]))
279-
result := getPayloads(t, rr)
280-
require.Len(t, result, 2)
281-
require.Equal(t, originalEncoding, encoding(result[0]))
282-
require.True(t, extstore.IsStorageReference(result[1]))
261+
rr := servePost(t, h, "/decode", createRequest(t, regular, ref))
262+
require.Equal(t, http.StatusBadRequest, rr.Code)
263+
require.Contains(t, rr.Body.String(), "no storage driver is configured")
283264
}
284265

285266
func TestDecode_RoundTrip(t *testing.T) {
@@ -507,9 +488,8 @@ func TestDownload_NoDrivers(t *testing.T) {
507488

508489
ref := makeStorageRef(t, "drv", "k1")
509490
rr := servePost(t, h, "/download", createRequest(t, ref))
510-
result := getPayloads(t, rr)
511-
require.Len(t, result, 1)
512-
require.True(t, extstore.IsStorageReference(result[0]))
491+
require.Equal(t, http.StatusBadRequest, rr.Code)
492+
require.Contains(t, rr.Body.String(), "no storage driver is configured")
513493
}
514494

515495
func TestDownload_UnknownDriver(t *testing.T) {

0 commit comments

Comments
 (0)