-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathmain.go
116 lines (89 loc) · 2.87 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright (c) OpenFaaS Author(s) 2019. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"time"
"github.com/openfaas/connector-sdk/types"
sdk "github.com/openfaas/go-sdk"
)
// main runs the sample timer connector. It reads its settings from
// command-line flags, builds a connector-sdk controller for the gateway and
// then loops forever: once per interval it marshals a samplePayload and passes
// it to controller.Invoke for the configured topic, together with an
// incrementing X-Message-Id header.
//
// Flags: -username, -password, -gateway, -interval and -topic. When -password
// is empty the password is obtained from lookupPasswordViaKubectl.
func main() {
	var (
		username,
		password,
		gateway,
		topic string
		interval time.Duration
	)

	flag.StringVar(&username, "username", "admin", "username")
	flag.StringVar(&password, "password", "", "password")
	flag.StringVar(&gateway, "gateway", "http://127.0.0.1:8080", "gateway")
	flag.DurationVar(&interval, "interval", time.Second*10, "Interval between emitting a sample message")
	flag.StringVar(&topic, "topic", "payment.received", "Sample topic name to emit from timer")
	flag.Parse()

	// time.NewTicker panics on a non-positive duration; fail with a clear
	// message instead of a stack trace.
	if interval <= 0 {
		log.Fatalf("interval must be greater than zero, got %s", interval)
	}

	if len(password) == 0 {
		password = lookupPasswordViaKubectl()
	}

	// Set Print* variables to false for production use
	config := &types.ControllerConfig{
		RebuildInterval:         time.Second * 30,
		GatewayURL:              gateway,
		PrintResponse:           true,
		PrintRequestBody:        true,
		PrintResponseBody:       true,
		AsyncFunctionInvocation: false,
		ContentType:             "text/plain",
		UserAgent:               "openfaas-ce/timer-connector",
		UpstreamTimeout:         time.Second * 120,
	}

	fmt.Printf("Tester connector. Topic: %s, Interval: %s\n", topic, interval)

	auth := &sdk.BasicAuth{
		Username: username,
		Password: password,
	}

	controller := types.NewController(auth, config)

	receiver := ResponseReceiver{}
	controller.Subscribe(&receiver)

	controller.BeginMapBuilder()

	additionalHeaders := http.Header{}
	additionalHeaders.Add("X-Connector", "cmd/timer")

	// Simulate events emitting from queue/pub-sub by waiting one interval
	// between messages.
	messageID := 0
	t := time.NewTicker(interval)
	for {
		<-t.C
		// Log the topic actually being used rather than the flag's default.
		log.Printf("[tester] Emitting event on topic %s - %s\n", topic, gateway)

		h := additionalHeaders.Clone()
		// Add a de-dupe header to the message
		h.Add("X-Message-Id", fmt.Sprintf("%d", messageID))

		payload, err := json.Marshal(samplePayload{
			CreatedAt: time.Now(),
			MessageID: messageID,
		})
		if err != nil {
			// Skip this event but keep the connector running.
			log.Printf("[tester] unable to marshal payload for message %d: %s", messageID, err)
		} else {
			controller.Invoke(topic, &payload, h)
		}

		messageID++
		// Restart the ticker period from this point, so the next tick
		// arrives one full interval from now.
		t.Reset(interval)
	}
}
// samplePayload is the JSON document that main marshals and sends as the
// body of every emitted event.
type samplePayload struct {
// CreatedAt is set to time.Now() at the moment the event is built.
CreatedAt time.Time `json:"createdAt"`
// MessageID is main's running message counter; the same value is also
// sent in the X-Message-Id header.
MessageID int `json:"messageId"`
}
// ResponseReceiver enables the connector to receive results from
// function invocations. It holds no state: main registers a pointer to one
// via controller.Subscribe, and its Response method logs each result.
type ResponseReceiver struct {
}
// Response is triggered by the controller when a message is
// received from the function invocation.
//
// If res.Error is set, only the error is logged. Otherwise the status, topic,
// function name, body size in bytes and duration in seconds are logged. A nil
// res.Body is reported as 0 bytes rather than being dereferenced.
func (ResponseReceiver) Response(res types.InvokerResponse) {
	if res.Error != nil {
		log.Printf("[tester] error: %s", res.Error.Error())
		return
	}

	// Body is a pointer; guard against nil so that logging cannot panic.
	size := 0
	if res.Body != nil {
		size = len(*res.Body)
	}

	log.Printf("[tester] result: [%d] %s => %s (%d) bytes (%fs)", res.Status, res.Topic, res.Function, size, res.Duration.Seconds())
}