Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@ jtimon-darwin-amd64
jtimon-linux-amd64
*.log
*.logs
.*
debug.test
jtimon-windows-amd64.exe
coverage.out
up-*
cp-*
cp-configs
bin
sample-log
config-file-lists
config-list.txt
*.tar
diff-*
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Config struct {
Alias string `json:"alias"`
PasswordDecoder string `json:"password-decoder"`
EnableUintSupport bool `json:"enable-uint"`
TCP *TCPConfig `json:"tcp"`
}

// GnmiConfig definition
Expand Down Expand Up @@ -361,6 +362,12 @@ func ConfigRead(jctx *JCtx, init bool, restart *bool) error {
if err := KafkaInit(jctx); err != nil {
log.Printf("KafkaInit error : %v", err)
}
// TCP Client
if *tcpPush {
if err := TcpClientInit(jctx.config.TCP, &jctx.tcpCtx); err != nil {
log.Printf("TCPClientInit error : %v", err)
}
}
} else {
err := HandleConfigChange(jctx, config, restart)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions jtimon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,16 @@ func prometheusCollect(host string, port int, jctx *JCtx) error {

return nil
}

func tcpExport(host string, port int, jctx *JCtx) error {
// var f *os.File
// var err error

// if f, err = os.Create(jctx.file + ".testres"); err != nil {
// return err
// }
// defer f.Close()

// url := fmt.Sprintf("http://%s:%d/metrics", host, port)
return nil
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
myCert = flag.String("cert", "./certs/self_signed/server-cert.pem", "Path of server cert")
myKey = flag.String("pem", "./certs/self_signed/server-key.pem", "Path of server key")
kafkaBroker = flag.String("kafka-broker", "kafka:9092", "Comma seperated list of Kafka brokers each in the form ip:port")
tcpPush = flag.Bool("tcp-push", false, "Send telemetry packet to TCP endpoint as JSON")

jtimonVersion = "version-not-available"
buildTime = "build-time-not-available"
Expand Down
Empty file added outfile
Empty file.
26 changes: 26 additions & 0 deletions sample-config/3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"host": "host-or-ip",
"port": 50051,
"cid": "mac1",
"paths": [{
"path": "/interfaces",
"freq": 2000
}],
"influx": {
"server": "127.0.0.1",
"port": 8086,
"dbname": "db",
"user": "influx",
"password": "influxdb",
"recreate": true,
"measurement": "m"
},
"log": {
"file": "jtimon.log"
},
"tcp": {
"server": "<ip-addr>",
"port": 10517,
"<credentials>": "<credentials>"
}
}
25 changes: 16 additions & 9 deletions subscribe_juniper_junos.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"encoding/json"
"os"
"syscall"

na_pb "github.com/nileshsimaria/jtimon/telemetry"
"github.com/golang/protobuf/proto"
auth_pb "github.com/nileshsimaria/jtimon/authentication"
na_pb "github.com/nileshsimaria/jtimon/telemetry"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -94,11 +93,11 @@ func handleOnePacket(ocData *na_pb.OpenConfigData, jctx *JCtx) {
}

// subSendAndReceive handles the following
// - Opens up a stream for receiving the telemetry data
// - Handles SIGHUP by terminating the current stream and requests the
// caller to restart the streaming by setting the corresponding return
// code
// - In case of an error, Set the error code to restart the connection.
// - Opens up a stream for receiving the telemetry data
// - Handles SIGHUP by terminating the current stream and requests the
// caller to restart the streaming by setting the corresponding return
// code
// - In case of an error, Set the error code to restart the connection.
func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx,
subReqM na_pb.SubscriptionRequest) SubErrorCode {

Expand Down Expand Up @@ -131,7 +130,6 @@ func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx,
go func() {
// Go Routine which actually starts the streaming connection and receives the data
jLog(jctx, fmt.Sprintf("Receiving telemetry data from %s:%d\n", jctx.config.Host, jctx.config.Port))

for {
ocData, err := stream.Recv()
if err == io.EOF {
Expand All @@ -158,6 +156,8 @@ func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx,
if b, err := json.MarshalIndent(ocData, "", " "); err == nil {
jLog(jctx, fmt.Sprintf("%s\n", b))
}
} else {
jLog(jctx, fmt.Sprintf("%s\n", ocData))
}

if *print || *stateHandler || IsVerboseLogging(jctx) {
Expand All @@ -170,7 +170,6 @@ func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx,
} else {
go addIDB(ocData, jctx, rtime)
}

// to prometheus
if *prom {
if *noppgoroutines {
Expand All @@ -185,6 +184,14 @@ func subSendAndReceive(conn *grpc.ClientConn, jctx *JCtx,
} else {
go addKafka(ocData, jctx, rtime)
}
// to tcp endpoint
if *tcpPush {
if *noppgoroutines {
AddTcpEndpoint(ocData, jctx)
} else {
go AddTcpEndpoint(ocData, jctx)
}
}
}
}()
for {
Expand Down
3 changes: 3 additions & 0 deletions subscribe_juniper_junos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ func TestInflux(t *testing.T) {
}
})
}
}
func TestTCP(t *testing.T) {

}
func TestJTISIMMaxRun(t *testing.T) {
tests := []struct {
Expand Down
166 changes: 166 additions & 0 deletions tcp_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package main // TODO: package as standalone


import (
"encoding/json"
"bytes"
"fmt"
"log"
"net"
"sync"
"time"
backoff "github.com/cenkalti/backoff/v4"
na_pb "github.com/nileshsimaria/jtimon/telemetry"
)

// Signal enums
const (
TCPDisconnected = iota
)

// Config Definition
type TCPConfig struct {
Host string `json:"host"`
Port int `json:"port"`
}

// Runtime contexts
type TCPClient struct {
conn net.Conn
cv *sync.Cond
}
type TCPCtx struct {
client *TCPClient
backoff *backoff.ExponentialBackOff
statusCh chan int
dataCh chan []byte
}

func tcpConnect(tcfg *TCPConfig, tctx *TCPCtx) error {
addr := fmt.Sprintf("%s:%d", tcfg.Host, tcfg.Port)
conn, err := net.Dial("tcp", addr)
if err != nil {
log.Printf("error dialing...\n")
sendStatusDisconnect(tctx)
return err
}
log.Printf("TCP endpoint: connected to %v\n", addr)
tctx.client.conn = conn
tctx.client.cv.Broadcast()
tctx.backoff.Reset()
return nil
}

func sendStatusDisconnect(tctx *TCPCtx) {
select {
case tctx.statusCh <- TCPDisconnected:
default:
}
tctx.client.cv.L.Lock()
tctx.client.conn = nil
tctx.client.cv.L.Unlock()
}

func heartbeat(tctx *TCPCtx) {
for {
var err error = nil
func() {
tctx.client.cv.L.Lock()
defer tctx.client.cv.L.Unlock()
if tctx.client.conn == nil {
return
}
b := []byte{}
_, err = tctx.client.conn.Write(b)
}()
if err != nil {
log.Printf("TCP endpoint: heartbeat lost\n")
sendStatusDisconnect(tctx)
}
time.Sleep(1 * time.Second)
}
}

func TcpClientInit(tcfg *TCPConfig, tctx **TCPCtx) error {
if tcfg == nil {
return fmt.Errorf("no TCP endpoint configuration provided")
}
*tctx = &TCPCtx{dataCh: make(chan []byte), statusCh: make(chan int, 1)}
(*tctx).client = &TCPClient{conn: nil, cv: sync.NewCond(&sync.Mutex{})}
(*tctx).backoff = backoff.NewExponentialBackOff()
(*tctx).backoff.MaxInterval = 20 * time.Second

go heartbeat(*tctx)
go func() {
connect: {
go func() {
if err := tcpConnect(tcfg, *tctx); err != nil {
log.Printf("TCP endpoint: connection error: %v\n", err)
}
}()
for {
select {
case data, ok := <-(*tctx).dataCh:
if !ok {
return
}
go func() {
if err := sendData(*tctx, data); err != nil {
log.Printf("TCP endpoint: data send error: %v\n", err)
}
}()
case <-(*tctx).statusCh:
goto reconnect
}
}
}
reconnect: {
time.Sleep((*tctx).backoff.NextBackOff())
goto connect
}
}()
return nil
}

func tcpClientTeardown(tctx *TCPCtx) {
if tctx != nil {
defer tctx.client.conn.Close()
}
}

func sendData(tctx *TCPCtx, m[]byte) error {
tctx.client.cv.L.Lock()
defer tctx.client.cv.L.Unlock()
for tctx.client.conn == nil {
tctx.client.cv.Wait()
}
if _, err := tctx.client.conn.Write(m); err != nil {
return err
}
return nil
}

func PushTcpEndpoint(tctx *TCPCtx, b []byte) error {
tctx.dataCh <- b
return nil
}

func AddTcpEndpoint(ocData *na_pb.OpenConfigData, jctx *JCtx) {
b, err := processOcData(ocData)
if err != nil {
jLog(jctx, fmt.Sprintf("marshal error: %v", err))
return
}
PushTcpEndpoint(jctx.tcpCtx, b)
}

func processOcData(ocData *na_pb.OpenConfigData) ([]byte, error) {
b, err := json.Marshal(ocData)
if err != nil {
return nil, err
}
stripped := bytes.Replace(b, []byte("\n"), []byte(""), -1)
stripped = bytes.Replace(stripped, []byte("\r"), []byte(""), -1)
capped := append(stripped, '\n')
return capped, nil
}
4 changes: 4 additions & 0 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type JCtx struct {
testExp *os.File
testRes *os.File
receivedSyncRsp bool
tcpCtx *TCPCtx
}

// JWorkers holds worker
Expand Down Expand Up @@ -273,6 +274,9 @@ func NewJWorker(file string, wg *sync.WaitGroup, wsChan chan string) (*JWorker,
return w, err
}
log.Printf("%v, jctx.config.Kafka.producer: %v", jctx.config.Host, jctx.config.Kafka)
log.Printf("%v, jctx.config.TCP: %v", jctx.config.Host, jctx.config.TCP)


if alias, err := NewAlias(jctx.config.Alias); err == nil {
jctx.alias = alias
} else {
Expand Down