@@ -52,6 +52,35 @@ func TestGRPCServerConfig(t *testing.T) {
5252 assert .Equal (t , "8081" , ServerGRPC .Port )
5353}
5454
55+ func TestServerMQTTConfig (t * testing.T ) {
56+ os .Setenv ("SERVER_MQTT_CONSUL_ADDRESS" , "consul:8081" )
57+ os .Setenv ("SERVER_MQTT_CONSUL_KV_KEY" , "kv/path" )
58+ os .Setenv ("SERVER_MQTT_CONSUL_HEALTH_ONLY" , "true" )
59+ os .Setenv ("SERVER_MQTT_CONSUL_WAIT_TIME" , "300" )
60+ os .Setenv ("SERVER_MQTT_AUTH_USERNAME" , "test" )
61+ os .Setenv ("SERVER_MQTT_AUTH_PASSWORD" , "pass" )
62+ os .Setenv ("SERVER_MQTT_CONSUMER_RETRY_INTERVAL_IN_SEC" , "1" )
63+ os .Setenv ("SERVER_MQTT_CONSUMER_WRITE_TIMEOUT_IN_SEC" , "1" )
64+ os .Setenv ("SERVER_MQTT_CONSUMER_LOG_LEVEL" , "warn" )
65+ os .Setenv ("SERVER_MQTT_CONSUMER_POOL_SIZE" , "1" )
66+ os .Setenv ("SERVER_MQTT_CONSUMER_TOPIC_FORMAT" , "default-topic" )
67+ os .Setenv ("SERVER_MQTT_CONNECTION_GROUP" , "consumer" )
68+ serverMQTTConfigLoader ()
69+ assert .Equal (t , "consul:8081" , ServerMQTT .ConsulConfig .Address )
70+ assert .Equal (t , "kv/path" , ServerMQTT .ConsulConfig .KVKey )
71+ assert .Equal (t , true , ServerMQTT .ConsulConfig .HealthOnly )
72+ assert .Equal (t , 300 * time .Second , ServerMQTT .ConsulConfig .WaitTime )
73+ assert .Equal (t , "test" , ServerMQTT .AuthConfig .Username )
74+ assert .Equal (t , "pass" , ServerMQTT .AuthConfig .Password )
75+ assert .Equal (t , 1 * time .Second , ServerMQTT .ConsumerConfig .RetryIntervalInSec )
76+ assert .Equal (t , 1 * time .Second , ServerMQTT .ConsumerConfig .WriteTimeoutInSec )
77+ assert .Equal (t , "warn" , ServerMQTT .ConsumerConfig .LogLevel )
78+ assert .Equal (t , 1 , ServerMQTT .ConsumerConfig .PoolSize )
79+ assert .Equal (t , "default-topic" , ServerMQTT .ConsumerConfig .TopicFormat )
80+ assert .Equal (t , "consumer" , ServerMQTT .ConnGroup )
81+
82+ }
83+
5584func TestDynamicConfigLoad (t * testing.T ) {
5685 os .Setenv ("PUBLISHER_KAFKA_CLIENT_RANDOM" , "anything" )
5786 os .Setenv ("PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS" , "localhost:9092" )
@@ -67,6 +96,8 @@ func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) {
6796 os .Setenv ("PUBLISHER_KAFKA_CLIENT_ACKS" , "1" )
6897 os .Setenv ("PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES" , "10000" )
6998 os .Setenv ("SOMETHING_PUBLISHER_KAFKA_CLIENT_SOMETHING" , "anything" )
99+ os .Setenv ("PUBLISHER_KAFKA_HEALTHCHECK_TOPIC_NAME" , "test-log" )
100+ os .Setenv ("PUBLISHER_KAFKA_HEALTHCHECK_TIMEOUT_MS" , "5000" )
70101 publisherKafkaConfigLoader ()
71102 kafkaConfig := PublisherKafka .ToKafkaConfigMap ()
72103 bootstrapServer , _ := kafkaConfig .Get ("bootstrap.servers" , "" )
@@ -76,6 +107,8 @@ func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) {
76107 assert .Equal (t , "" , topic )
77108 assert .NotEqual (t , something , "anything" )
78109 assert .Equal (t , 4 , len (* kafkaConfig ))
110+ assert .Equal (t , "test-log" , PublisherKafka .HealthCheckConfig .TopicName )
111+ assert .Equal (t , 5000 , PublisherKafka .HealthCheckConfig .TimeOut )
79112}
80113
81114func TestWorkerConfig (t * testing.T ) {
0 commit comments