11package com.gojek.courier.app.ui
22
33import android.os.Bundle
4+ import android.util.Log
45import androidx.appcompat.app.AppCompatActivity
56import com.gojek.chuckmqtt.external.MqttChuckConfig
67import com.gojek.chuckmqtt.external.MqttChuckInterceptor
78import com.gojek.chuckmqtt.external.Period
89import com.gojek.courier.Courier
10+ import com.gojek.courier.QoS
11+ import com.gojek.courier.QoS.ZERO
912import com.gojek.courier.app.R
1013import com.gojek.courier.app.data.network.CourierService
1114import com.gojek.courier.app.data.network.model.Message
15+ import com.gojek.courier.callback.SendMessageCallback
1216import com.gojek.courier.logging.ILogger
1317import com.gojek.courier.messageadapter.gson.GsonMessageAdapterFactory
18+ import com.gojek.courier.messageadapter.text.TextMessageAdapterFactory
1419import com.gojek.courier.streamadapter.rxjava2.RxJava2StreamAdapterFactory
1520import com.gojek.mqtt.auth.Authenticator
1621import com.gojek.mqtt.client.MqttClient
@@ -24,6 +29,7 @@ import com.gojek.mqtt.model.AdaptiveKeepAliveConfig
2429import com.gojek.mqtt.model.KeepAlive
2530import com.gojek.mqtt.model.MqttConnectOptions
2631import com.gojek.mqtt.model.ServerUri
32+ import com.gojek.mqtt.model.Will
2733import com.gojek.workmanager.pingsender.WorkManagerPingSenderConfig
2834import com.gojek.workmanager.pingsender.WorkPingSenderFactory
2935import kotlinx.android.synthetic.main.activity_main.brokerIP
@@ -78,12 +84,37 @@ class MainActivity : AppCompatActivity() {
7884 send.setOnClickListener {
7985 courierService.publish(
8086 topic = topic.text.toString(),
81- message = Message (123 , message.text.toString())
87+ message = Message (123 , message.text.toString()),
88+ callback = object : SendMessageCallback {
89+ override fun onMessageSendTrigger () {
90+ Log .d(" Courier" , " onMessageSendTrigger" )
91+ }
92+
93+ override fun onMessageWrittenOnSocket () {
94+ Log .d(" Courier" , " onMessageWrittenOnSocket" )
95+ }
96+
97+ override fun onMessageSendSuccess () {
98+ Log .d(" Courier" , " onMessageSendSuccess" )
99+ }
100+
101+ override fun onMessageSendFailure (error : Throwable ) {
102+ Log .d(" Courier" , " onMessageSendFailure" )
103+ }
104+ }
82105 )
83106 }
84107
85108 subscribe.setOnClickListener {
86- courierService.subscribe(topic = topic.text.toString())
109+ val topics = topic.text.toString().split(" ," )
110+ val stream = if (topics.size == 1 ) {
111+ courierService.subscribe(topic = topics[0 ])
112+ } else {
113+ val topicMap = mutableMapOf<String , QoS >()
114+ for (topic in topics) { topicMap[topic] = ZERO }
115+ courierService.subscribeAll(topicMap = topicMap)
116+ }
117+ stream.subscribe { Log .d(" Courier" , " Message received: $it " ) }
87118 }
88119
89120 unsubscribe.setOnClickListener {
@@ -92,12 +123,21 @@ class MainActivity : AppCompatActivity() {
92123 }
93124
94125 private fun connectMqtt (clientId : String , username : String , password : String , ip : String , port : Int ) {
126+
127+ val will = Will (
128+ topic = " last/will/topic" ,
129+ message = " Client disconnected unexpectedly" ,
130+ qos = QoS .ZERO ,
131+ retained = false
132+ )
133+
95134 val connectOptions = MqttConnectOptions .Builder ()
96135 .serverUris(listOf (ServerUri (ip, port, if (port == 443 ) " ssl" else " tcp" )))
97136 .clientId(clientId)
98137 .userName(username)
99138 .password(password)
100139 .cleanSession(false )
140+ .will(will)
101141 .keepAlive(KeepAlive (timeSeconds = 30 ))
102142 .build()
103143
@@ -129,19 +169,20 @@ class MainActivity : AppCompatActivity() {
129169 ),
130170 inactivityTimeoutSeconds = 45 ,
131171 activityCheckIntervalSeconds = 30 ,
172+ connectPacketTimeoutSeconds = 5 ,
132173 incomingMessagesTTLSecs = 60 ,
133174 incomingMessagesCleanupIntervalSecs = 10 ,
134175 maxInflightMessagesLimit = 1000 ,
135176 ),
136- pingSender = WorkPingSenderFactory .createMqttPingSender(applicationContext, WorkManagerPingSenderConfig ())
177+ pingSender = WorkPingSenderFactory .createMqttPingSender(applicationContext, WorkManagerPingSenderConfig (sendForcePing = true ))
137178 )
138179 mqttClient = MqttClientFactory .create(this , mqttConfig)
139180 mqttClient.addEventHandler(eventHandler)
140181
141182 val configuration = Courier .Configuration (
142183 client = mqttClient,
143184 streamAdapterFactories = listOf (RxJava2StreamAdapterFactory ()),
144- messageAdapterFactories = listOf (GsonMessageAdapterFactory ()),
185+ messageAdapterFactories = listOf (TextMessageAdapterFactory (), GsonMessageAdapterFactory ()),
145186 logger = getLogger()
146187 )
147188 val courier = Courier (configuration)
0 commit comments