Skip to content

Commit fed39c7

Browse files
committed
Retrieve onchain feerate when we connect to an electrum server
When we connect to an electrum sever we perform a "handshake" that includes exchanging protocol version messages and retrieving the server's current tip, and now we also retrieve onchain fees. We also add an extra, optional CoroutineExceptionHandler to ElectrumClient's constructor, which can be used for testing or to specify a different behaviour to the one that is currently hard-coded. Since this is done during the connection handshake, errors will be caught by the corouting exception handler that we use in the client and will not crash the application.
1 parent 7091e9a commit fed39c7

File tree

4 files changed

+133
-46
lines changed

4 files changed

+133
-46
lines changed

src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package fr.acinq.lightning.blockchain.electrum
22

33
import fr.acinq.bitcoin.*
4+
import fr.acinq.lightning.blockchain.fee.FeeratePerByte
5+
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
6+
import fr.acinq.lightning.blockchain.fee.OnChainFeerates
47
import fr.acinq.lightning.io.TcpSocket
5-
import fr.acinq.lightning.io.linesFlow
68
import fr.acinq.lightning.io.send
79
import fr.acinq.lightning.utils.*
810
import kotlinx.coroutines.*
@@ -23,13 +25,14 @@ sealed interface ElectrumClientCommand {
2325
sealed interface ElectrumConnectionStatus {
2426
data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus
2527
object Connecting : ElectrumConnectionStatus
26-
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus
28+
data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus
2729
}
2830

2931
class ElectrumClient(
3032
socketBuilder: TcpSocket.Builder?,
3133
scope: CoroutineScope,
32-
private val loggerFactory: LoggerFactory
34+
private val loggerFactory: LoggerFactory,
35+
defaultExceptionHandler: CoroutineExceptionHandler? = null
3336
) : CoroutineScope by scope, IElectrumClient {
3437

3538
private val logger = loggerFactory.newLogger(this::class)
@@ -98,9 +101,11 @@ class ElectrumClient(
98101
}
99102
}
100103

101-
private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception ->
102-
logger.error(exception) { "error starting electrum client: " }
103-
}) {
104+
val exceptionHandler = defaultExceptionHandler ?: CoroutineExceptionHandler { _, exception ->
105+
logger.error(exception) { "error starting electrum client" }
106+
}
107+
108+
private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) {
104109
_connectionStatus.value = ElectrumConnectionStatus.Connecting
105110
val socket: TcpSocket = try {
106111
val (host, port, tls) = serverAddress
@@ -138,22 +143,41 @@ class ElectrumClient(
138143
}
139144

140145
val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) }
141-
val version = ServerVersion()
142-
sendRequest(version, 0)
143146
val rpcFlow = flow.filterIsInstance<Either.Right<Nothing, JsonRPCResponse>>().map { it.value }
147+
var requestId = 0
148+
149+
val version = ServerVersion()
150+
sendRequest(version, requestId++)
144151
val theirVersion = parseJsonResponse(version, rpcFlow.first())
145152
require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" }
146153
logger.info { "server version $theirVersion" }
147-
sendRequest(HeaderSubscription, 0)
154+
155+
sendRequest(HeaderSubscription, requestId++)
148156
val header = parseJsonResponse(HeaderSubscription, rpcFlow.first())
149157
require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" }
158+
159+
suspend fun estimateFee(confirmations: Int): EstimateFeeResponse {
160+
val request = EstimateFees(confirmations)
161+
sendRequest(request, requestId++)
162+
val response = parseJsonResponse(request, rpcFlow.first())
163+
require(response is EstimateFeeResponse) { "invalid estimatefee response $response" }
164+
return response
165+
}
166+
167+
val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144))
168+
logger.info { "onchain fees $fees" }
169+
val feeRates = OnChainFeerates(
170+
fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
171+
mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
172+
claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
173+
fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
174+
)
150175
_notifications.emit(header)
151-
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header)
176+
_connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates)
152177
logger.info { "server tip $header" }
153178

154179
// pending requests map
155180
val requestMap = mutableMapOf<Int, Pair<ElectrumRequest, CompletableDeferred<ElectrumResponse>>>()
156-
var requestId = 0
157181

158182
// reset mailbox
159183
mailbox.cancel(CancellationException("connection in progress"))

src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,11 @@ class Peer(
193193
}
194194
}
195195
launch {
196-
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
197-
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
198-
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
199-
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
200-
updateEstimateFees()
196+
watcher.client.connectionStatus.filterIsInstance<ElectrumConnectionStatus.Connected>().collect {
197+
// Onchain fees are retrieved once when we establish a connection to an electrum server.
198+
// It is acceptable since the application will typically not be running more than a few minutes at a time.
199+
// (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis)
200+
onChainFeeratesFlow.value = it.onchainFeeRates
201201
}
202202
}
203203
launch {
@@ -255,24 +255,6 @@ class Peer(
255255
}
256256
}
257257

258-
private suspend fun updateEstimateFees() {
259-
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
260-
val sortedFees = listOf(
261-
watcher.client.estimateFees(2),
262-
watcher.client.estimateFees(6),
263-
watcher.client.estimateFees(18),
264-
watcher.client.estimateFees(144),
265-
)
266-
logger.info { "on-chain fees: $sortedFees" }
267-
// TODO: If some feerates are null, we may implement a retry
268-
onChainFeeratesFlow.value = OnChainFeerates(
269-
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
270-
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
271-
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
272-
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
273-
)
274-
}
275-
276258
fun connect() {
277259
if (connectionState.value is Connection.CLOSED) establishConnection()
278260
else logger.warning { "Peer is already connecting / connected" }

src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,18 @@ interface TcpSocket {
2020
suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int)
2121
suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int
2222

23+
fun linesFlow(): Flow<String> {
24+
return flow {
25+
val buffer = ByteArray(8192)
26+
while (true) {
27+
val size = receiveAvailable(buffer)
28+
emit(buffer.subArray(size))
29+
}
30+
}
31+
.decodeToString()
32+
.splitByLines()
33+
}
34+
2335
suspend fun startTls(tls: TLS): TcpSocket
2436

2537
fun close()
@@ -69,11 +81,3 @@ suspend fun TcpSocket.receiveAvailable(buffer: ByteArray) = receiveAvailable(buf
6981
internal expect object PlatformSocketBuilder : TcpSocket.Builder
7082

7183
suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) }
72-
73-
fun TcpSocket.linesFlow(): Flow<String> = flow {
74-
val buffer = ByteArray(8192)
75-
while (true) {
76-
val size = receiveAvailable(buffer)
77-
emit(buffer.subArray(size))
78-
}
79-
}.decodeToString().splitByLines()

src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,20 @@ package fr.acinq.lightning.blockchain.electrum
22

33
import fr.acinq.bitcoin.*
44
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
5+
import fr.acinq.lightning.io.TcpSocket
56
import fr.acinq.lightning.tests.utils.LightningTestSuite
67
import fr.acinq.lightning.tests.utils.runSuspendTest
78
import fr.acinq.lightning.utils.Connection
9+
import fr.acinq.lightning.utils.ServerAddress
810
import fr.acinq.lightning.utils.toByteVector32
911
import fr.acinq.secp256k1.Hex
10-
import kotlinx.coroutines.CoroutineScope
11-
import kotlinx.coroutines.flow.first
12-
import kotlinx.coroutines.joinAll
13-
import kotlinx.coroutines.launch
12+
import kotlinx.coroutines.*
13+
import kotlinx.coroutines.flow.*
14+
import kotlinx.serialization.json.Json
15+
import kotlinx.serialization.json.jsonObject
16+
import kotlinx.serialization.json.jsonPrimitive
17+
import org.kodein.log.LoggerFactory
18+
import org.kodein.log.newLogger
1419
import kotlin.test.*
1520
import kotlin.time.Duration.Companion.seconds
1621

@@ -177,4 +182,76 @@ class ElectrumClientTest : LightningTestSuite() {
177182

178183
client.stop()
179184
}
185+
186+
@OptIn(DelicateCoroutinesApi::class)
187+
@Test
188+
fun `catch coroutine errors`() {
189+
val myCustomError = "this is a test error"
190+
191+
class MyTcpSocket() : TcpSocket {
192+
val output = MutableSharedFlow<String>()
193+
override suspend fun send(bytes: ByteArray?, offset: Int, length: Int, flush: Boolean) {
194+
if (bytes != null) {
195+
CoroutineScope(Dispatchers.IO).launch {
196+
val encoded = bytes.decodeToString(offset, offset + length)
197+
val request = Json.parseToJsonElement(encoded)
198+
val response = when (request.jsonObject["method"]!!.jsonPrimitive.content) {
199+
"server.version" -> """{"jsonrpc": "2.0", "result": ["ElectrumX 1.15.0", "1.4"], "id": 0}"""
200+
"blockchain.headers.subscribe" -> """{"jsonrpc": "2.0", "result": {"hex": "000080209a35ef4422bc37b0e1c3df9d32cfaaef6a6d31047c0202000000000000000000b9f14c32922d305844c739829ef13df9d188953e74a392720c02eeadd93acbf9ae22a464be8e05174bc5c367", "height": 797144}, "id": 1}"""
201+
"blockchain.estimatefee" -> """{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}""" // we return an error, as if estimatefee had failed
202+
else -> """{"jsonrpc": "2.0", "error": {"code": 43, "message": "unhandled request"}, "id": 2}"""
203+
}
204+
output.emit(response)
205+
}
206+
}
207+
}
208+
209+
override suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) = TODO("Not yet implemented")
210+
override suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int = TODO("Not yet implemented")
211+
override suspend fun startTls(tls: TcpSocket.TLS): TcpSocket = TODO("Not yet implemented")
212+
override fun close() {}
213+
override fun linesFlow(): Flow<String> = output.asSharedFlow()
214+
}
215+
216+
class MyBuilder() : TcpSocket.Builder {
217+
override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket {
218+
return MyTcpSocket()
219+
}
220+
}
221+
222+
val errorFlow = MutableStateFlow<Throwable?>(null)
223+
val loggerFactory = LoggerFactory.default
224+
val logger = loggerFactory.newLogger(this::class)
225+
val myErrorHandler = CoroutineExceptionHandler { _, e ->
226+
logger.error(e) { "error caught in custom exception handler" }
227+
errorFlow.value = e
228+
}
229+
230+
runBlocking(Dispatchers.IO) {
231+
withTimeout(15.seconds) {
232+
val builder = MyBuilder()
233+
// from Kotlin's documentation:
234+
// all children coroutines (coroutines created in the context of another Job) delegate handling of their exceptions to their parent coroutine, which
235+
// also delegates to the parent, and so on until the root, so the CoroutineExceptionHandler installed in their context is never used
236+
// => here we need to create a new root scope otherwise our exception handler will not be used
237+
val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler)
238+
client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above)
239+
errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) }
240+
client.stop()
241+
}
242+
243+
// if we use runBlocking's scope, our exception handler will not be used
244+
errorFlow.value = null
245+
val error = assertFails {
246+
withTimeout(15.seconds) {
247+
val builder = MyBuilder()
248+
val client = ElectrumClient(builder, this, LoggerFactory.default, myErrorHandler)
249+
client.connect(ServerAddress("my-test-node", 50002, TcpSocket.TLS.DISABLED)) // address and port do not matter, but we cannot use TLS (not implemented, see above)
250+
errorFlow.filterNotNull().first { it.message!!.contains(myCustomError) }
251+
client.stop()
252+
}
253+
}
254+
assertTrue(error.message!!.contains(myCustomError))
255+
}
256+
}
180257
}

0 commit comments

Comments
 (0)