-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathMessageReceiver.scala
134 lines (120 loc) · 4.85 KB
/
MessageReceiver.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package de.m7w3.signal.messages
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import de.m7w3.signal.events.{EventPublisher, PreKeyEvent, ReceiptEvent}
import de.m7w3.signal.store.model.Registration
import de.m7w3.signal.{ApplicationContext, Constants, Logging}
import org.whispersystems.libsignal._
import org.whispersystems.signalservice.api.crypto.SignalServiceCipher
import org.whispersystems.signalservice.api.push.SignalServiceAddress
import org.whispersystems.signalservice.api.{SignalServiceMessagePipe, SignalServiceMessageReceiver}
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
/**
* receives messages on a separate thread and hands them on to a message handler for further processing
*/
case class MessageReceiver(cipher: SignalServiceCipher,
messageReceiver: SignalServiceMessageReceiver,
messageHandler: MessageHandler,
eventPublisher: EventPublisher,
timeoutMillis: Long) extends Logging {
val threadFactory = new ThreadFactory {
override def newThread(r: Runnable): Thread = new Thread(r, "signal-desktop-message-receiver")
}
val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1, threadFactory))
val keepOnRockin: AtomicBoolean = new AtomicBoolean(true)
logger.info("start receiving messages")
ec.execute(new Runnable {
override def run(): Unit = {
receiveMessages()
}
})
def receiveMessages(): Unit = {
val messagePipe = messageReceiver.createMessagePipe()
try{
doReceiveMessages(messagePipe)
} finally {
messagePipe.shutdown()
}
}
@tailrec
final def doReceiveMessages(pipe: SignalServiceMessagePipe): Unit = {
try {
val envelope = pipe.read(timeoutMillis, TimeUnit.MILLISECONDS)
if (envelope.isReceipt) {
// TODO: handle
logger.debug(s"received receipt from ${envelope.getSource}")
// not handled yet, just an example, maybe we don't need an event here
eventPublisher.publishEvent(ReceiptEvent.fromEnevelope(envelope))
} else if (envelope.isPreKeySignalMessage() || envelope.isSignalMessage()){
//TODO: check that recipient is not blocked
//TODO: send recipient
val content = cipher.decrypt(envelope)
if (content.getDataMessage.isPresent) {
messageHandler.handleDataMessage(envelope, content.getDataMessage.get())
} else if (content.getSyncMessage.isPresent) {
messageHandler.handleSyncMessage(envelope, content.getSyncMessage.get())
} else if (envelope.isPreKeySignalMessage) {
logger.debug("received prekey signal message")
eventPublisher.publishEvent(PreKeyEvent(envelope, content))
} else {
logger.warn("Got unrecognized message...")
}
} else{
logger.error(s"Received envelope of unknown type: ${envelope.getType()}")
}
} catch {
case te: TimeoutException =>
logger.debug(s"timeout waiting for messages...")
case e: InvalidVersionException =>
logger.warn("invalid version", e)
// TODO: what would moxie do?
case e @ (_:InvalidMessageException | _:InvalidKeyIdException | _:InvalidKeyException) =>
logger.warn("corrupt message", e)
// TODO: what would moxie do?
case nse: NoSessionException =>
logger.warn("no session for this message", nse)
// TODO: what would moxie do?
case lme: LegacyMessageException =>
logger.warn("legacy message", lme)
// TODO: what would moxie do?
case dme: DuplicateMessageException =>
logger.warn("duplicate message", dme)
// TODO: what would moxie do?
case uie: UntrustedIdentityException =>
logger.warn("untrusted identity", uie)
// TODO: what would moxie do?
}
if (keepOnRockin.get()) {
doReceiveMessages(pipe)
} else {
logger.info("stopped receiving messages.")
}
}
}
object MessageReceiver {
def initialize(context: ApplicationContext): MessageReceiver = {
val data: Registration = context.protocolStore.getRegistrationData()
val signalMessageReceiver: SignalServiceMessageReceiver = new SignalServiceMessageReceiver(
Constants.SERVICE_URLS,
data.userName,
data.password,
data.deviceId,
data.signalingKey,
Constants.USER_AGENT
)
val messageHandler = new SignalDesktopMessageHandler(
context.applicationStore,
signalMessageReceiver,
context
)
val signalServiceCipher = new SignalServiceCipher(new SignalServiceAddress(data.userName), context.protocolStore)
MessageReceiver(
signalServiceCipher,
signalMessageReceiver,
messageHandler,
context.asInstanceOf[EventPublisher],
10 * 1000L
)
}
}