8
8
"log"
9
9
"net/http"
10
10
"os"
11
+ "regexp"
11
12
"runtime"
12
13
"sync"
13
14
"sync/atomic"
@@ -37,25 +38,35 @@ var reSent int32
37
38
38
39
const enableResend = false
39
40
41
+ func formatCommas (num int32 ) string {
42
+ str := fmt .Sprintf ("%d" , num )
43
+ re := regexp .MustCompile ("(\\ d+)(\\ d{3})" )
44
+ for n := "" ; n != str ; {
45
+ n = str
46
+ str = re .ReplaceAllString (str , "$1,$2" )
47
+ }
48
+ return str
49
+ }
50
+
40
51
func main () {
41
52
go func () {
42
53
log .Println (http .ListenAndServe ("localhost:6060" , nil ))
43
54
}()
44
55
// Your application code here
45
56
46
57
// Tune the parameters to test the reliability
47
- const messagesToSend = 50_000_000
48
- const numberOfProducers = 5
49
- const concurrentProducers = 2
58
+ const messagesToSend = 10_000_000
59
+ const numberOfProducers = 1
60
+ const concurrentProducers = 1
50
61
const numberOfConsumers = 1
51
62
const sendDelay = 1 * time .Millisecond
52
63
const delayEachMessages = 500
53
- const maxProducersPerClient = 2
54
- const maxConsumersPerClient = 2
64
+ const maxProducersPerClient = 1
65
+ const maxConsumersPerClient = 1
55
66
//
56
67
57
68
reader := bufio .NewReader (os .Stdin )
58
- stream .SetLevelInfo (logs .DEBUG )
69
+ stream .SetLevelInfo (logs .INFO )
59
70
fmt .Println ("Reliable Producer/Consumer example" )
60
71
fmt .Println ("Connecting to RabbitMQ streaming ..." )
61
72
@@ -69,6 +80,7 @@ func main() {
69
80
SetMaxProducersPerClient (maxProducersPerClient ).
70
81
SetMaxConsumersPerClient (maxConsumersPerClient ).
71
82
SetUris (addresses ))
83
+
72
84
CheckErr (err )
73
85
fmt .Printf ("Environment created with %d producers and %d consumers\n \n " , maxProducersPerClient , maxConsumersPerClient )
74
86
@@ -82,7 +94,7 @@ func main() {
82
94
}
83
95
err = env .DeclareStream (streamName ,
84
96
& stream.StreamOptions {
85
- MaxLengthBytes : stream.ByteCapacity {}.GB (1 ),
97
+ MaxLengthBytes : stream.ByteCapacity {}.GB (10 ),
86
98
},
87
99
)
88
100
CheckErr (err )
@@ -95,12 +107,12 @@ func main() {
95
107
totalConfirmed := atomic .LoadInt32 (& confirmed ) + atomic .LoadInt32 (& fail )
96
108
expectedMessages := messagesToSend * numberOfProducers * concurrentProducers * 2
97
109
fmt .Printf ("********************************************\n " )
98
- fmt .Printf ("%s - ToSend: %d - nProducers: %d - concurrentProducers: %d - nConsumers %d \n " , time .Now ().Format (time .RFC850 ),
99
- expectedMessages , numberOfProducers , concurrentProducers , numberOfConsumers )
100
- fmt .Printf ("Sent:%d - ReSent %d - Confirmed:%d - Not confirmed:%d - Fail+Confirmed :%d \n " ,
101
- sent , atomic .LoadInt32 (& reSent ), atomic .LoadInt32 (& confirmed ), atomic .LoadInt32 (& fail ), totalConfirmed )
102
- fmt .Printf ("Total Consumed: %d - Per consumer: %d \n " , atomic .LoadInt32 (& consumed ),
103
- atomic .LoadInt32 (& consumed )/ numberOfConsumers )
110
+ fmt .Printf ("%s - ToSend: %s - nProducers: %d - concurrentProducers: %d - nConsumers %d \n " , time .Now ().Format (time .RFC850 ),
111
+ formatCommas ( int32 ( expectedMessages )) , numberOfProducers , concurrentProducers , numberOfConsumers )
112
+ fmt .Printf ("Sent:%s - ReSent:%s - Confirmed:%s - Not confirmed:%s - Fail+Confirmed:%s \n " ,
113
+ formatCommas ( sent ), formatCommas ( atomic .LoadInt32 (& reSent )), formatCommas ( atomic .LoadInt32 (& confirmed )), formatCommas ( atomic .LoadInt32 (& fail )), formatCommas ( totalConfirmed ) )
114
+ fmt .Printf ("Total Consumed:%s - Per consumer:%s \n " , formatCommas ( atomic .LoadInt32 (& consumed ) ),
115
+ formatCommas ( atomic .LoadInt32 (& consumed )/ numberOfConsumers ) )
104
116
105
117
for _ , producer := range producers {
106
118
fmt .Printf ("%s, status: %s \n " ,
0 commit comments