diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index 8d67fc700e..e47b9fdb7e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -80,7 +80,7 @@ object Databases extends Logging { def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection, jdbcUrlFile_opt: Option[File]): SqliteDatabases = { jdbcUrlFile_opt.foreach(checkIfDatabaseUrlIsUnchanged("sqlite", _)) // We check whether the node operator needs to run an intermediate eclair version first. - using(eclairJdbc.createStatement(), inTransaction = true) { statement => checkChannelsDbVersion(statement, SqliteChannelsDb.DB_NAME, minimum = 7) } + using(eclairJdbc.createStatement(), inTransaction = true) { statement => checkChannelsDbVersion(statement, SqliteChannelsDb.DB_NAME, isSqlite = true) } SqliteDatabases( network = new SqliteNetworkDb(networkJdbc), liquidity = new SqliteLiquidityDb(eclairJdbc), @@ -157,7 +157,7 @@ object Databases extends Logging { // We check whether the node operator needs to run an intermediate eclair version first. PgUtils.inTransaction { connection => - using(connection.createStatement()) { statement => checkChannelsDbVersion(statement, PgChannelsDb.DB_NAME, minimum = 11) } + using(connection.createStatement()) { statement => checkChannelsDbVersion(statement, PgChannelsDb.DB_NAME, isSqlite = false) } } val databases = PostgresDatabases( diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/jdbc/JdbcUtils.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/jdbc/JdbcUtils.scala index 2c4b4c0715..a6792a7439 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/jdbc/JdbcUtils.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/jdbc/JdbcUtils.scala @@ -80,13 +80,32 @@ trait JdbcUtils { } /** - * We removed legacy channels codecs after the v0.13.0 eclair release, and migrated channels in that release. - * It is thus not possible to directly upgrade from an eclair version earlier than v0.13.0. - * We warn node operators that they must first run the v0.13.0 release to migrate their channel data. + * We made some changes in the v0.13.0 and v0.13.1 releases to allow deprecating legacy channel data and channel types. + * It is thus not possible to directly upgrade from an eclair version earlier than v0.13.x without going through some + * data migration code. + * + * In the v0.13.0 release, we: + * - introduced channel codecs v5 + * - migrated all channels to this codec version + * - incremented the channels DB version to 7 (sqlite) and 11 (postgres) + * + * In the v0.13.1 release, we: + * - refused to start if the channels DB version wasn't 7 (sqlite) or 11 (postgres), to force node operators to + * run the v0.13.0 release first to migrate their channels to channel codecs v5 + * - removed support for older channel codecs + * - moved closed channels to a dedicated DB table, that doesn't have a dependency on legacy channel types, to + * allow deprecating channel types that aren't used anymore + * - incremented the channels DB version to 8 (sqlite) and 12 (postgres) + * + * We warn node operators that they must first run the v0.13.x releases to migrate their channel data and prevent + * eclair from starting. */ - def checkChannelsDbVersion(statement: Statement, db_name: String, minimum: Int): Unit = { + def checkChannelsDbVersion(statement: Statement, db_name: String, isSqlite: Boolean): Unit = { + val eclair130 = if (isSqlite) 7 else 11 + val eclair131 = if (isSqlite) 8 else 12 getVersion(statement, db_name) match { - case Some(v) if v < minimum => throw new IllegalArgumentException("You are updating from a version of eclair older than v0.13.0: please update to the v0.13.0 release first to migrate your channel data, and afterwards you'll be able to update to the latest version.") + case Some(v) if v < eclair130 => throw new IllegalArgumentException("You are updating from a version of eclair older than v0.13.0: please update to the v0.13.0 release first to migrate your channel data, then to the v0.13.1 release to migrate your closed channels, and afterwards you'll be able to update to the latest version.") + case Some(v) if v < eclair131 => throw new IllegalArgumentException("You are updating from a version of eclair older than v0.13.1: please update to the v0.13.1 release first to migrate your closed channels, and afterwards you'll be able to update to the latest version.") case _ => () } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index f865191559..43f3d7617f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -30,7 +30,7 @@ import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated} import grizzled.slf4j.Logging import scodec.bits.BitVector -import java.sql.{Connection, Statement, Timestamp} +import java.sql.{Connection, Timestamp} import java.time.Instant import javax.sql.DataSource @@ -49,88 +49,6 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit inTransaction { pg => using(pg.createStatement()) { statement => - /** - * Before version 12, closed channels were directly kept in the local_channels table with an is_closed flag set to true. - * We move them to a dedicated table, where we keep minimal channel information. - */ - def migration1112(statement: Statement): Unit = { - // We start by dropping for foreign key constraint on htlc_infos, otherwise we won't be able to move recently - // closed channels to a different table. - statement.executeQuery("SELECT conname FROM pg_catalog.pg_constraint WHERE contype = 'f'").map(rs => rs.getString("conname")).headOption match { - case Some(foreignKeyConstraint) => statement.executeUpdate(s"ALTER TABLE local.htlc_infos DROP CONSTRAINT $foreignKeyConstraint") - case None => logger.warn("couldn't find foreign key constraint for htlc_infos table: DB migration may fail") - } - // We can now move closed channels to a dedicated table. - statement.executeUpdate("CREATE TABLE local.channels_closed (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, funding_txid TEXT NOT NULL, funding_output_index BIGINT NOT NULL, funding_tx_index BIGINT NOT NULL, funding_key_path TEXT NOT NULL, channel_features TEXT NOT NULL, is_channel_opener BOOLEAN NOT NULL, commitment_format TEXT NOT NULL, announced BOOLEAN NOT NULL, capacity_satoshis BIGINT NOT NULL, closing_txid TEXT NOT NULL, closing_type TEXT NOT NULL, closing_script TEXT NOT NULL, local_balance_msat BIGINT NOT NULL, remote_balance_msat BIGINT NOT NULL, closing_amount_satoshis BIGINT NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, closed_at TIMESTAMP WITH TIME ZONE NOT NULL)") - statement.executeUpdate("CREATE INDEX channels_closed_remote_node_id_idx ON local.channels_closed(remote_node_id)") - // We migrate closed channels from the local_channels table to the new channels_closed table, whenever possible. - val insertStatement = pg.prepareStatement("INSERT INTO local.channels_closed VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - val batchSize = 50 - using(pg.prepareStatement("SELECT channel_id, data, is_closed, created_timestamp, closed_timestamp FROM local.channels WHERE is_closed=TRUE")) { queryStatement => - val rs = queryStatement.executeQuery() - var inserted = 0 - var batchCount = 0 - while (rs.next()) { - val channelId = rs.getByteVector32FromHex("channel_id") - val data_opt = channelDataCodec.decode(BitVector(rs.getBytes("data"))).require.value match { - case d: DATA_NEGOTIATING_SIMPLE => - // We didn't store which closing transaction actually confirmed, so we select the most likely one. - // The simple_close feature wasn't widely supported before this migration, so this shouldn't affect a lot of channels. - val closingTx = d.publishedClosingTxs.lastOption.getOrElse(d.proposedClosingTxs.last.preferred_opt.get) - Some(DATA_CLOSED(d, closingTx)) - case d: DATA_CLOSING => - Helpers.Closing.isClosingTypeAlreadyKnown(d) match { - case Some(closingType) => Some(DATA_CLOSED(d, closingType)) - // If the closing type cannot be inferred from the stored data, it must be a mutual close. - // In that case, we didn't store which closing transaction actually confirmed, so we select the most likely one. - case None if d.mutualClosePublished.nonEmpty => Some(DATA_CLOSED(d, Helpers.Closing.MutualClose(d.mutualClosePublished.last))) - case None => - logger.warn(s"cannot move channel_id=$channelId to the channels_closed table, unknown closing_type") - None - } - case d => - logger.warn(s"cannot move channel_id=$channelId to the channels_closed table (state=${d.getClass.getSimpleName})") - None - } - data_opt match { - case Some(data) => - insertStatement.setString(1, channelId.toHex) - insertStatement.setString(2, data.remoteNodeId.toHex) - insertStatement.setString(3, data.fundingTxId.value.toHex) - insertStatement.setLong(4, data.fundingOutputIndex) - insertStatement.setLong(5, data.fundingTxIndex) - insertStatement.setString(6, data.fundingKeyPath) - insertStatement.setString(7, data.channelFeatures) - insertStatement.setBoolean(8, data.isChannelOpener) - insertStatement.setString(9, data.commitmentFormat) - insertStatement.setBoolean(10, data.announced) - insertStatement.setLong(11, data.capacity.toLong) - insertStatement.setString(12, data.closingTxId.value.toHex) - insertStatement.setString(13, data.closingType) - insertStatement.setString(14, data.closingScript.toHex) - insertStatement.setLong(15, data.localBalance.toLong) - insertStatement.setLong(16, data.remoteBalance.toLong) - insertStatement.setLong(17, data.closingAmount.toLong) - insertStatement.setTimestamp(18, rs.getTimestampNullable("created_timestamp").getOrElse(Timestamp.from(Instant.ofEpochMilli(0)))) - insertStatement.setTimestamp(19, rs.getTimestampNullable("closed_timestamp").getOrElse(Timestamp.from(Instant.ofEpochMilli(0)))) - insertStatement.addBatch() - batchCount = batchCount + 1 - if (batchCount % batchSize == 0) { - inserted = inserted + insertStatement.executeBatch().sum - batchCount = 0 - } - case None => () - } - } - inserted = inserted + insertStatement.executeBatch().sum - logger.info(s"moved $inserted channels to the channels_closed table") - } - // We can now clean-up the active channels table. - statement.executeUpdate("DELETE FROM local.channels WHERE is_closed=TRUE") - statement.executeUpdate("ALTER TABLE local.channels DROP COLUMN is_closed") - statement.executeUpdate("ALTER TABLE local.channels DROP COLUMN closed_timestamp") - } - getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") @@ -147,10 +65,6 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON local.htlc_infos(channel_id)") statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON local.htlc_infos(commitment_number)") statement.executeUpdate("CREATE INDEX channels_closed_remote_node_id_idx ON local.channels_closed(remote_node_id)") - case Some(v) if v < 11 => throw new RuntimeException("You are updating from a version of eclair older than v0.13.0: please update to the v0.13.0 release first to migrate your channel data, and afterwards you'll be able to update to the latest version.") - case Some(v@11) => - logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") - if (v < 12) migration1112(statement) case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index 1f34b1c7c5..f2bb46bba4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -26,9 +26,8 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, TimestampMilli} import grizzled.slf4j.Logging -import scodec.bits.BitVector -import java.sql.{Connection, Statement} +import java.sql.Connection object SqliteChannelsDb { val DB_NAME = "channels" @@ -50,90 +49,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.execute("PRAGMA foreign_keys = ON") } - /** - * Before version 8, closed channels were directly kept in the local_channels table with an is_closed flag set to true. - * We move them to a dedicated table, where we keep minimal channel information. - */ - def migration78(statement: Statement): Unit = { - // We start by dropping for foreign key constraint on htlc_infos, otherwise we won't be able to move recently - // closed channels to a different table. The only option for that in sqlite is to re-create the table. - statement.executeUpdate("ALTER TABLE htlc_infos RENAME TO htlc_infos_old") - statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL)") - statement.executeUpdate("INSERT INTO htlc_infos(channel_id, commitment_number, payment_hash, cltv_expiry) SELECT channel_id, commitment_number, payment_hash, cltv_expiry FROM htlc_infos_old") - statement.executeUpdate("DROP TABLE htlc_infos_old") - statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON htlc_infos(channel_id)") - statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON htlc_infos(commitment_number)") - // We can now move closed channels to a dedicated table. - statement.executeUpdate("CREATE TABLE local_channels_closed (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, funding_txid TEXT NOT NULL, funding_output_index INTEGER NOT NULL, funding_tx_index INTEGER NOT NULL, funding_key_path TEXT NOT NULL, channel_features TEXT NOT NULL, is_channel_opener BOOLEAN NOT NULL, commitment_format TEXT NOT NULL, announced BOOLEAN NOT NULL, capacity_satoshis INTEGER NOT NULL, closing_txid TEXT NOT NULL, closing_type TEXT NOT NULL, closing_script TEXT NOT NULL, local_balance_msat INTEGER NOT NULL, remote_balance_msat INTEGER NOT NULL, closing_amount_satoshis INTEGER NOT NULL, created_at INTEGER NOT NULL, closed_at INTEGER NOT NULL)") - statement.executeUpdate("CREATE INDEX local_channels_closed_remote_node_id_idx ON local_channels_closed(remote_node_id)") - // We migrate closed channels from the local_channels table to the new local_channels_closed table, whenever possible. - val insertStatement = sqlite.prepareStatement("INSERT INTO local_channels_closed VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - val batchSize = 50 - using(sqlite.prepareStatement("SELECT channel_id, data, is_closed, created_timestamp, closed_timestamp FROM local_channels WHERE is_closed=1")) { queryStatement => - val rs = queryStatement.executeQuery() - var inserted = 0 - var batchCount = 0 - while (rs.next()) { - val channelId = rs.getByteVector32("channel_id") - val data_opt = channelDataCodec.decode(BitVector(rs.getBytes("data"))).require.value match { - case d: DATA_NEGOTIATING_SIMPLE => - // We didn't store which closing transaction actually confirmed, so we select the most likely one. - // The simple_close feature wasn't widely supported before this migration, so this shouldn't affect a lot of channels. - val closingTx = d.publishedClosingTxs.lastOption.getOrElse(d.proposedClosingTxs.last.preferred_opt.get) - Some(DATA_CLOSED(d, closingTx)) - case d: DATA_CLOSING => - Helpers.Closing.isClosingTypeAlreadyKnown(d) match { - case Some(closingType) => Some(DATA_CLOSED(d, closingType)) - // If the closing type cannot be inferred from the stored data, it must be a mutual close. - // In that case, we didn't store which closing transaction actually confirmed, so we select the most likely one. - case None if d.mutualClosePublished.nonEmpty => Some(DATA_CLOSED(d, Helpers.Closing.MutualClose(d.mutualClosePublished.last))) - case None => - logger.warn(s"cannot move channel_id=$channelId to the local_channels_closed table, unknown closing_type") - None - } - case d => - logger.warn(s"cannot move channel_id=$channelId to the local_channels_closed table (state=${d.getClass.getSimpleName})") - None - } - data_opt match { - case Some(data) => - insertStatement.setString(1, channelId.toHex) - insertStatement.setString(2, data.remoteNodeId.toHex) - insertStatement.setString(3, data.fundingTxId.value.toHex) - insertStatement.setLong(4, data.fundingOutputIndex) - insertStatement.setLong(5, data.fundingTxIndex) - insertStatement.setString(6, data.fundingKeyPath) - insertStatement.setString(7, data.channelFeatures) - insertStatement.setBoolean(8, data.isChannelOpener) - insertStatement.setString(9, data.commitmentFormat) - insertStatement.setBoolean(10, data.announced) - insertStatement.setLong(11, data.capacity.toLong) - insertStatement.setString(12, data.closingTxId.value.toHex) - insertStatement.setString(13, data.closingType) - insertStatement.setString(14, data.closingScript.toHex) - insertStatement.setLong(15, data.localBalance.toLong) - insertStatement.setLong(16, data.remoteBalance.toLong) - insertStatement.setLong(17, data.closingAmount.toLong) - insertStatement.setLong(18, rs.getLongNullable("created_timestamp").getOrElse(0)) - insertStatement.setLong(19, rs.getLongNullable("closed_timestamp").getOrElse(0)) - insertStatement.addBatch() - batchCount = batchCount + 1 - if (batchCount % batchSize == 0) { - inserted = inserted + insertStatement.executeBatch().sum - batchCount = 0 - } - case None => () - } - } - inserted = inserted + insertStatement.executeBatch().sum - logger.info(s"moved $inserted channels to the local_channels_closed table") - } - // We can now clean-up the active channels table. - statement.executeUpdate("DELETE FROM local_channels WHERE is_closed=1") - statement.executeUpdate("ALTER TABLE local_channels DROP COLUMN is_closed") - statement.executeUpdate("ALTER TABLE local_channels DROP COLUMN closed_timestamp") - } - using(sqlite.createStatement(), inTransaction = true) { statement => getVersion(statement, DB_NAME) match { case None => @@ -146,10 +61,6 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON htlc_infos(channel_id)") statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON htlc_infos(commitment_number)") statement.executeUpdate("CREATE INDEX local_channels_closed_remote_node_id_idx ON local_channels_closed(remote_node_id)") - case Some(v) if v < 7 => throw new RuntimeException("You are updating from a version of eclair older than v0.13.0: please update to the v0.13.0 release first to migrate your channel data, and afterwards you'll be able to update to the latest version.") - case Some(v@7) => - logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") - if (v < 8) migration78(statement) case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index cd44ecfb43..86a328da73 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -18,24 +18,18 @@ package fr.acinq.eclair.db import com.softwaremill.quicklens._ import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey -import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut} -import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases, migrationCheck} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, SatoshiLong} +import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases} import fr.acinq.eclair.TestUtils.randomTxId -import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.ChannelsDbSpec.getTimestamp import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.jdbc.JdbcUtils.using import fr.acinq.eclair.db.pg.PgChannelsDb -import fr.acinq.eclair.db.pg.PgUtils.setVersion import fr.acinq.eclair.db.sqlite.SqliteChannelsDb import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._ -import fr.acinq.eclair.json.JsonSerializers -import fr.acinq.eclair.transactions.Transactions.{ClosingTx, InputInfo} -import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec -import fr.acinq.eclair.wire.protocol.Shutdown -import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, MilliSatoshiLong, TestDatabases, randomBytes32, randomKey, randomLong} +import fr.acinq.eclair.{Alias, CltvExpiry, MilliSatoshiLong, TestDatabases, randomBytes32, randomLong} import org.scalatest.funsuite.AnyFunSuite import scodec.bits.{ByteVector, HexStringSyntax} @@ -220,150 +214,6 @@ class ChannelsDbSpec extends AnyFunSuite { } } - test("migrate closed channels to dedicated table") { - def createCommitments(): Commitments = { - ChannelCodecsSpec.normal.commitments - .modify(_.channelParams.channelId).setTo(randomBytes32()) - .modify(_.channelParams.remoteParams.nodeId).setTo(randomKey().publicKey) - } - - def closingTx(): ClosingTx = { - val input = InputInfo(OutPoint(randomTxId(), 3), TxOut(300_000 sat, Script.pay2wpkh(randomKey().publicKey))) - val tx = Transaction(2, Seq(TxIn(input.outPoint, Nil, 0)), Seq(TxOut(120_000 sat, Script.pay2wpkh(randomKey().publicKey)), TxOut(175_000 sat, Script.pay2tr(randomKey().xOnlyPublicKey()))), 0) - ClosingTx(input, tx, Some(1)) - } - - val paymentHash1 = randomBytes32() - val paymentHash2 = randomBytes32() - // The next two channels are closed and should be migrated to the closed_channels table. - // We haven't yet removed their corresponding htlc_infos, because it is done asynchronously for performance reasons. - val closed1 = DATA_CLOSING(createCommitments(), BlockHeight(750_000), hex"deadbeef", closingTx() :: Nil, closingTx() :: Nil) - val closed2 = DATA_NEGOTIATING_SIMPLE(createCommitments(), FeeratePerKw(2500 sat), hex"deadbeef", hex"beefdead", Nil, closingTx() :: Nil) - val htlcInfos = Map( - closed1.channelId -> Seq( - (7, paymentHash1, CltvExpiry(800_000)), - (7, paymentHash2, CltvExpiry(795_000)), - (8, paymentHash1, CltvExpiry(800_000)), - ), - closed2.channelId -> Seq( - (13, paymentHash1, CltvExpiry(801_000)), - (14, paymentHash2, CltvExpiry(801_000)), - ) - ) - // The following channel is closed, but was never confirmed and thus doesn't need to be migrated to the closed_channels table. - val closed3 = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(createCommitments(), 0 msat, 0 msat, BlockHeight(775_000), BlockHeight(780_000), DualFundingStatus.WaitingForConfirmations, None) - // The following channels aren't closed, and must stay in the channels table after the migration. - val notClosed1 = DATA_CLOSING(createCommitments(), BlockHeight(800_000), hex"deadbeef", closingTx() :: Nil, closingTx() :: Nil) - val notClosed2 = DATA_SHUTDOWN(createCommitments(), Shutdown(randomBytes32(), hex"deadbeef"), Shutdown(randomBytes32(), hex"beefdead"), CloseStatus.Initiator(Some(ClosingFeerates(FeeratePerKw(1500 sat), FeeratePerKw(1000 sat), FeeratePerKw(2500 sat))))) - - def postCheck(db: ChannelsDb): Unit = { - // The closed channels have been migrated to a dedicated DB. - assert(db.listClosedChannels(None, None).map(_.channelId).toSet == Set(closed1.channelId, closed2.channelId)) - // The remaining channels are still active. - assert(db.listLocalChannels().toSet == Set(notClosed1, notClosed2)) - // The corresponding htlc_infos hasn't been removed. - assert(db.listHtlcInfos(closed1.channelId, 7).toSet == Set((paymentHash1, CltvExpiry(800_000)), (paymentHash2, CltvExpiry(795_000)))) - assert(db.listHtlcInfos(closed1.channelId, 8).toSet == Set((paymentHash1, CltvExpiry(800_000)))) - assert(db.listHtlcInfos(closed2.channelId, 13).toSet == Set((paymentHash1, CltvExpiry(801_000)))) - assert(db.listHtlcInfos(closed2.channelId, 14).toSet == Set((paymentHash2, CltvExpiry(801_000)))) - } - - forAllDbs { - case dbs: TestPgDatabases => - migrationCheck( - dbs = dbs, - initializeTables = connection => { - // We initialize a v11 database, where closed channels were kept inside the channels table with an is_closed flag. - using(connection.createStatement()) { statement => - statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") - statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)") - statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))") - statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON local.htlc_infos(channel_id)") - statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON local.htlc_infos(commitment_number)") - setVersion(statement, PgChannelsDb.DB_NAME, 11) - } - // We insert some channels in our DB and htc info related to those channels. - Seq(closed1, closed2, closed3).foreach { c => - using(connection.prepareStatement("INSERT INTO local.channels (channel_id, remote_node_id, data, json, is_closed) VALUES (?, ?, ?, ?::JSONB, TRUE)")) { statement => - statement.setString(1, c.channelId.toHex) - statement.setString(2, c.remoteNodeId.toHex) - statement.setBytes(3, channelDataCodec.encode(c).require.toByteArray) - statement.setString(4, JsonSerializers.serialization.write(c)(JsonSerializers.formats)) - statement.executeUpdate() - } - } - Seq(notClosed1, notClosed2).foreach { c => - using(connection.prepareStatement("INSERT INTO local.channels (channel_id, remote_node_id, data, json, is_closed) VALUES (?, ?, ?, ?::JSONB, FALSE)")) { statement => - statement.setString(1, c.channelId.toHex) - statement.setString(2, c.remoteNodeId.toHex) - statement.setBytes(3, channelDataCodec.encode(c).require.toByteArray) - statement.setString(4, JsonSerializers.serialization.write(c)(JsonSerializers.formats)) - statement.executeUpdate() - } - } - htlcInfos.foreach { case (channelId, infos) => - infos.foreach { case (commitmentNumber, paymentHash, expiry) => - using(connection.prepareStatement("INSERT INTO local.htlc_infos VALUES (?, ?, ?, ?)")) { statement => - statement.setString(1, channelId.toHex) - statement.setLong(2, commitmentNumber) - statement.setString(3, paymentHash.toHex) - statement.setLong(4, expiry.toLong) - statement.executeUpdate() - } - } - } - }, - dbName = PgChannelsDb.DB_NAME, - targetVersion = PgChannelsDb.CURRENT_VERSION, - postCheck = _ => postCheck(dbs.channels) - ) - case dbs: TestSqliteDatabases => - migrationCheck( - dbs = dbs, - initializeTables = connection => { - // We initialize a v7 database, where closed channels were kept inside the channels table with an is_closed flag. - using(connection.createStatement()) { statement => - statement.execute("PRAGMA foreign_keys = ON") - statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") - statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") - statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON htlc_infos(channel_id)") - statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON htlc_infos(commitment_number)") - setVersion(statement, SqliteChannelsDb.DB_NAME, 7) - } - // We insert some channels in our DB and htc info related to those channels. - Seq(closed1, closed2, closed3).foreach { c => - using(connection.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, 1)")) { statement => - statement.setBytes(1, c.channelId.toArray) - statement.setBytes(2, channelDataCodec.encode(c).require.toByteArray) - statement.executeUpdate() - } - } - Seq(notClosed1, notClosed2).foreach { c => - using(connection.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, 0)")) { statement => - statement.setBytes(1, c.channelId.toArray) - statement.setBytes(2, channelDataCodec.encode(c).require.toByteArray) - statement.executeUpdate() - } - } - htlcInfos.foreach { case (channelId, infos) => - infos.foreach { case (commitmentNumber, paymentHash, expiry) => - using(connection.prepareStatement("INSERT INTO htlc_infos VALUES (?, ?, ?, ?)")) { statement => - statement.setBytes(1, channelId.toArray) - statement.setLong(2, commitmentNumber) - statement.setBytes(3, paymentHash.toArray) - statement.setLong(4, expiry.toLong) - statement.executeUpdate() - } - } - } - }, - dbName = SqliteChannelsDb.DB_NAME, - targetVersion = SqliteChannelsDb.CURRENT_VERSION, - postCheck = _ => postCheck(dbs.channels) - ) - } - } - test("json column reset (postgres)") { val dbs = TestPgDatabases() val db = dbs.channels