Open
Description
Callbacks are strictly more powerful and don’t require unnecessary goroutines.
They also make it easier to write correct code.
Server implementation (untested):
// ServerV2 serves the Consume and Publish streams by delegating each request
// to a callback-based Consumer or Producer backend.
type ServerV2 struct {
// Producer is the backend that Publish forwards requests to.
Producer Producer
// Consumer is the backend that Consume forwards requests to.
Consumer Consumer
}
// MessageSender is the callback handed to Consumer.Init for delivering a
// Message on the consume stream. It returns a non-nil error when the send
// fails.
type MessageSender func(*Message) error
// Consumer is the backend side of a consume stream.
type Consumer interface {
// Init is called when a start request arrives, with the requested topic and
// consumer name and the callback to use for delivering messages.
Init(topic, consumer string, sendMessage MessageSender) error
// ReceiveConfirmation is called for each confirmation received after the
// stream has been started.
ReceiveConfirmation(*Confirmation) error
// Close presumably releases the backend's resources.
// NOTE(review): the visible server code never calls Close — confirm where
// it is meant to be invoked.
Close() error
}
// ConfirmationSender is the callback handed to Producer.Init for delivering a
// Confirmation on the publish stream. It returns a non-nil error when the
// send fails.
type ConfirmationSender func(*Confirmation) error
// Producer is the backend side of a publish stream.
type Producer interface {
// Init is called when a start request arrives, with the requested topic, a
// publisher name, and the callback to use for delivering confirmations.
// The visible server code always passes an empty publisher name.
Init(topic string, publisher string, sendConfirmation ConfirmationSender) error
// ReceiveMessage is called for each message received after the stream has
// been started.
ReceiveMessage(*Message) error
// Close presumably releases the backend's resources.
// NOTE(review): the visible server code never calls Close — confirm where
// it is meant to be invoked.
Close() error
}
// Consume handles one consume stream: it reads requests until the stream ends
// and dispatches each one to the Consumer backend on its own goroutine.
//
// It returns nil when the client closes the stream (io.EOF) or the receive
// error reports a canceled context, and the receive error otherwise.
func (s *ServerV2) Consume(stream MessageSource_ConsumeServer) error {
	_, cancel := context.WithCancel(stream.Context())
	defer cancel()

	// started must be allocated here: handleRequest calls its methods, and a
	// nil *atomicBool would panic on the very first request.
	c := &consumer{
		Consumer: s.Consumer,
		started:  &atomicBool{},
	}

	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			if strings.HasSuffix(err.Error(), "context canceled") {
				return nil
			}
			return err
		}
		// TODO: handle returned errors; they are currently dropped because
		// each request is handled on a detached goroutine.
		go c.handleRequest(msg, stream)
	}
}
// consumer holds the state used while handling the requests of one consume
// stream.
type consumer struct {
// started records whether a start request has been accepted. It must be
// non-nil before handleRequest runs, because handleRequest calls its
// methods.
started *atomicBool
// Consumer is the backend that requests are forwarded to.
Consumer Consumer
}
// handleRequest processes a single request received on a consume stream.
//
// A start request initialises the backend with a callback that sends messages
// on stream, then marks the stream as started; a second start request yields
// ErrStartedTwice. A confirmation is forwarded to the backend, or rejected
// with ErrInvalidConfirm if the stream has not been started. Any other
// request yields ErrInvalidRequest.
//
// NOTE(review): started is only set once Consumer.Init returns, so a backend
// whose Init blocks will cause confirmations to be rejected — confirm the
// intended Init contract.
func (c *consumer) handleRequest(msg *ConsumerRequest, stream MessageSource_ConsumeServer) error {
	switch {
	case msg.GetStartRequest() != nil:
		if c.started.Get() {
			return ErrStartedTwice
		}
		sr := msg.GetStartRequest()
		sendMessage := func(m *Message) error {
			return stream.Send(m)
		}
		if err := c.Consumer.Init(sr.GetTopic(), sr.GetConsumer(), sendMessage); err != nil {
			return err
		}
		c.started.Set(true)
		// A successful start is a normal outcome; previously control fell
		// through to a panic here.
		return nil
	case msg.GetConfirmation() != nil:
		if !c.started.Get() {
			return ErrInvalidConfirm
		}
		return c.Consumer.ReceiveConfirmation(msg.GetConfirmation())
	default:
		return ErrInvalidRequest
	}
}
// publisher holds the state used while handling the requests of one publish
// stream.
type publisher struct {
// started records whether a start request has been accepted. It must be
// non-nil before handleRequest runs, because handleRequest calls its
// methods.
started *atomicBool
// Producer is the backend that requests are forwarded to.
Producer Producer
}
// handleRequest processes a single request received on a publish stream.
//
// A start request initialises the backend with a callback that sends
// confirmations on stream, then marks the stream as started; a second start
// request yields ErrStartedTwice. A message is forwarded to the backend, or
// rejected with ErrNotConnected if the stream has not been started. Requests
// carrying neither are ignored and nil is returned.
func (p *publisher) handleRequest(msg *PublisherRequest, stream MessageSink_PublishServer) error {
	switch {
	case msg.GetStartRequest() != nil:
		if p.started.Get() {
			return ErrStartedTwice
		}
		sr := msg.GetStartRequest()
		sendConfirmation := func(c *Confirmation) error {
			return stream.Send(c)
		}
		// Only mark the stream as started once the backend is actually
		// initialised; a failed Init must not let messages through.
		if err := p.Producer.Init(sr.Topic, "", sendConfirmation); err != nil {
			return err
		}
		p.started.Set(true)
		return nil
	case msg.GetMsg() != nil:
		if !p.started.Get() {
			return ErrNotConnected
		}
		return p.Producer.ReceiveMessage(msg.GetMsg())
	}
	return nil
}
// Publish handles one publish stream: it reads requests until the stream ends
// and dispatches each one to the Producer backend on its own goroutine.
//
// It returns nil when the client closes the stream (io.EOF) or the receive
// error reports a canceled context, matching Consume, and the receive error
// otherwise.
func (s *ServerV2) Publish(stream MessageSink_PublishServer) error {
	_, cancel := context.WithCancel(stream.Context())
	defer cancel()

	// started must be allocated here: handleRequest calls its methods, and a
	// nil *atomicBool would panic on the very first request.
	p := &publisher{
		Producer: s.Producer,
		started:  &atomicBool{},
	}

	for {
		msg, err := stream.Recv()
		if err != nil {
			// A clean end of stream is not a failure; returning io.EOF from
			// the handler would report an error to the client.
			if err == io.EOF {
				return nil
			}
			if strings.HasSuffix(err.Error(), "context canceled") {
				return nil
			}
			return err
		}
		// TODO: handle returned errors; they are currently dropped because
		// each request is handled on a detached goroutine.
		go p.handleRequest(msg, stream)
	}
}
// atomicBool is a boolean flag that can be read and written atomically.
// It is stored as an int32: zero means false, any other value means true.
type atomicBool struct {
	flag int32
}

// Set atomically stores value in the flag.
func (b *atomicBool) Set(value bool) {
	if value {
		atomic.StoreInt32(&b.flag, 1)
		return
	}
	atomic.StoreInt32(&b.flag, 0)
}

// Get atomically loads the flag and reports whether it is set.
func (b *atomicBool) Get() bool {
	return atomic.LoadInt32(&b.flag) != 0
}
And the backend (untested):
// Compile-time checks that the NATS handlers satisfy the proximo backend
// interfaces.
var (
	_ proximo.Consumer = (*NatsHandlerConsumer)(nil)
	_ proximo.Producer = (*NatsHandlerProducer)(nil)
)
// NatsHandlerConsumer is a proximo.Consumer backed by a NATS subscription.
type NatsHandlerConsumer struct {
// Url is the address passed to nats.Connect by Init.
Url string
// sub is the subscription created by Init; Close unsubscribes it.
sub *nats.Subscription
}
// Init connects to the NATS server at n.Url, subscribes to topic, and
// forwards every received message to sendMessage, tagging each one with a
// freshly generated ID. The consumer argument is currently unused.
//
// Init blocks while messages are being forwarded, so callers must run it on
// its own goroutine. It returns an error if connecting or subscribing fails,
// or as soon as sendMessage fails, since a failed send means nothing can be
// delivered any more.
//
// NOTE(review): nothing closes the subscription channel, so the loop ends
// only when a send fails — confirm whether Close should also stop it.
func (n *NatsHandlerConsumer) Init(topic, consumer string, sendMessage proximo.MessageSender) error {
	conn, err := nats.Connect(n.Url)
	if err != nil {
		return err
	}
	// Init owns the connection for the lifetime of the forwarding loop, so it
	// is released on every exit path, including a failed subscribe.
	defer conn.Close()

	ch := make(chan *nats.Msg, 64) // TODO: make 64 configurable at startup time
	sub, err := conn.ChanSubscribe(topic, ch)
	if err != nil {
		return err
	}
	n.sub = sub

	for m := range ch {
		msg := &proximo.Message{
			Data: m.Data,
			Id:   proximo.GenerateID(),
		}
		if err := sendMessage(msg); err != nil {
			return err
		}
	}
	return nil
}
// ReceiveConfirmation accepts a confirmation and ignores it; it always
// returns nil.
func (n *NatsHandlerConsumer) ReceiveConfirmation(c *proximo.Confirmation) error {
	return nil
}
// Close unsubscribes the subscription created by Init and returns the result
// of that call. The NATS connection opened by Init is not retained on the
// struct, so it is not closed here.
func (n *NatsHandlerConsumer) Close() error {
	err := n.sub.Unsubscribe()
	return err
}
// NatsHandlerProducer is a proximo.Producer that publishes messages to NATS.
type NatsHandlerProducer struct {
// Url is the address passed to nats.Connect by Init.
Url string
// topic is the subject messages are published to; Init sets it.
topic string
// sendConfirmation is the callback ReceiveMessage invokes to confirm a
// message.
sendConfirmation proximo.ConfirmationSender
// conn is the NATS connection opened by Init and closed by Close.
conn *nats.Conn
}
// Init records the topic and the confirmation callback, then connects to the
// NATS server at n.Url. The publisher argument is currently unused.
//
// It returns the error from nats.Connect, if any.
func (n *NatsHandlerProducer) Init(topic string, publisher string, sendConfirmation proximo.ConfirmationSender) error {
	n.topic = topic
	// The callback must be stored: ReceiveMessage invokes it for every
	// message, and calling a nil func would panic.
	n.sendConfirmation = sendConfirmation

	conn, err := nats.Connect(n.Url)
	if err != nil {
		return err
	}
	n.conn = conn
	return nil
}
// ReceiveMessage publishes the message data to the configured topic and, if
// that succeeds, sends a confirmation carrying the message ID.
//
// It returns the publish error without confirming when the publish fails, so
// a message is never acknowledged unless it was handed to NATS. Otherwise it
// returns the error from sending the confirmation, if any.
func (n *NatsHandlerProducer) ReceiveMessage(msg *proximo.Message) error {
	if err := n.conn.Publish(n.topic, msg.GetData()); err != nil {
		return err
	}
	return n.sendConfirmation(&proximo.Confirmation{msg.GetId()})
}
// Close closes the NATS connection opened by Init, if there is one. It always
// returns nil.
func (n *NatsHandlerProducer) Close() error {
	// conn is nil when Init was never called or failed to connect; there is
	// nothing to close in that case.
	if n.conn == nil {
		return nil
	}
	n.conn.Close()
	return nil
}
Metadata
Assignees
Labels
No labels
Activity