Skip to content

Commit 40a7588

Browse files
author
Guy Baron
authored
set the correct Type and Content-Type headers on out going messages (#160)
* set the correct Type and Content-Type headers on out going messages * refactoring
1 parent 3cc55ae commit 40a7588

File tree

6 files changed

+46
-10
lines changed

6 files changed

+46
-10
lines changed

gbus/bus.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -655,12 +655,13 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
655655
}
656656

657657
msg := amqp.Publishing{
658-
Body: buffer,
659-
ReplyTo: replyTo,
660-
MessageId: message.ID,
661-
CorrelationId: message.CorrelationID,
662-
ContentEncoding: b.Serializer.Name(),
663-
Headers: headers,
658+
Type: message.PayloadFQN,
659+
Body: buffer,
660+
ReplyTo: replyTo,
661+
MessageId: message.ID,
662+
CorrelationId: message.CorrelationID,
663+
ContentType: b.Serializer.Name(),
664+
Headers: headers,
664665
}
665666
span.LogFields(message.GetTraceLog()...)
666667

gbus/policy/generic.go

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package policy
2+
3+
import "github.com/streadway/amqp"
4+
5+
type Generic struct {
6+
Funk func(publishing *amqp.Publishing)
7+
}
8+
9+
func (g *Generic) Apply(publishing *amqp.Publishing) {
10+
g.Funk(publishing)
11+
}

gbus/serialization/avro.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func NewAvroSerializer(schemaRegistryUrls ...string) *Avro {
6262

6363
//Name implements Serializer.Name
6464
func (as *Avro) Name() string {
65-
return "avro"
65+
return "avro/binary"
6666
}
6767

6868
//Encode encodes an object into a byte array

gbus/serialization/proto.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515

1616
var _ gbus.Serializer = &Proto{}
1717

18+
const ProtoContentType = "application/x-protobuf"
19+
1820
//Proto a serializer for GBus uses protobuf
1921
type Proto struct {
2022
lock *sync.Mutex
@@ -33,7 +35,7 @@ func NewProtoSerializer(logger logrus.FieldLogger) gbus.Serializer {
3335

3436
//Name implements Serializer.Name
3537
func (as *Proto) Name() string {
36-
return "proto"
38+
return ProtoContentType
3739
}
3840

3941
//Encode encodes an object into a byte array

tests/bus_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/wework/grabbit/gbus/serialization"
1313

1414
"github.com/wework/grabbit/gbus/metrics"
15+
"github.com/wework/grabbit/gbus/policy"
1516

1617
"github.com/opentracing/opentracing-go"
1718
olog "github.com/opentracing/opentracing-go/log"
@@ -621,6 +622,27 @@ func TestOnlyRawMessageHandlersInvoked(t *testing.T) {
621622
}, t)
622623
}
623624

625+
func TestTypeAndContentTypeHeadersSet(t *testing.T) {
626+
cmd := Command1{}
627+
628+
bus := createNamedBusForTest(testSvc1)
629+
630+
policy := &policy.Generic{
631+
Funk: func(publishing *amqp.Publishing) {
632+
if publishing.Type != cmd.SchemaName() {
633+
t.Errorf("publishing.Type != cmd.SchemaName()")
634+
}
635+
dfb := bus.(*gbus.DefaultBus)
636+
if publishing.ContentType != dfb.Serializer.Name() {
637+
t.Errorf("expected %s as content-type but actual value was %s", dfb.Serializer.Name(), publishing.ContentType)
638+
}
639+
}}
640+
641+
bus.Start()
642+
defer bus.Shutdown()
643+
bus.Send(context.Background(), testSvc1, gbus.NewBusMessage(cmd), policy)
644+
}
645+
624646
func TestSendEmptyBody(t *testing.T) {
625647
/*
626648
test sending of message with len(payload) == 0 .

tests/protoSerialization_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ func TestProtoSerialization(t *testing.T) {
2424
serializer := serialization.NewProtoSerializer(logger)
2525

2626
name := serializer.Name()
27-
if name != "proto" {
28-
t.Fatalf("incorrect serializer name. expected:proto actual:%s", name)
27+
if name != serialization.ProtoContentType {
28+
t.Fatalf("incorrect serializer name. expected:application/x-protobuf actual:%s", name)
2929
}
3030
cmd := getProtoCommand()
3131
schemaName := cmd.SchemaName()

0 commit comments

Comments
 (0)