Skip to content

Commit

Permalink
Make NATS more verbose
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
alexellis committed Feb 21, 2020
1 parent 8831593 commit 98e1110
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 14 deletions.
16 changes: 13 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package main

import (
"log"

"github.com/openfaas-incubator/connector-sdk/types"
"github.com/openfaas-incubator/nats-connector/config"
"github.com/openfaas-incubator/nats-connector/nats"
Expand All @@ -29,9 +31,17 @@ func main() {

brokerConfig := nats.BrokerConfig{
Host: config.Broker,
ConnTimeout: config.UpstreamTimeout,
ConnTimeout: config.UpstreamTimeout, // ConnTimeout isn't the same as UpstreamTimeout, it's just the delay to connect to NATS.
}

broker := nats.NewBroker(brokerConfig)
broker.Subscribe(controller, config.Topics)
broker, err := nats.NewBroker(brokerConfig)

if err != nil {
log.Fatal(err)
}

err = broker.Subscribe(controller, config.Topics)
if err != nil {
log.Fatal(err)
}
}
54 changes: 43 additions & 11 deletions nats/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package nats

import (
"fmt"
"log"
"sync"
"time"
Expand All @@ -13,30 +14,41 @@ import (
)

const queueGroup = "openfaas_nats_worker_group"

const clientName = "openfaas_connector"

// BrokerConfig high level config for the broker
type BrokerConfig struct {
Host string

// Host is the NATS address, the port is hard-coded to 4222
Host string

// ConnTimeout is the timeout for Dial on a connection.
ConnTimeout time.Duration
}

// Broker used to subscribe to NATS subjects
type Broker interface {
Subscribe(types.Controller, []string)
Subscribe(types.Controller, []string) error
}

type broker struct {
client *nats.Conn
}

// NATSPort hard-coded port for NATS
const NATSPort = "4222"

// NewBroker loops until we are able to connect to the NATS server
func NewBroker(config BrokerConfig) Broker {
func NewBroker(config BrokerConfig) (Broker, error) {
broker := &broker{}
brokerURL := fmt.Sprintf("nats://%s:%s", config.Host, NATSPort)

brokerURL := "nats://" + config.Host + ":4222"
for {
client, err := nats.Connect(brokerURL, nats.Timeout(config.ConnTimeout), nats.Name(clientName))
client, err := nats.Connect(brokerURL,
nats.Timeout(config.ConnTimeout),
nats.Name(clientName))

if client != nil && err == nil {
broker.client = client
break
Expand All @@ -45,30 +57,50 @@ func NewBroker(config BrokerConfig) Broker {
if client != nil {
client.Close()
}

log.Println("Wait for brokers to come up.. ", brokerURL)
time.Sleep(1 * time.Second)
// TODO Add healthcheck
}
return broker

return broker, nil
}

// Subscribe to a list of NATS subjects and block until interrupted
func (b *broker) Subscribe(controller types.Controller, topics []string) {
func (b *broker) Subscribe(controller types.Controller, topics []string) error {
log.Printf("Configured topics: %v", topics)

if b.client == nil {
return fmt.Errorf("client was nil, try to reconnect")
}

wg := sync.WaitGroup{}
wg.Add(1)

subs := []*nats.Subscription{}
for _, topic := range topics {
log.Printf("Binding to topic: %v", topic)
// check client not nil
b.client.QueueSubscribe(topic, queueGroup, func(m *nats.Msg) {
log.Printf("Received topic: %s, message: %s", m.Subject, string(m.Data))
log.Printf("Binding to topic: %q", topic)

sub, err := b.client.QueueSubscribe(topic, queueGroup, func(m *nats.Msg) {
log.Printf("Topic: %s, message: %q", m.Subject, string(m.Data))

controller.Invoke(m.Subject, &m.Data)
})
subs = append(subs, sub)

if err != nil {
log.Printf("Unable to bind to topic: %s", topic)
}
}

for _, sub := range subs {
log.Printf("Subscription: %s ready", sub.Subject)
}

// interrupt handling
wg.Wait()

b.client.Close()

return nil
}

0 comments on commit 98e1110

Please sign in to comment.