@@ -2,15 +2,11 @@ package main
2
2
3
3
import (
4
4
"context"
5
- "fmt"
6
5
"log"
7
- "math/rand"
8
6
"time"
9
7
10
- amqp "github.com/rabbitmq/amqp091-go"
11
8
"github.com/torchiaf/Sensors/controller/config"
12
- "github.com/torchiaf/Sensors/controller/models"
13
- "github.com/torchiaf/Sensors/controller/utils"
9
+ "github.com/torchiaf/Sensors/rpc_client"
14
10
)
15
11
16
12
func failOnError (err error , msg string ) {
@@ -19,103 +15,37 @@ func failOnError(err error, msg string) {
19
15
}
20
16
}
21
17
22
- func randomString (l int ) string {
23
- bytes := make ([]byte , l )
24
- for i := 0 ; i < l ; i ++ {
25
- bytes [i ] = byte (randInt (65 , 90 ))
26
- }
27
- return string (bytes )
28
- }
29
-
30
- func randInt (min int , max int ) int {
31
- return min + rand .Intn (max - min )
32
- }
33
-
34
- func exec (routingKey string , message models.Message ) (res string , err error ) {
35
-
36
- address := fmt .Sprintf ("amqp://%s:%s@%s:%s/" , config .Config .RabbitMQ .Username , config .Config .RabbitMQ .Password , config .Config .RabbitMQ .Host , config .Config .RabbitMQ .Port )
37
-
38
- conn , err := amqp .Dial (address )
39
- failOnError (err , "Failed to connect to RabbitMQ" )
40
- defer conn .Close ()
41
-
42
- ch , err := conn .Channel ()
43
- failOnError (err , "Failed to open a channel" )
44
- defer ch .Close ()
45
-
46
- q , err := ch .QueueDeclare (
47
- "" , // name
48
- false , // durable
49
- false , // delete when unused
50
- true , // exclusive
51
- false , // noWait
52
- nil , // arguments
53
- )
54
- failOnError (err , "Failed to declare a queue" )
55
-
56
- msgs , err := ch .Consume (
57
- q .Name , // queue
58
- "" , // consumer
59
- true , // auto-ack
60
- false , // exclusive
61
- false , // no-local
62
- false , // no-wait
63
- nil , // args
64
- )
65
- failOnError (err , "Failed to register a consumer" )
66
-
67
- corrId := randomString (32 )
68
-
69
- ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
70
- defer cancel ()
71
-
72
- log .Printf ("Msg: %s:" , utils .ToString (message ))
73
-
74
- err = ch .PublishWithContext (ctx ,
75
- "" , // exchange
76
- routingKey , // routing key
77
- false , // mandatory
78
- false , // immediate
79
- amqp.Publishing {
80
- ContentType : "text/plain" ,
81
- CorrelationId : corrId ,
82
- ReplyTo : q .Name ,
83
- Body : []byte (utils .ToString (message )),
84
- })
85
- failOnError (err , "Failed to publish a message" )
86
-
87
- for d := range msgs {
88
- if corrId == d .CorrelationId {
89
- res = string (d .Body )
90
- failOnError (err , "Error msgs" )
91
- break
92
- }
93
- }
94
-
95
- return
96
- }
97
-
98
18
func main () {
99
- rand .Seed (time .Now ().UTC ().UnixNano ())
100
-
101
19
log .Printf ("Config %+v" , config .Config )
102
20
21
+ client := rpc_client .New (context .Background ())
22
+
103
23
for {
104
24
for _ , module := range config .Config .Modules {
105
25
log .Printf (" [x] Requesting on {%s, %s, %s}" , module .Name , module .Type , module .RoutingKey )
106
26
107
- res , err := exec (
27
+ res , err := client . Read (
108
28
module .RoutingKey ,
109
- models.Message {
110
- Device : "dht11" ,
111
- // Args: map[string]interface{}{
112
- // "foo": "bar",
113
- // },
114
- },
29
+ "dht11" ,
30
+ []string {},
115
31
)
116
- failOnError (err , "Failed to handle RPC request" )
117
-
118
- log .Printf (" [%s] Got %+v" , module .Name , res )
32
+ failOnError (err , "Failed to handle RPC request: dht11" )
33
+
34
+ log .Printf (" [%s] [%s] Got %+v" , module .Name , "dht11" , res )
35
+
36
+ if module .Name == "raspberrypi-1" {
37
+ res1 , err := client .Read (
38
+ module .RoutingKey ,
39
+ "tm1637" ,
40
+ []string {
41
+ "temperature" ,
42
+ "12" ,
43
+ },
44
+ )
45
+ failOnError (err , "Failed to handle RPC request: tm1637" )
46
+
47
+ log .Printf (" [%s] [%s] Got %+v" , module .Name , "tm1637" , res1 )
48
+ }
119
49
}
120
50
121
51
time .Sleep (time .Second )
0 commit comments