Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ object Databases extends Logging {
def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection, jdbcUrlFile_opt: Option[File]): SqliteDatabases = {
jdbcUrlFile_opt.foreach(checkIfDatabaseUrlIsUnchanged("sqlite", _))
// We check whether the node operator needs to run an intermediate eclair version first.
using(eclairJdbc.createStatement(), inTransaction = true) { statement => checkChannelsDbVersion(statement, SqliteChannelsDb.DB_NAME, minimum = 7) }
using(eclairJdbc.createStatement(), inTransaction = true) { statement => checkChannelsDbVersion(statement, SqliteChannelsDb.DB_NAME, isSqlite = true) }
SqliteDatabases(
network = new SqliteNetworkDb(networkJdbc),
liquidity = new SqliteLiquidityDb(eclairJdbc),
Expand Down Expand Up @@ -157,7 +157,7 @@ object Databases extends Logging {

// We check whether the node operator needs to run an intermediate eclair version first.
PgUtils.inTransaction { connection =>
using(connection.createStatement()) { statement => checkChannelsDbVersion(statement, PgChannelsDb.DB_NAME, minimum = 11) }
using(connection.createStatement()) { statement => checkChannelsDbVersion(statement, PgChannelsDb.DB_NAME, isSqlite = false) }
}

val databases = PostgresDatabases(
Expand Down
29 changes: 24 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/jdbc/JdbcUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,32 @@ trait JdbcUtils {
}

/**
 * We made some changes in the v0.13.0 and v0.13.1 releases to allow deprecating legacy channel data and channel types.
 * It is thus not possible to directly upgrade from an eclair version earlier than v0.13.x without going through some
 * data migration code.
 *
 * In the v0.13.0 release, we:
 *  - introduced channel codecs v5
 *  - migrated all channels to this codec version
 *  - incremented the channels DB version to 7 (sqlite) and 11 (postgres)
 *
 * In the v0.13.1 release, we:
 *  - refused to start if the channels DB version wasn't 7 (sqlite) or 11 (postgres), to force node operators to
 *    run the v0.13.0 release first to migrate their channels to channel codecs v5
 *  - removed support for older channel codecs
 *  - moved closed channels to a dedicated DB table, that doesn't have a dependency on legacy channel types, to
 *    allow deprecating channel types that aren't used anymore
 *  - incremented the channels DB version to 8 (sqlite) and 12 (postgres)
 *
 * We warn node operators that they must first run the v0.13.x releases to migrate their channel data, and we
 * prevent eclair from starting until they have done so.
 *
 * @param statement an open statement on the eclair DB, used to read the stored version number.
 * @param db_name   name of the channels DB entry in the versions table.
 * @param isSqlite  true when running on sqlite, false for postgres (the two backends use different version numbers).
 * @throws IllegalArgumentException if the stored channels DB version is too old for a direct upgrade.
 */
def checkChannelsDbVersion(statement: Statement, db_name: String, isSqlite: Boolean): Unit = {
  // Channels DB versions set by the two intermediate releases, which differ between backends.
  val eclair130 = if (isSqlite) 7 else 11
  val eclair131 = if (isSqlite) 8 else 12
  getVersion(statement, db_name) match {
    case Some(v) if v < eclair130 => throw new IllegalArgumentException("You are updating from a version of eclair older than v0.13.0: please update to the v0.13.0 release first to migrate your channel data, then to the v0.13.1 release to migrate your closed channels, and afterwards you'll be able to update to the latest version.")
    case Some(v) if v < eclair131 => throw new IllegalArgumentException("You are updating from a version of eclair older than v0.13.1: please update to the v0.13.1 release first to migrate your closed channels, and afterwards you'll be able to update to the latest version.")
    // A fresh node (no version stored) or an up-to-date DB can proceed.
    case _ => ()
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated}
import grizzled.slf4j.Logging
import scodec.bits.BitVector

import java.sql.{Connection, Statement, Timestamp}
import java.sql.{Connection, Timestamp}
import java.time.Instant
import javax.sql.DataSource

Expand All @@ -49,88 +49,6 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit

inTransaction { pg =>
using(pg.createStatement()) { statement =>
/**
 * DB migration from version 11 to version 12 (postgres).
 *
 * Before version 12, closed channels were directly kept in the local.channels table with an is_closed flag set to true.
 * We move them to a dedicated table, where we keep minimal channel information.
 */
def migration1112(statement: Statement): Unit = {
  // We start by dropping the foreign key constraint on htlc_infos, otherwise we won't be able to move recently
  // closed channels to a different table.
  // NOTE(review): this looks up the constraint by scanning all foreign keys in the catalog and taking the first
  // one — assumes htlc_infos holds the only foreign key in this database; confirm.
  statement.executeQuery("SELECT conname FROM pg_catalog.pg_constraint WHERE contype = 'f'").map(rs => rs.getString("conname")).headOption match {
    case Some(foreignKeyConstraint) => statement.executeUpdate(s"ALTER TABLE local.htlc_infos DROP CONSTRAINT $foreignKeyConstraint")
    case None => logger.warn("couldn't find foreign key constraint for htlc_infos table: DB migration may fail")
  }
  // We can now move closed channels to a dedicated table.
  statement.executeUpdate("CREATE TABLE local.channels_closed (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, funding_txid TEXT NOT NULL, funding_output_index BIGINT NOT NULL, funding_tx_index BIGINT NOT NULL, funding_key_path TEXT NOT NULL, channel_features TEXT NOT NULL, is_channel_opener BOOLEAN NOT NULL, commitment_format TEXT NOT NULL, announced BOOLEAN NOT NULL, capacity_satoshis BIGINT NOT NULL, closing_txid TEXT NOT NULL, closing_type TEXT NOT NULL, closing_script TEXT NOT NULL, local_balance_msat BIGINT NOT NULL, remote_balance_msat BIGINT NOT NULL, closing_amount_satoshis BIGINT NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, closed_at TIMESTAMP WITH TIME ZONE NOT NULL)")
  statement.executeUpdate("CREATE INDEX channels_closed_remote_node_id_idx ON local.channels_closed(remote_node_id)")
  // We migrate closed channels from the local.channels table to the new channels_closed table, whenever possible.
  // Inserts are batched (see batchSize below) to avoid one round-trip per channel.
  val insertStatement = pg.prepareStatement("INSERT INTO local.channels_closed VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
  val batchSize = 50
  using(pg.prepareStatement("SELECT channel_id, data, is_closed, created_timestamp, closed_timestamp FROM local.channels WHERE is_closed=TRUE")) { queryStatement =>
    val rs = queryStatement.executeQuery()
    var inserted = 0
    var batchCount = 0
    while (rs.next()) {
      val channelId = rs.getByteVector32FromHex("channel_id")
      // Decode the stored channel data and convert it to DATA_CLOSED when we can identify how the channel closed;
      // channels whose closing type cannot be determined are skipped (and logged) rather than migrated incorrectly.
      val data_opt = channelDataCodec.decode(BitVector(rs.getBytes("data"))).require.value match {
        case d: DATA_NEGOTIATING_SIMPLE =>
          // We didn't store which closing transaction actually confirmed, so we select the most likely one.
          // The simple_close feature wasn't widely supported before this migration, so this shouldn't affect a lot of channels.
          val closingTx = d.publishedClosingTxs.lastOption.getOrElse(d.proposedClosingTxs.last.preferred_opt.get)
          Some(DATA_CLOSED(d, closingTx))
        case d: DATA_CLOSING =>
          Helpers.Closing.isClosingTypeAlreadyKnown(d) match {
            case Some(closingType) => Some(DATA_CLOSED(d, closingType))
            // If the closing type cannot be inferred from the stored data, it must be a mutual close.
            // In that case, we didn't store which closing transaction actually confirmed, so we select the most likely one.
            case None if d.mutualClosePublished.nonEmpty => Some(DATA_CLOSED(d, Helpers.Closing.MutualClose(d.mutualClosePublished.last)))
            case None =>
              logger.warn(s"cannot move channel_id=$channelId to the channels_closed table, unknown closing_type")
              None
          }
        case d =>
          logger.warn(s"cannot move channel_id=$channelId to the channels_closed table (state=${d.getClass.getSimpleName})")
          None
      }
      data_opt match {
        case Some(data) =>
          insertStatement.setString(1, channelId.toHex)
          insertStatement.setString(2, data.remoteNodeId.toHex)
          insertStatement.setString(3, data.fundingTxId.value.toHex)
          insertStatement.setLong(4, data.fundingOutputIndex)
          insertStatement.setLong(5, data.fundingTxIndex)
          insertStatement.setString(6, data.fundingKeyPath)
          insertStatement.setString(7, data.channelFeatures)
          insertStatement.setBoolean(8, data.isChannelOpener)
          insertStatement.setString(9, data.commitmentFormat)
          insertStatement.setBoolean(10, data.announced)
          insertStatement.setLong(11, data.capacity.toLong)
          insertStatement.setString(12, data.closingTxId.value.toHex)
          insertStatement.setString(13, data.closingType)
          insertStatement.setString(14, data.closingScript.toHex)
          insertStatement.setLong(15, data.localBalance.toLong)
          insertStatement.setLong(16, data.remoteBalance.toLong)
          insertStatement.setLong(17, data.closingAmount.toLong)
          // Missing timestamps default to the unix epoch rather than NULL (both columns are NOT NULL).
          insertStatement.setTimestamp(18, rs.getTimestampNullable("created_timestamp").getOrElse(Timestamp.from(Instant.ofEpochMilli(0))))
          insertStatement.setTimestamp(19, rs.getTimestampNullable("closed_timestamp").getOrElse(Timestamp.from(Instant.ofEpochMilli(0))))
          insertStatement.addBatch()
          batchCount = batchCount + 1
          if (batchCount % batchSize == 0) {
            inserted = inserted + insertStatement.executeBatch().sum
            batchCount = 0
          }
        case None => ()
      }
    }
    // Flush the last (possibly partial) batch.
    inserted = inserted + insertStatement.executeBatch().sum
    logger.info(s"moved $inserted channels to the channels_closed table")
  }
  // We can now clean-up the active channels table.
  statement.executeUpdate("DELETE FROM local.channels WHERE is_closed=TRUE")
  statement.executeUpdate("ALTER TABLE local.channels DROP COLUMN is_closed")
  statement.executeUpdate("ALTER TABLE local.channels DROP COLUMN closed_timestamp")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
Expand All @@ -147,10 +65,6 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON local.htlc_infos(channel_id)")
statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON local.htlc_infos(commitment_number)")
statement.executeUpdate("CREATE INDEX channels_closed_remote_node_id_idx ON local.channels_closed(remote_node_id)")
case Some(v) if v < 11 => throw new RuntimeException("You are updating from a version of eclair older than v0.13.0: please update to the v0.13.0 release first to migrate your channel data, and afterwards you'll be able to update to the latest version.")
case Some(v@11) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 12) migration1112(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, TimestampMilli}
import grizzled.slf4j.Logging
import scodec.bits.BitVector

import java.sql.{Connection, Statement}
import java.sql.Connection

object SqliteChannelsDb {
val DB_NAME = "channels"
Expand All @@ -50,90 +49,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
statement.execute("PRAGMA foreign_keys = ON")
}

/**
 * DB migration from version 7 to version 8 (sqlite).
 *
 * Before version 8, closed channels were directly kept in the local_channels table with an is_closed flag set to true.
 * We move them to a dedicated table, where we keep minimal channel information.
 */
def migration78(statement: Statement): Unit = {
  // We start by dropping the foreign key constraint on htlc_infos, otherwise we won't be able to move recently
  // closed channels to a different table. The only option for that in sqlite is to re-create the table.
  statement.executeUpdate("ALTER TABLE htlc_infos RENAME TO htlc_infos_old")
  statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL)")
  statement.executeUpdate("INSERT INTO htlc_infos(channel_id, commitment_number, payment_hash, cltv_expiry) SELECT channel_id, commitment_number, payment_hash, cltv_expiry FROM htlc_infos_old")
  statement.executeUpdate("DROP TABLE htlc_infos_old")
  // The indices on the old table were dropped with it, so we re-create them on the new table.
  statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON htlc_infos(channel_id)")
  statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON htlc_infos(commitment_number)")
  // We can now move closed channels to a dedicated table.
  statement.executeUpdate("CREATE TABLE local_channels_closed (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, funding_txid TEXT NOT NULL, funding_output_index INTEGER NOT NULL, funding_tx_index INTEGER NOT NULL, funding_key_path TEXT NOT NULL, channel_features TEXT NOT NULL, is_channel_opener BOOLEAN NOT NULL, commitment_format TEXT NOT NULL, announced BOOLEAN NOT NULL, capacity_satoshis INTEGER NOT NULL, closing_txid TEXT NOT NULL, closing_type TEXT NOT NULL, closing_script TEXT NOT NULL, local_balance_msat INTEGER NOT NULL, remote_balance_msat INTEGER NOT NULL, closing_amount_satoshis INTEGER NOT NULL, created_at INTEGER NOT NULL, closed_at INTEGER NOT NULL)")
  statement.executeUpdate("CREATE INDEX local_channels_closed_remote_node_id_idx ON local_channels_closed(remote_node_id)")
  // We migrate closed channels from the local_channels table to the new local_channels_closed table, whenever possible.
  // Inserts are batched (see batchSize below) to avoid one round-trip per channel.
  val insertStatement = sqlite.prepareStatement("INSERT INTO local_channels_closed VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
  val batchSize = 50
  using(sqlite.prepareStatement("SELECT channel_id, data, is_closed, created_timestamp, closed_timestamp FROM local_channels WHERE is_closed=1")) { queryStatement =>
    val rs = queryStatement.executeQuery()
    var inserted = 0
    var batchCount = 0
    while (rs.next()) {
      val channelId = rs.getByteVector32("channel_id")
      // Decode the stored channel data and convert it to DATA_CLOSED when we can identify how the channel closed;
      // channels whose closing type cannot be determined are skipped (and logged) rather than migrated incorrectly.
      val data_opt = channelDataCodec.decode(BitVector(rs.getBytes("data"))).require.value match {
        case d: DATA_NEGOTIATING_SIMPLE =>
          // We didn't store which closing transaction actually confirmed, so we select the most likely one.
          // The simple_close feature wasn't widely supported before this migration, so this shouldn't affect a lot of channels.
          val closingTx = d.publishedClosingTxs.lastOption.getOrElse(d.proposedClosingTxs.last.preferred_opt.get)
          Some(DATA_CLOSED(d, closingTx))
        case d: DATA_CLOSING =>
          Helpers.Closing.isClosingTypeAlreadyKnown(d) match {
            case Some(closingType) => Some(DATA_CLOSED(d, closingType))
            // If the closing type cannot be inferred from the stored data, it must be a mutual close.
            // In that case, we didn't store which closing transaction actually confirmed, so we select the most likely one.
            case None if d.mutualClosePublished.nonEmpty => Some(DATA_CLOSED(d, Helpers.Closing.MutualClose(d.mutualClosePublished.last)))
            case None =>
              logger.warn(s"cannot move channel_id=$channelId to the local_channels_closed table, unknown closing_type")
              None
          }
        case d =>
          logger.warn(s"cannot move channel_id=$channelId to the local_channels_closed table (state=${d.getClass.getSimpleName})")
          None
      }
      data_opt match {
        case Some(data) =>
          insertStatement.setString(1, channelId.toHex)
          insertStatement.setString(2, data.remoteNodeId.toHex)
          insertStatement.setString(3, data.fundingTxId.value.toHex)
          insertStatement.setLong(4, data.fundingOutputIndex)
          insertStatement.setLong(5, data.fundingTxIndex)
          insertStatement.setString(6, data.fundingKeyPath)
          insertStatement.setString(7, data.channelFeatures)
          insertStatement.setBoolean(8, data.isChannelOpener)
          insertStatement.setString(9, data.commitmentFormat)
          insertStatement.setBoolean(10, data.announced)
          insertStatement.setLong(11, data.capacity.toLong)
          insertStatement.setString(12, data.closingTxId.value.toHex)
          insertStatement.setString(13, data.closingType)
          insertStatement.setString(14, data.closingScript.toHex)
          insertStatement.setLong(15, data.localBalance.toLong)
          insertStatement.setLong(16, data.remoteBalance.toLong)
          insertStatement.setLong(17, data.closingAmount.toLong)
          // Missing timestamps default to 0 (the unix epoch) rather than NULL (both columns are NOT NULL).
          insertStatement.setLong(18, rs.getLongNullable("created_timestamp").getOrElse(0))
          insertStatement.setLong(19, rs.getLongNullable("closed_timestamp").getOrElse(0))
          insertStatement.addBatch()
          batchCount = batchCount + 1
          if (batchCount % batchSize == 0) {
            inserted = inserted + insertStatement.executeBatch().sum
            batchCount = 0
          }
        case None => ()
      }
    }
    // Flush the last (possibly partial) batch.
    inserted = inserted + insertStatement.executeBatch().sum
    logger.info(s"moved $inserted channels to the local_channels_closed table")
  }
  // We can now clean-up the active channels table.
  statement.executeUpdate("DELETE FROM local_channels WHERE is_closed=1")
  statement.executeUpdate("ALTER TABLE local_channels DROP COLUMN is_closed")
  statement.executeUpdate("ALTER TABLE local_channels DROP COLUMN closed_timestamp")
}

using(sqlite.createStatement(), inTransaction = true) { statement =>
getVersion(statement, DB_NAME) match {
case None =>
Expand All @@ -146,10 +61,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON htlc_infos(channel_id)")
statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON htlc_infos(commitment_number)")
statement.executeUpdate("CREATE INDEX local_channels_closed_remote_node_id_idx ON local_channels_closed(remote_node_id)")
case Some(v) if v < 7 => throw new RuntimeException("You are updating from a version of eclair older than v0.13.0: please update to the v0.13.0 release first to migrate your channel data, and afterwards you'll be able to update to the latest version.")
case Some(v@7) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 8) migration78(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
Loading