2
2
// vim: ts=8 sw=2 smarttab
3
3
4
4
#include < stack>
5
+ #include < queue>
5
6
#include < fcntl.h>
6
7
#include < algorithm>
7
8
#include < sys/time.h>
@@ -63,6 +64,12 @@ std::string entry_path(const std::string &dir, const std::string &name) {
63
64
return dir + " /" + name;
64
65
}
65
66
67
+ std::string entry_diff_path (const std::string &dir, const std::string &name) {
68
+ if (dir == " ." )
69
+ return name;
70
+ return dir + " /" + name;
71
+ }
72
+
66
73
std::map<std::string, std::string> decode_snap_metadata (snap_metadata *snap_metadata,
67
74
size_t nr_snap_metadata) {
68
75
std::map<std::string, std::string> metadata;
@@ -1210,17 +1217,12 @@ void PeerReplayer::post_sync_close_handles(const FHandles &fh) {
1210
1217
ceph_close (fh.p_mnt , fh.p_fd );
1211
1218
}
1212
1219
1213
- int PeerReplayer::do_synchronize (const std::string &dir_root, const Snapshot ¤t,
1214
- boost::optional<Snapshot> prev) {
1220
+ int PeerReplayer::do_synchronize (const std::string &dir_root, const Snapshot ¤t) {
1215
1221
dout (20 ) << " : dir_root=" << dir_root << " , current=" << current << dendl;
1216
- if (prev) {
1217
- dout (20 ) << " : incremental sync check from prev=" << prev << dendl;
1218
- }
1219
-
1220
1222
FHandles fh;
1221
- int r = pre_sync_check_and_open_handles (dir_root, current, prev , &fh);
1223
+ int r = pre_sync_check_and_open_handles (dir_root, current, boost::none , &fh);
1222
1224
if (r < 0 ) {
1223
- dout (5 ) << " : cannot proceeed with sync: " << cpp_strerror (r) << dendl;
1225
+ dout (5 ) << " : cannot proceed with sync: " << cpp_strerror (r) << dendl;
1224
1226
return r;
1225
1227
}
1226
1228
@@ -1371,6 +1373,177 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
1371
1373
return r;
1372
1374
}
1373
1375
1376
+ int PeerReplayer::do_synchronize (const std::string &dir_root, const Snapshot ¤t,
1377
+ boost::optional<Snapshot> prev) {
1378
+ if (!prev) {
1379
+ derr << " : invalid previous snapshot" << dendl;
1380
+ return -ENODATA;
1381
+ }
1382
+
1383
+ dout (20 ) << " : incremental sync check from prev=" << prev << dendl;
1384
+
1385
+ FHandles fh;
1386
+ int r = pre_sync_check_and_open_handles (dir_root, current, prev, &fh);
1387
+ if (r < 0 ) {
1388
+ dout (5 ) << " : cannot proceed with sync: " << cpp_strerror (r) << dendl;
1389
+ return r;
1390
+ }
1391
+
1392
+ BOOST_SCOPE_EXIT_ALL ( (this )(&fh) ) {
1393
+ post_sync_close_handles (fh);
1394
+ };
1395
+
1396
+ // record that we are going to "dirty" the data under this directory root
1397
+ auto snap_id_str{stringify (current.second )};
1398
+ r = ceph_setxattr (m_remote_mount, dir_root.c_str (), " ceph.mirror.dirty_snap_id" ,
1399
+ snap_id_str.c_str (), snap_id_str.size (), 0 );
1400
+ if (r < 0 ) {
1401
+ derr << " : error setting \" ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
1402
+ << " : " << cpp_strerror (r) << dendl;
1403
+ return r;
1404
+ }
1405
+
1406
+ struct ceph_statx cstx;
1407
+ r = ceph_fstatx (m_local_mount, fh.c_fd , &cstx,
1408
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
1409
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
1410
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
1411
+ if (r < 0 ) {
1412
+ derr << " : failed to stat snap=" << current.first << " : " << cpp_strerror (r)
1413
+ << dendl;
1414
+ return r;
1415
+ }
1416
+
1417
+ ceph_snapdiff_info sd_info;
1418
+ ceph_snapdiff_entry_t sd_entry;
1419
+
1420
+ // The queue of SyncEntry items (directories) to be synchronized.
1421
+ // We follow a breadth first approach here based on the snapdiff output.
1422
+ std::queue<SyncEntry> sync_queue;
1423
+
1424
+ // start with initial/default entry
1425
+ std::string epath = " ." , npath = " " , nabs_path = " " , nname = " " ;
1426
+ sync_queue.emplace (SyncEntry (epath, cstx));
1427
+
1428
+ while (!sync_queue.empty ()) {
1429
+ if (should_backoff (dir_root, &r)) {
1430
+ dout (0 ) << " : backing off r=" << r << dendl;
1431
+ break ;
1432
+ }
1433
+ r = pre_sync_check_and_open_handles (dir_root, current, prev, &fh);
1434
+ if (r < 0 ) {
1435
+ dout (5 ) << " : cannot proceed with sync: " << cpp_strerror (r) << dendl;
1436
+ return r;
1437
+ }
1438
+
1439
+ dout (20 ) << " : " << sync_queue.size () << " entries in queue" << dendl;
1440
+ const auto &queue_entry = sync_queue.front ();
1441
+ epath = queue_entry.epath ;
1442
+ dout (20 ) << " : syncing entry, path=" << epath << dendl;
1443
+ r = ceph_open_snapdiff (fh.p_mnt , dir_root.c_str (), epath.c_str (),
1444
+ stringify ((*prev).first ).c_str (), current.first .c_str (), &sd_info);
1445
+ if (r != 0 ) {
1446
+ derr << " : failed to open snapdiff, r=" << r << dendl;
1447
+ return r;
1448
+ }
1449
+ while (0 < (r = ceph_readdir_snapdiff (&sd_info, &sd_entry))) {
1450
+ if (r < 0 ) {
1451
+ derr << " : failed to read directory=" << epath << dendl;
1452
+ ceph_close_snapdiff (&sd_info);
1453
+ return r;
1454
+ }
1455
+
1456
+ // New entry found
1457
+ nname = sd_entry.dir_entry .d_name ;
1458
+ if (" ." == nname || " .." == nname)
1459
+ continue ;
1460
+ // create path for the newly found entry
1461
+ npath = entry_diff_path (epath, nname);
1462
+ nabs_path = entry_diff_path (dir_root, npath);
1463
+
1464
+ r = ceph_statx (sd_info.cmount , nabs_path.c_str (), &cstx,
1465
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
1466
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
1467
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
1468
+ if (r < 0 ) {
1469
+ // can't stat, so it's a deleted entry.
1470
+ if (DT_DIR == sd_entry.dir_entry .d_type ) { // is a directory
1471
+ r = cleanup_remote_dir (dir_root, npath, fh);
1472
+ if (r < 0 ) {
1473
+ derr << " : failed to remove directory=" << nabs_path << dendl;
1474
+ break ;
1475
+ }
1476
+ }
1477
+ else { // is a file
1478
+ r = ceph_unlinkat (m_remote_mount, fh.r_fd_dir_root , npath.c_str (), 0 );
1479
+ if (r < 0 ) {
1480
+ break ;
1481
+ }
1482
+ }
1483
+ } else {
1484
+ // stat success, update the existing entry
1485
+ struct ceph_statx tstx;
1486
+ int rstat_r = ceph_statx (m_remote_mount, nabs_path.c_str (), &tstx,
1487
+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
1488
+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
1489
+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
1490
+ if (S_ISDIR (cstx.stx_mode )) { // is a directory
1491
+ // cleanup if it's a file in the remotefs
1492
+ if ((0 == rstat_r) && !S_ISDIR (tstx.stx_mode )) {
1493
+ r = ceph_unlinkat (m_remote_mount, fh.r_fd_dir_root , npath.c_str (), 0 );
1494
+ if (r < 0 ) {
1495
+ derr << " : Error in directory sync. Failed to remove file="
1496
+ << nabs_path << dendl;
1497
+ break ;
1498
+ }
1499
+ }
1500
+ r = remote_mkdir (npath, cstx, fh);
1501
+ if (r < 0 ) {
1502
+ break ;
1503
+ }
1504
+ // push it to sync_queue for later processing
1505
+ sync_queue.emplace (SyncEntry (npath, cstx));
1506
+ } else { // is a file
1507
+ bool need_data_sync = true ;
1508
+ bool need_attr_sync = true ;
1509
+ r = should_sync_entry (npath, cstx, fh, &need_data_sync, &need_attr_sync);
1510
+ if (r < 0 ) {
1511
+ break ;
1512
+ }
1513
+ dout (5 ) << " : entry=" << npath << " , data_sync=" << need_data_sync
1514
+ << " , attr_sync=" << need_attr_sync << dendl;
1515
+ if (need_data_sync || need_attr_sync) {
1516
+ // cleanup if it's a directory in the remotefs
1517
+ if ((0 == rstat_r) && S_ISDIR (tstx.stx_mode )) {
1518
+ r = cleanup_remote_dir (dir_root, npath, fh);
1519
+ if (r < 0 ) {
1520
+ derr << " : Error in file sync. Failed to remove remote directory="
1521
+ << nabs_path << dendl;
1522
+ break ;
1523
+ }
1524
+ }
1525
+ r = remote_file_op (dir_root, npath, cstx, fh, need_data_sync, need_attr_sync);
1526
+ if (r < 0 ) {
1527
+ break ;
1528
+ }
1529
+ }
1530
+ }
1531
+ }
1532
+ }
1533
+ if (0 == r) {
1534
+ dout (10 ) << " : successfully synchronized the entry=" << epath << dendl;
1535
+ }
1536
+
1537
+ // Close the current open directory and take the next queue_entry, if success or failure.
1538
+ r = ceph_close_snapdiff (&sd_info);
1539
+ if (r != 0 ) {
1540
+ derr << " : failed to close directory=" << epath << dendl;
1541
+ }
1542
+ sync_queue.pop ();
1543
+ }
1544
+ return r;
1545
+ }
1546
+
1374
1547
int PeerReplayer::synchronize (const std::string &dir_root, const Snapshot ¤t,
1375
1548
boost::optional<Snapshot> prev) {
1376
1549
dout (20 ) << " : dir_root=" << dir_root << " , current=" << current << dendl;
@@ -1389,7 +1562,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre
1389
1562
if (r < 0 ) {
1390
1563
dout (5 ) << " : missing \" ceph.mirror.dirty_snap_id\" xattr on remote -- using"
1391
1564
<< " incremental sync with remote scan" << dendl;
1392
- r = do_synchronize (dir_root, current, boost::none );
1565
+ r = do_synchronize (dir_root, current);
1393
1566
} else {
1394
1567
size_t xlen = r;
1395
1568
char *val = (char *)alloca (xlen+1 );
@@ -1410,7 +1583,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre
1410
1583
r = do_synchronize (dir_root, current, prev);
1411
1584
} else {
1412
1585
dout (5 ) << " : mismatch -- using incremental sync with remote scan" << dendl;
1413
- r = do_synchronize (dir_root, current, boost::none );
1586
+ r = do_synchronize (dir_root, current);
1414
1587
}
1415
1588
}
1416
1589
0 commit comments