Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/sys/bqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ extern "C" {

#include <sys/zfs_context.h>

typedef boolean_t (bqueue_peek_cb_t)(void *data, void *arg);

typedef struct bqueue {
list_t bq_list;
size_t bq_size;
Expand All @@ -52,6 +54,7 @@ void bqueue_destroy(bqueue_t *);
void bqueue_enqueue(bqueue_t *, void *, size_t);
void bqueue_enqueue_flush(bqueue_t *, void *, size_t);
void *bqueue_dequeue(bqueue_t *);
void bqueue_peek_into(bqueue_t *q, bqueue_peek_cb_t *cb, void *cb_arg);

#ifdef __cplusplus
}
Expand Down
41 changes: 41 additions & 0 deletions module/zfs/bqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,44 @@ bqueue_dequeue(bqueue_t *q)
q->bq_dequeuing_size -= obj2node(q, ret)->bqn_size;
return (ret);
}

/*
* Iterate over objects in the queue without removing them.
* Callback is called for each object.
* Iterate until callback returns B_FALSE.
* The caller must ensure no race with bqueue_dequeue,
* for example, only call this from the same thread as bqueue_dequeue.
*/
void
bqueue_peek_into(bqueue_t *q, bqueue_peek_cb_t *cb, void *cb_arg)
{
void *next = NULL;
void *data = list_head(&q->bq_dequeuing_list);

retry:
while (data != NULL) {
if (!cb(data, cb_arg))
return;
next = list_next(&q->bq_dequeuing_list, data);
if (next == NULL)
break;
data = next;
}
/* data point to the last node in the dequeuing list */

/* append bq_list to bq_dequeuing_list */
mutex_enter(&q->bq_lock);
while (q->bq_size == 0) {
cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
}
list_move_tail(&q->bq_dequeuing_list, &q->bq_list);
q->bq_dequeuing_size += q->bq_size;
q->bq_size = 0;
cv_broadcast(&q->bq_add_cv);
mutex_exit(&q->bq_lock);
if (data == NULL)
data = list_head(&q->bq_dequeuing_list);
else
data = list_next(&q->bq_dequeuing_list, data);
goto retry;
}
211 changes: 169 additions & 42 deletions module/zfs/dmu_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -1698,9 +1698,117 @@ receive_object_is_same_generation(objset_t *os, uint64_t object,
return (0);
}

typedef struct check_free_range_arg {
uint64_t object;
uint64_t max_offset;
zfs_range_tree_t *rt;
} check_free_range_arg_t;

static boolean_t
check_free_range_cb(void *data, void *arg)
{
check_free_range_arg_t *cfarg = (check_free_range_arg_t *)arg;
struct receive_record_arg *rrd = data;
boolean_t ret = B_TRUE;
uint64_t offset = 0, length = 0;

/* Check all records that would modify current object */
switch (rrd->header.drr_type) {
case DRR_WRITE:
{
struct drr_write *drrw = &rrd->header.drr_u.drr_write;
if (drrw->drr_object != cfarg->object)
return (B_FALSE);
offset = drrw->drr_offset;
length = drrw->drr_logical_size;
break;
}
case DRR_WRITE_EMBEDDED:
{
struct drr_write_embedded *drrwe =
&rrd->header.drr_u.drr_write_embedded;
if (drrwe->drr_object != cfarg->object)
return (B_FALSE);
offset = drrwe->drr_offset;
length = drrwe->drr_length;
break;
}
case DRR_FREE:
{
struct drr_free *drrf = &rrd->header.drr_u.drr_free;
if (drrf->drr_object != cfarg->object)
return (B_FALSE);
offset = drrf->drr_offset;
length = drrf->drr_length;
break;
}
case DRR_REDACT:
{
struct drr_redact *drrr = &rrd->header.drr_u.drr_redact;
if (drrr->drr_object != cfarg->object)
return (B_FALSE);
offset = drrr->drr_offset;
length = drrr->drr_length;
break;
}
case DRR_SPILL:
{
struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
if (drrs->drr_object != cfarg->object)
return (B_FALSE);
return (B_TRUE);
}
default:
/* Anything else means we are done with the object */
return (B_FALSE);
}

/*
* Assuming records are in-order, we can exit once drr offset is
* larger than max_offset, except for the first FREE with
* DMU_OBJECT_END that comes immediately after OBJECT.
*/
if (offset >= cfarg->max_offset && length != DMU_OBJECT_END)
ret = B_FALSE;
if (length == DMU_OBJECT_END)
length = UINT64_MAX - offset;
zfs_range_tree_clear(cfarg->rt, offset, length);

/* If we already clear everything, we can exit */
if (zfs_range_tree_is_empty(cfarg->rt)) {
ret = B_FALSE;
}
return (ret);
}

/*
* Check if following receive records will overwrite/free everthing under
* max_offset.
*/
static boolean_t
check_free_range(struct receive_writer_arg *rwa, uint64_t object,
uint64_t max_offset)
{
if (max_offset == 0)
return (B_TRUE);

boolean_t ret;
check_free_range_arg_t cfarg = {
.object = object,
.max_offset = max_offset,
};
cfarg.rt = zfs_range_tree_create(NULL, ZFS_RANGE_SEG64, NULL, 0, 0);
zfs_range_tree_add(cfarg.rt, 0, max_offset);
bqueue_peek_into(&rwa->q, check_free_range_cb, &cfarg);
ret = zfs_range_tree_is_empty(cfarg.rt);
zfs_range_tree_vacate(cfarg.rt, NULL, NULL);
zfs_range_tree_destroy(cfarg.rt);
return (ret);
}

static int
receive_handle_existing_object(const struct receive_writer_arg *rwa,
const struct drr_object *drro, const dmu_object_info_t *doi,
receive_handle_existing_object(struct receive_writer_arg *rwa,
const struct drr_object *drro, dmu_object_info_t *doi,
const void *bonus_data,
uint64_t *object_to_hold, uint32_t *new_blksz)
{
Expand All @@ -1711,6 +1819,7 @@ receive_handle_existing_object(const struct receive_writer_arg *rwa,
uint8_t dn_slots = drro->drr_dn_slots != 0 ?
drro->drr_dn_slots : DNODE_MIN_SLOTS;
boolean_t do_free_range = B_FALSE;
boolean_t do_grow_blksz = B_FALSE;
int err;

*object_to_hold = drro->drr_object;
Expand Down Expand Up @@ -1788,20 +1897,6 @@ receive_handle_existing_object(const struct receive_writer_arg *rwa,
* only increasing.
*/
do_free_range = B_TRUE;
} else if (doi->doi_max_offset <=
doi->doi_data_block_size) {
/*
* There is only one block. We can free it,
* because its contents will be replaced by a
* WRITE record. This can not be the no-L ->
* -L case, because the no-L case would have
* resulted in multiple blocks. If we
* supported -L -> no-L, it would not be safe
* to free the file's contents. Fortunately,
* that is not allowed (see
* recv_check_large_blocks()).
*/
do_free_range = B_TRUE;
} else {
boolean_t is_same_gen;
err = receive_object_is_same_generation(rwa->os,
Expand All @@ -1810,30 +1905,45 @@ receive_handle_existing_object(const struct receive_writer_arg *rwa,
if (err != 0)
return (SET_ERROR(EINVAL));

if (is_same_gen) {
/*
* This is the same logical file, and
* the block size must be increasing.
* It could only decrease if
* --large-block was changed to be
* off, which is checked in
* recv_check_large_blocks().
*/
if (drro->drr_blksz <=
doi->doi_data_block_size)
return (SET_ERROR(EINVAL));
if (!is_same_gen) {
do_free_range = B_TRUE;
goto skip;
}
/*
* If same gen, we try to do free range and take on
* the new blksz. However we need to check if it's
* safe to do so first by checking if everthing in the
* file will be overwritten in the following records.
* We only do this for small files so we don't end up
* having to check unlimited amount of records (at
* most 16 records).
*
* We need to do this because after the previous fix
* for -L that introduced this function, we can end up
* with same file having different block size if two
* sides have different recordsize setting.
*/
if (doi->doi_max_offset <= drro->drr_blksz * 16 &&
doi->doi_max_offset <= SPA_MAXBLOCKSIZE) {
do_free_range = check_free_range(rwa,
drro->drr_object, doi->doi_max_offset);
}
if (!do_free_range) {
*new_blksz = doi->doi_data_block_size;
/*
* We keep the existing blocksize and
* contents.
* We can't do truncate but our block size
* isn't power of 2. In the case where WRITE
* grows beyond current blksz, we must grow our
* blksz. Grow to the next power of 2.
*/
*new_blksz =
doi->doi_data_block_size;
} else {
do_free_range = B_TRUE;
if (!ISP2(*new_blksz)) {
*new_blksz = 1 << highbit64(*new_blksz);
do_grow_blksz = B_TRUE;
}
}
}
}

skip:
/* nblkptr can only decrease if the object was reallocated */
if (nblkptr < doi->doi_nblkptr)
do_free_range = B_TRUE;
Expand Down Expand Up @@ -1861,6 +1971,26 @@ receive_handle_existing_object(const struct receive_writer_arg *rwa,
0, DMU_OBJECT_END);
if (err != 0)
return (SET_ERROR(EINVAL));
} else if (do_grow_blksz) {
dmu_tx_t *tx = dmu_tx_create(rwa->os);
dmu_tx_hold_bonus(tx, drro->drr_object);
dmu_tx_hold_write(tx, drro->drr_object, 0, *new_blksz);
err = dmu_tx_assign(tx, DMU_TX_WAIT);
if (err != 0) {
dmu_tx_abort(tx);
return (err);
}
err = dmu_object_set_blocksize(rwa->os, drro->drr_object,
*new_blksz, 0, tx);
dmu_tx_commit(tx);
if (err != 0)
return (err);
/* Refresh doi */
err = dmu_object_info(rwa->os, drro->drr_object, doi);
if (err != 0)
return (err);
if (doi->doi_data_block_size != *new_blksz)
return (SET_ERROR(EINVAL));
}

/*
Expand Down Expand Up @@ -2272,14 +2402,11 @@ flush_write_batch_impl(struct receive_writer_arg *rwa)

if (drrw->drr_logical_size != dn->dn_datablksz) {
/*
* The WRITE record is larger than the object's block
* size. We must be receiving an incremental
* large-block stream into a dataset that previously did
* a non-large-block receive. Lightweight writes must
* be exactly one block, so we need to decompress the
* data (if compressed) and do a normal dmu_write().
* The WRITE record is different than the object's block
* size. Lightweight writes must be exactly one block,
* so we need to decompress the data (if compressed) and
* do a normal dmu_write().
*/
ASSERT3U(drrw->drr_logical_size, >, dn->dn_datablksz);
if (DRR_WRITE_COMPRESSED(drrw)) {
abd_t *decomp_abd =
abd_alloc_linear(drrw->drr_logical_size,
Expand Down
2 changes: 1 addition & 1 deletion tests/runfiles/common.run
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ tests = ['recv_dedup', 'recv_dedup_encrypted_zvol', 'rsend_001_pos',
'send_large_blocks_incremental', 'send_large_blocks_initial',
'send_large_microzap_incremental', 'send_large_microzap_transitive',
'send_doall', 'send_raw_spill_block', 'send_raw_ashift',
'send_raw_large_blocks', 'send_leak_keymaps']
'send_raw_large_blocks', 'send_leak_keymaps', 'send_recv_blksz']
tags = ['functional', 'rsend']

[tests/functional/scrub_mirror]
Expand Down
2 changes: 2 additions & 0 deletions tests/zfs-tests/tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ nobase_dist_datadir_zfs_tests_tests_DATA += \
functional/rsend/fs.tar.gz \
functional/rsend/rsend.cfg \
functional/rsend/rsend.kshlib \
functional/rsend/testpool_recv_blksz.gz \
functional/scrub_mirror/default.cfg \
functional/scrub_mirror/scrub_mirror_common.kshlib \
functional/send_xdr_encoding/send_xdr_encoding.cfg \
Expand Down Expand Up @@ -2128,6 +2129,7 @@ nobase_dist_datadir_zfs_tests_tests_SCRIPTS += \
functional/rsend/send_realloc_dnode_size.ksh \
functional/rsend/send_realloc_encrypted_files.ksh \
functional/rsend/send_realloc_files.ksh \
functional/rsend/send_recv_blksz.ksh \
functional/rsend/send_spill_block.ksh \
functional/rsend/send-wR_encrypted_zvol.ksh \
functional/rsend/send-zstream_drop_record.ksh \
Expand Down
Loading
Loading