-
Notifications
You must be signed in to change notification settings - Fork 234
Expand file tree
/
Copy pathmysql2pg.go
More file actions
398 lines (361 loc) · 11.3 KB
/
mysql2pg.go
File metadata and controls
398 lines (361 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
// migrate-mysql-to-postgres copies Ergo's history data from a MySQL database
// to a PostgreSQL database, reading connection details from an Ergo config file
// with both databases populated in `datastore` (neither database needs `enabled: true`
// to work.) It creates the PostgreSQL schema from scratch, then copies all data.
//
// WARNINGS:
// 1. This was tested with Ergo v2.18.0 only
// 2. It will destroy any preexisting data on the PostgreSQL side
// 3. It should not modify any data on the MySQL side, but make a backup first anyway
// 4. For best results, quiesce MySQL by stopping Ergo before running the migration
//
// Usage:
//
// go run ./distrib/postgresql/mysql2pg -config /path/to/ircd.yaml
package main
import (
"database/sql"
"flag"
"fmt"
"log"
"net/url"
"os"
_ "github.com/go-sql-driver/mysql"
_ "github.com/jackc/pgx/v5/stdlib"
"gopkg.in/yaml.v2"
)
const (
	maxTargetLength = 64 // copied from irc/postgresql/config.go
	latestSchema = "2" // copied from irc/postgresql/history.go
	latestMinorSchema = "2" // presumably also from irc/postgresql/history.go — TODO confirm it matches the target Ergo version
	batchSize = 1000 // rows per source query page and per destination transaction (see copyBatched)
)
// Minimal config structs matching the ergo YAML structure.
// Field names are lowercased by yaml.v2 when no tag is present.
// mysqlConfig mirrors the `datastore.mysql` section of the Ergo config file.
type mysqlConfig struct {
	Host string
	Port int // 0 means use the MySQL default, 3306 (see mysqlDSN)
	SocketPath string `yaml:"socket-path"` // unix socket path; takes precedence over Host/Port when set
	User string
	Password string
	HistoryDatabase string `yaml:"history-database"`
}
// postgresConfig mirrors the `datastore.postgresql` section of the Ergo config file.
type postgresConfig struct {
	Host string
	Port int // 0 means use the PostgreSQL default, 5432 (see postgresURI)
	SocketPath string `yaml:"socket-path"` // unix socket; takes precedence over Host/Port when set
	User string
	Password string
	HistoryDatabase string `yaml:"history-database"`
	URI string `yaml:"uri"` // full connection URI; overrides all other fields when non-empty
	SSLMode string `yaml:"ssl-mode"` // libpq sslmode value; defaults to "disable" when empty
}
// ergoConfig is the subset of the Ergo YAML config this tool reads:
// only the two database sections under `datastore`.
type ergoConfig struct {
	Datastore struct {
		MySQL mysqlConfig `yaml:"mysql"`
		PostgreSQL postgresConfig `yaml:"postgresql"`
	}
}
// mysqlDSN builds a go-sql-driver/mysql DSN ("user:pass@proto(addr)/dbname")
// from the config. A unix socket path takes precedence over host/port, and an
// unset port falls back to MySQL's default, 3306.
// Copied from irc/mysql/history.go (*MySQL).open().
func mysqlDSN(c mysqlConfig) string {
	addr := func() string {
		if c.SocketPath != "" {
			return "unix(" + c.SocketPath + ")"
		}
		port := c.Port
		if port == 0 {
			port = 3306
		}
		return fmt.Sprintf("tcp(%s:%d)", c.Host, port)
	}()
	return c.User + ":" + c.Password + "@" + addr + "/" + c.HistoryDatabase
}
// postgresURI builds a libpq connection URI from the config. An explicit
// `uri` setting wins outright; otherwise the URI is assembled from the
// individual fields, defaulting to localhost:5432 and sslmode=disable.
// Copied from irc/postgresql/history.go (*Config).buildURI().
func postgresURI(c postgresConfig) string {
	if c.URI != "" {
		return c.URI
	}
	uri := url.URL{
		Scheme: "postgresql",
		Path:   "/" + c.HistoryDatabase,
	}
	if c.User != "" || c.Password != "" {
		uri.User = url.UserPassword(c.User, c.Password)
	}
	query := url.Values{}
	if c.SocketPath != "" {
		// libpq convention: a unix socket is passed via the host query parameter.
		query.Set("host", c.SocketPath)
	} else {
		host, port := c.Host, c.Port
		if host == "" {
			host = "localhost"
		}
		if port == 0 {
			port = 5432
		}
		uri.Host = fmt.Sprintf("%s:%d", host, port)
	}
	mode := c.SSLMode
	if mode == "" {
		mode = "disable"
	}
	query.Set("sslmode", mode)
	uri.RawQuery = query.Encode()
	return uri.String()
}
// main drives the migration: parse the config, connect to both databases,
// recreate the PostgreSQL schema, copy each table, and reset the sequences.
// All failures are fatal; the tool is meant to be re-run from scratch.
func main() {
	configPath := flag.String("config", "ircd.yaml", "path to ergo config file")
	flag.Parse()

	raw, err := os.ReadFile(*configPath)
	if err != nil {
		log.Fatalf("read config: %v", err)
	}
	var cfg ergoConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		log.Fatalf("parse config: %v", err)
	}

	// open opens a handle with the given driver and verifies connectivity.
	open := func(driver, dsn, label string) *sql.DB {
		db, err := sql.Open(driver, dsn)
		if err != nil {
			log.Fatalf("open %s: %v", label, err)
		}
		if err := db.Ping(); err != nil {
			log.Fatalf("ping %s: %v", label, err)
		}
		return db
	}

	mysqlDB := open("mysql", mysqlDSN(cfg.Datastore.MySQL), "mysql")
	defer mysqlDB.Close()
	log.Println("connected to MySQL")

	pgDB := open("pgx", postgresURI(cfg.Datastore.PostgreSQL), "postgres")
	defer pgDB.Close()
	log.Println("connected to PostgreSQL")

	log.Println("setting up PostgreSQL schema")
	if err := setupSchema(pgDB); err != nil {
		log.Fatalf("setup schema: %v", err)
	}

	steps := []struct {
		name string
		copy func(*sql.DB, *sql.DB) (int, error)
	}{
		{"history", copyHistory},
		{"sequence", copySequence},
		{"conversations", copyConversations},
		{"correspondents", copyCorrespondents},
		{"account_messages", copyAccountMessages},
		{"forget", copyForget},
	}
	for _, step := range steps {
		log.Printf("copying %s...", step.name)
		count, err := step.copy(mysqlDB, pgDB)
		if err != nil {
			log.Fatalf("copy %s: %v", step.name, err)
		}
		log.Printf(" %d rows", count)
	}

	log.Println("resetting sequences")
	if err := resetSequences(pgDB); err != nil {
		log.Fatalf("reset sequences: %v", err)
	}
	log.Println("done")
}
// setupSchema drops and recreates all tables and indexes, then stamps the
// schema version into the metadata table. Table definitions and indexes are
// copied from irc/postgresql/history.go. This is destructive: any preexisting
// history data on the PostgreSQL side is lost.
//
// The drop and create statements run in one loop with one consistent error
// wrap (the original code wrapped drop errors as "%s: %w" but create errors
// as "exec: %w\nstatement: %s"); the failing statement is always included.
func setupSchema(db *sql.DB) error {
	stmts := []string{
		// Drop in reverse dependency order; CASCADE keeps this safe even if
		// the ordering is ever wrong.
		"DROP TABLE IF EXISTS forget CASCADE",
		"DROP TABLE IF EXISTS account_messages CASCADE",
		"DROP TABLE IF EXISTS correspondents CASCADE",
		"DROP TABLE IF EXISTS conversations CASCADE",
		"DROP TABLE IF EXISTS sequence CASCADE",
		"DROP TABLE IF EXISTS history CASCADE",
		"DROP TABLE IF EXISTS metadata CASCADE",
		`CREATE TABLE metadata (
			key_name VARCHAR(32) PRIMARY KEY,
			value VARCHAR(32) NOT NULL
		)`,
		`CREATE TABLE history (
			id BIGSERIAL PRIMARY KEY,
			data BYTEA NOT NULL,
			msgid BYTEA NOT NULL CHECK (octet_length(msgid) = 16)
		)`,
		`CREATE INDEX idx_history_msgid ON history (msgid)`,
		fmt.Sprintf(`CREATE TABLE sequence (
			history_id BIGINT NOT NULL PRIMARY KEY,
			target BYTEA NOT NULL CHECK (octet_length(target) <= %d),
			nanotime BIGINT NOT NULL CHECK (nanotime >= 0)
		)`, maxTargetLength),
		`CREATE INDEX idx_sequence_target_nanotime ON sequence (target, nanotime)`,
		fmt.Sprintf(`CREATE TABLE conversations (
			id BIGSERIAL PRIMARY KEY,
			target BYTEA NOT NULL CHECK (octet_length(target) <= %d),
			correspondent BYTEA NOT NULL CHECK (octet_length(correspondent) <= %d),
			nanotime BIGINT NOT NULL CHECK (nanotime >= 0),
			history_id BIGINT NOT NULL
		)`, maxTargetLength, maxTargetLength),
		`CREATE INDEX idx_conversations_target_correspondent_nanotime ON conversations (target, correspondent, nanotime)`,
		`CREATE INDEX idx_conversations_history_id ON conversations (history_id)`,
		fmt.Sprintf(`CREATE TABLE correspondents (
			id BIGSERIAL PRIMARY KEY,
			target BYTEA NOT NULL CHECK (octet_length(target) <= %d),
			correspondent BYTEA NOT NULL CHECK (octet_length(correspondent) <= %d),
			nanotime BIGINT NOT NULL CHECK (nanotime >= 0),
			UNIQUE (target, correspondent)
		)`, maxTargetLength, maxTargetLength),
		`CREATE INDEX idx_correspondents_target_nanotime ON correspondents (target, nanotime)`,
		`CREATE INDEX idx_correspondents_nanotime ON correspondents (nanotime)`,
		fmt.Sprintf(`CREATE TABLE account_messages (
			history_id BIGINT NOT NULL PRIMARY KEY,
			account BYTEA NOT NULL CHECK (octet_length(account) <= %d)
		)`, maxTargetLength),
		`CREATE INDEX idx_account_messages_account_history_id ON account_messages (account, history_id)`,
		fmt.Sprintf(`CREATE TABLE forget (
			id BIGSERIAL PRIMARY KEY,
			account BYTEA NOT NULL CHECK (octet_length(account) <= %d)
		)`, maxTargetLength),
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			return fmt.Errorf("%s: %w", stmt, err)
		}
	}
	// Record the schema version Ergo expects to find at startup.
	_, err := db.Exec(
		`INSERT INTO metadata (key_name, value) VALUES ($1, $2), ($3, $4)`,
		"db.version", latestSchema, "db.minorversion", latestMinorSchema,
	)
	return err
}
// copyHistory copies the raw message blobs (history table), keyed by id.
func copyHistory(src, dst *sql.DB) (int, error) {
	return copyBatched(src, dst,
		"SELECT id, data, msgid FROM history WHERE id > ? ORDER BY id LIMIT ?",
		"INSERT INTO history (id, data, msgid) VALUES ($1, $2, $3)",
		3,
	)
}

// copySequence copies the per-target time index; keyed by history_id
// (no serial id of its own).
func copySequence(src, dst *sql.DB) (int, error) {
	return copyBatched(src, dst,
		"SELECT history_id, target, nanotime FROM sequence WHERE history_id > ? ORDER BY history_id LIMIT ?",
		"INSERT INTO sequence (history_id, target, nanotime) VALUES ($1, $2, $3)",
		3,
	)
}

// copyConversations copies the direct-message conversation index, keyed by id.
func copyConversations(src, dst *sql.DB) (int, error) {
	return copyBatched(src, dst,
		"SELECT id, target, correspondent, nanotime, history_id FROM conversations WHERE id > ? ORDER BY id LIMIT ?",
		"INSERT INTO conversations (id, target, correspondent, nanotime, history_id) VALUES ($1, $2, $3, $4, $5)",
		5,
	)
}

// copyCorrespondents copies the latest-contact-per-correspondent table, keyed by id.
func copyCorrespondents(src, dst *sql.DB) (int, error) {
	return copyBatched(src, dst,
		"SELECT id, target, correspondent, nanotime FROM correspondents WHERE id > ? ORDER BY id LIMIT ?",
		"INSERT INTO correspondents (id, target, correspondent, nanotime) VALUES ($1, $2, $3, $4)",
		4,
	)
}

// copyAccountMessages copies the message-to-account ownership table,
// keyed by history_id (no serial id of its own).
func copyAccountMessages(src, dst *sql.DB) (int, error) {
	return copyBatched(src, dst,
		"SELECT history_id, account FROM account_messages WHERE history_id > ? ORDER BY history_id LIMIT ?",
		"INSERT INTO account_messages (history_id, account) VALUES ($1, $2)",
		2,
	)
}

// copyForget copies the accounts-pending-deletion table, keyed by id.
func copyForget(src, dst *sql.DB) (int, error) {
	return copyBatched(src, dst,
		"SELECT id, account FROM forget WHERE id > ? ORDER BY id LIMIT ?",
		"INSERT INTO forget (id, account) VALUES ($1, $2)",
		2,
	)
}
// copyBatched copies rows from src to dst in batches using keyset pagination on
// the first (primary key) column. srcQuery must accept (lastID int64, limit int)
// and return rows ordered by that column; dstInsert must accept the same ncols
// columns in order. One destination transaction is committed per batch. The
// returned count includes only committed rows.
//
// Unlike the original implementation, this errors out (instead of re-reading
// the same page forever) if the pagination key fails to advance after a full
// batch — which happens when the first column does not scan as an integer.
func copyBatched(src, dst *sql.DB, srcQuery, dstInsert string, ncols int) (int, error) {
	var lastID int64
	total := 0
	for {
		n, nextID, err := copyBatch(src, dst, srcQuery, dstInsert, ncols, lastID)
		if err != nil {
			return total, err
		}
		total += n
		log.Printf(" %d rows copied so far", total)
		if n < batchSize {
			return total, nil
		}
		if nextID <= lastID {
			return total, fmt.Errorf("pagination key did not advance past %d; first column of %q must be an integer id", lastID, srcQuery)
		}
		lastID = nextID
	}
}

// copyBatch copies a single page of up to batchSize rows with keys greater than
// lastID, inside one destination transaction. It returns the number of rows
// committed and the highest key seen (== lastID if no integer key was scanned).
func copyBatch(src, dst *sql.DB, srcQuery, dstInsert string, ncols int, lastID int64) (count int, maxID int64, err error) {
	maxID = lastID
	rows, err := src.Query(srcQuery, lastID, batchSize)
	if err != nil {
		return 0, maxID, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()
	tx, err := dst.Begin()
	if err != nil {
		return 0, maxID, fmt.Errorf("begin tx: %w", err)
	}
	// Roll back on every error path; after a successful Commit this is a no-op
	// (it returns sql.ErrTxDone, which we deliberately ignore).
	defer tx.Rollback()
	vals := make([]any, ncols)
	ptrs := make([]any, ncols)
	for i := range vals {
		ptrs[i] = &vals[i]
	}
	for rows.Next() {
		if err := rows.Scan(ptrs...); err != nil {
			return 0, maxID, fmt.Errorf("scan: %w", err)
		}
		// MySQL returns BIGINT UNSIGNED as uint64; convert to int64 for PostgreSQL.
		for i, v := range vals {
			if u, ok := v.(uint64); ok {
				vals[i] = int64(u)
			}
		}
		if _, err := tx.Exec(dstInsert, vals...); err != nil {
			return 0, maxID, fmt.Errorf("insert: %w", err)
		}
		if id, ok := vals[0].(int64); ok && id > maxID {
			maxID = id
		}
		count++
	}
	if err := rows.Err(); err != nil {
		return 0, maxID, fmt.Errorf("rows: %w", err)
	}
	if err := tx.Commit(); err != nil {
		return 0, maxID, fmt.Errorf("commit: %w", err)
	}
	return count, maxID, nil
}
// resetSequences advances each BIGSERIAL-backed sequence to the largest id
// that was migrated in, so future inserts by Ergo do not collide with copied
// rows. The sequence and account_messages tables use externally-assigned
// primary keys and have no sequence to reset.
func resetSequences(db *sql.DB) error {
	for _, entry := range []struct{ seq, table, col string }{
		{"history_id_seq", "history", "id"},
		{"conversations_id_seq", "conversations", "id"},
		{"correspondents_id_seq", "correspondents", "id"},
		{"forget_id_seq", "forget", "id"},
	} {
		// COALESCE(..., 1) handles an empty table: setval requires a value >= 1.
		stmt := fmt.Sprintf(
			"SELECT setval('%s', COALESCE((SELECT MAX(%s) FROM %s), 1))",
			entry.seq, entry.col, entry.table,
		)
		if _, err := db.Exec(stmt); err != nil {
			return fmt.Errorf("reset %s: %w", entry.seq, err)
		}
	}
	return nil
}