/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
* Copyright (c) 2011, 2015 by Delphix. All rights reserved.
* Copyright (c) 2014, Joyent, Inc. All rights reserved.
* Copyright 2014 HybridCluster. All rights reserved.
* Copyright 2016 RackTop Systems.
* Copyright (c) 2014 Integros [integros.com]
*/
#include <sys/dmu_impl.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/zfs_ioctl.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dsl_destroy.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;

/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;
struct send_thread_arg {
	bqueue_t	q;
	dsl_dataset_t	*ds;		/* Dataset to traverse */
	uint64_t	fromtxg;	/* Traverse from this txg */
	int		flags;		/* flags to pass to traverse_dataset */
	int		error_code;
	boolean_t	cancel;
	zbookmark_phys_t resume;
};
struct send_block_record {
	boolean_t		eos_marker; /* Marks the end of the stream */
	blkptr_t		bp;
	zbookmark_phys_t	zb;
	uint8_t			indblkshift;
	uint16_t		datablkszsec;
	bqueue_node_t		ln;
};
static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
	ssize_t resid; /* have to get resid to get detail errno */

/*
* The code does not rely on this (len being a multiple of 8). We keep
* this assertion because of the corresponding assertion in
* receive_read(). Keeping this assertion ensures that we do not
* inadvertently break backwards compatibility (causing the assertion
* in receive_read() to trigger on old software).
*
* Removing the assertions could be rolled into a new feature that uses
* data that isn't 8-byte aligned; if the assertions were removed, a
* feature flag would have to be added.
*/
	ASSERT0(len % 8);

	dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
	    (caddr_t)buf, len,
	    0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);

	mutex_enter(&ds->ds_sendstream_lock);
	*dsp->dsa_off += len;
	mutex_exit(&ds->ds_sendstream_lock);

	return (dsp->dsa_err);
}
/*
* For all record types except BEGIN, fill in the checksum (overlaid in
* drr_u.drr_checksum.drr_checksum). The checksum verifies everything
* up to the start of the checksum itself.
*/
static int
dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
{
	ASSERT3U(offsetof(dmu_replay_record_t,
	    drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	fletcher_4_incremental_native(dsp->dsa_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dsp->dsa_zc);
	if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
		dsp->dsa_sent_begin = B_TRUE;
	} else {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
		    drr_checksum.drr_checksum));
		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
	}
	if (dsp->dsa_drr->drr_type == DRR_END)
		dsp->dsa_sent_end = B_TRUE;
	fletcher_4_incremental_native(&dsp->dsa_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dsp->dsa_zc);
	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		fletcher_4_incremental_native(payload, payload_len,
		    &dsp->dsa_zc);
		if (dump_bytes(dsp, payload, payload_len) != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}
/*
* Fill in the drr_free struct, or perform aggregation if the previous record is
* also a free record, and the two are adjacent.
*
* Note that we send free records even for a full send, because we want to be
* able to receive a full send as a clone, which requires a list of all the free
* and freeobject records that were generated on the source.
*/
static int
dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed. This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset. We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));

	if (length != -1ULL && offset + length < offset)
		length = -1ULL;

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	if (dsp->dsa_pending_op == PENDING_FREE) {
		/*
		 * There should never be a PENDING_FREE if length is -1
		 * (because dump_dnode is the only place where this
		 * function is called with a -1, and only after flushing
		 * any pending record).
		 */
		ASSERT(length != -1ULL);
		/*
		 * Check to see whether this free block can be aggregated
		 * with the pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	drrf->drr_length = length;
	drrf->drr_toguid = dsp->dsa_toguid;
	if (length == -1ULL) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dsp->dsa_pending_op = PENDING_FREE;
	}

	return (0);
}
static int
dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type, uint64_t object,
    uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
{
/*
* We send data in increasing object, offset order.
* See comment in dump_free() for details.
*/
/*
* If there is any kind of pending aggregation (currently either
* a grouping of free objects or free blocks), push it out to
* the stream, since aggregation can't be done across operations
* of different types.
*/
}
/* write a WRITE record */
/* only set the compression fields if the buf is compressed */
} else {
}
	/*
	 * There's no pre-computed checksum for partial-block
	 * writes or embedded BPs, so (like
	 * fletcher4-checksummed blocks) userland will have to
	 * compute a dedup-capable checksum itself.
	 */
} else {
}
return (0);
}
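
/*
 * Illustrative sketch (not part of the original file): the dedup-checksum
 * decision described in the comment inside dump_write() above. Only a
 * whole-block, non-embedded write whose on-disk checksum algorithm is
 * dedup-capable may advertise that checksum in the stream; otherwise the
 * record is left unflagged and userland computes its own. The helper name
 * and the "whole_block"/"dedup_capable" parameters are assumptions.
 */
static void
example_fill_dedup_checksum(const blkptr_t *bp, boolean_t whole_block,
    boolean_t dedup_capable, struct drr_write *drrw)
{
	if (bp == NULL || BP_IS_EMBEDDED(bp) || !whole_block || !dedup_capable)
		return;		/* no usable pre-computed checksum */
	drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
	drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
	drrw->drr_key.ddk_cksum = bp->blk_cksum;
}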
static int
dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
return (EINTR);
}
return (EINTR);
return (0);
}
static int
dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
{
}
/* write a SPILL record */
return (0);
}
static int
dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dsp->dsa_toguid;

	dsp->dsa_pending_op = PENDING_FREEOBJECTS;

	return (0);
}
static int
dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
{
/*
* Note: when resuming, we will visit all the dnodes in
* the block of dnodes that we are resuming from. In
* this case it's unnecessary to send the dnodes prior to
* the one we are resuming from. We should be at most one
* block's worth of dnodes behind the resume point.
*/
return (0);
}
}
/* write an OBJECT record */
}
/* Free anything past the end of the file. */
return (0);
}
static boolean_t
backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LZ4)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}
/*
* This is the callback function to traverse_dataset that acts as the worker
* thread for dmu_send_impl.
*/
/*ARGSUSED*/
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
int err = 0;
return (0);
return (0);
}
return (err);
}
/*
* This function kicks off the traverse_dataset. It also handles setting the
* error code of the thread in case something goes wrong, and pushes the End of
* Stream record when the traverse_dataset call has finished. If there is no
* dataset to traverse, the thread immediately pushes End of Stream marker.
*/
static void
send_traverse_thread(void *arg)
{
int err;
}
}
/*
* This function actually handles figuring out what kind of record needs to be
* dumped, reading the data (which has hopefully been prefetched), and calling
* the appropriate helper function.
*/
static int
do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
{
int err = 0;
return (0);
} else if (BP_IS_HOLE(bp) &&
} else if (BP_IS_HOLE(bp)) {
return (0);
} else if (type == DMU_OT_DNODE) {
for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
if (err != 0)
break;
}
/* it's an embedded level-0 block of a regular object */
} else {
/* it's a level-0 block of a regular object */
/*
* If we have large blocks stored on disk but the send flags
* don't allow us to send large blocks, we split the data from
* the arc buf into chunks.
*/
/*
* We should only request compressed data from the ARC if all
* the following are true:
* - stream compression was requested
* - we aren't splitting large blocks into smaller chunks
* - the data won't need to be byteswapped before sending
* - this isn't an embedded block
* - this isn't metadata (if receiving on a different endian
* system it can be byteswapped more easily)
*/
if (request_compressed)
zioflags |= ZIO_FLAG_RAW;
		if (zfs_send_corrupt_data) {
			/* Send a block filled with 0x"zfs badd bloc" */
			abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
			    blksz);
			uint64_t *ptr;
			for (ptr = abuf->b_data;
			    (char *)ptr < (char *)abuf->b_data + blksz;
			    ptr++)
				*ptr = 0x2f5baddb10cULL;
		} else {
			return (SET_ERROR(EIO));
		}
}
		if (split_large_blocks) {
			char *buf = abuf->b_data;
			while (blksz > 0 && err == 0) {
				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
				err = dump_write(dsa, type, zb->zb_object,
				    offset, n, n, NULL, buf);
				offset += n;
				buf += n;
				blksz -= n;
			}
		} else {
			err = dump_write(dsa, type, zb->zb_object, offset,
			    blksz, arc_buf_size(abuf), bp, abuf->b_data);
		}
}
return (err);
}
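
/*
 * Illustrative sketch (not part of the original file): the conditions the
 * comment inside do_dump() lists for requesting compressed data from the
 * ARC, collapsed into one predicate. The helper name and parameter names
 * are assumptions.
 */
static boolean_t
example_request_compressed(boolean_t compressok, boolean_t split_large_blocks,
    boolean_t need_byteswap, const blkptr_t *bp, dmu_object_type_t type)
{
	return (compressok && !split_large_blocks && !need_byteswap &&
	    !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(type));
}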
/*
* Pop the new data off the queue, and free the old data.
*/
static struct send_block_record *
get_next_record(bqueue_t *bq, struct send_block_record *data)
{
	struct send_block_record *tmp = bqueue_dequeue(bq);
	kmem_free(data, sizeof (*data));
	return (tmp);
}
/*
* Actually do the bulk of the work in a zfs send.
*
* Note: Releases dp using the specified tag.
*/
static int
dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
    zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
    boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
    int outfd, uint64_t resumeobj, uint64_t resumeoff,
    vnode_t *vp, offset_t *off)
{
int err;
if (err != 0) {
return (err);
}
#ifdef _KERNEL
}
if (version >= ZPL_VERSION_SA) {
}
}
#endif
if (embedok &&
}
if (compressok) {
}
if ((featureflags &
}
}
if (is_clone)
if (ancestor_zb != NULL) {
}
if (!to_ds->ds_is_snapshot) {
}
if (err != 0)
goto out;
}
if (err != 0) {
goto out;
}
to_arg.error_code = 0;
}
if (err != 0) {
while (!to_data->eos_marker) {
}
}
bqueue_destroy(&to_arg.q);
if (err != 0)
goto out;
if (err != 0) {
goto out;
}
out:
return (err);
}
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
    int outfd, vnode_t *vp, offset_t *off)
{
int err;
if (err != 0)
return (err);
if (err != 0) {
return (err);
}
if (fromsnap != 0) {
if (err != 0) {
return (err);
}
} else {
}
return (err);
}
int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, boolean_t compressok, int outfd,
    uint64_t resumeobj, uint64_t resumeoff,
    vnode_t *vp, offset_t *off)
{
int err;
if (err != 0)
return (err);
/*
* We are sending a filesystem or volume. Ensure
* that it doesn't change by owning the dataset.
*/
} else {
}
if (err != 0) {
return (err);
}
/*
* If the fromsnap is in a different filesystem, then
* mark the send stream as a clone.
*/
}
if (err == 0) {
}
} else {
}
if (err != 0) {
return (err);
}
} else {
}
if (owned)
else
return (err);
}
static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds,
    uint64_t uncompressed, uint64_t compressed, boolean_t stream_compressed,
    uint64_t *sizep)
{
int err;
/*
* Assume that space (both on-disk and in-stream) is dominated by
* data. We will adjust for indirect blocks and the copies property,
* but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
*/
/* Assume all (uncompressed) blocks are recordsize. */
&recordsize);
if (err != 0)
return (err);
/*
* If we're estimating a send size for a compressed stream, use the
* compressed data size to estimate the stream size. Otherwise, use the
* uncompressed data size.
*/
/*
* Subtract out approximate space used by indirect blocks.
* Assume most space is used by data blocks (non-indirect, non-dnode).
* Assume no ditto blocks or internal fragmentation.
*
* Therefore, space used by indirect blocks is sizeof(blkptr_t) per
* block.
*/
/* Add in the space for the record associated with each block. */
return (0);
}
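
/*
 * Illustrative sketch (not part of the original file) of the arithmetic the
 * comments above describe; the helper name and parameters are assumptions.
 * With a 128K recordsize, the estimate loses one blkptr_t and gains one
 * dmu_replay_record_t of overhead per 128K data block.
 */
static uint64_t
example_adjust_estimate(uint64_t size, uint64_t recordsize)
{
	/* subtract one blkptr_t of indirect-block overhead per data block */
	size -= size / recordsize * sizeof (blkptr_t);
	/* add one stream record header per data block */
	size += size / recordsize * sizeof (dmu_replay_record_t);
	return (size);
}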
int
dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
    boolean_t stream_compressed, uint64_t *sizep)
{
int err;
/* tosnap must be a snapshot */
if (!ds->ds_is_snapshot)
/* fromsnap, if provided, must be a snapshot */
/*
* fromsnap must be an earlier snapshot from the same fs as tosnap,
* or the origin's fs.
*/
/* Get compressed and uncompressed size estimates of changed data. */
} else {
if (err != 0)
return (err);
}
return (err);
}
struct calculate_send_arg {
	uint64_t uncompressed;
	uint64_t compressed;
};
/*
* Simple callback used to traverse the blocks of a snapshot and sum their
* uncompressed and compressed sizes.
*/
/* ARGSUSED */
static int
dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
}
return (0);
}
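
/*
 * Illustrative sketch (not part of the original file): the summing the
 * comment above describes, for one blkptr_t visited by the traversal.
 * Holes contribute nothing; the helper name is an assumption.
 */
static void
example_sum_block_sizes(const blkptr_t *bp, struct calculate_send_arg *space)
{
	if (bp == NULL || BP_IS_HOLE(bp))
		return;
	space->uncompressed += BP_GET_UCSIZE(bp);
	space->compressed += BP_GET_PSIZE(bp);
}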
/*
 * Given a destination snapshot and a TXG, calculate the approximate size of a
* send stream sent from that TXG. from_txg may be zero, indicating that the
* whole snapshot will be sent.
*/
int
dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
    boolean_t stream_compressed, uint64_t *sizep)
{
int err;
/* tosnap must be a snapshot */
if (!ds->ds_is_snapshot)
/* verify that from_txg is before the provided snapshot was taken */
}
/*
* traverse the blocks of the snapshot with birth times after
* from_txg, summing their uncompressed size
*/
if (err)
return (err);
return (err);
}
typedef struct dmu_recv_begin_arg {
	const char *drba_origin;
	dmu_recv_cookie_t *drba_cookie;
	cred_t *drba_cred;
	uint64_t drba_snapobj;
} dmu_recv_begin_arg_t;
static int
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
    uint64_t fromguid)
{
int error;
/* temporary clone name must not exist */
/* new snapshot name must not exist */
/*
* Check snapshot limit before receiving. We'll recheck again at the
* end, but might as well abort before receiving if we're already over
* the limit.
*
* Note that we do not check the file system limit with
* dsl_dir_fscount_check because the temporary %clones don't count
* against that limit.
*/
if (error != 0)
return (error);
if (fromguid != 0) {
/* Find snapshot in this dir that matches fromguid. */
while (obj != 0) {
&snap);
if (error != 0)
}
break;
}
if (obj == 0)
} else {
/*
* If we are not forcing, there must be no
* changes since fromsnap.
*/
}
}
} else {
/* if full, then must be forced */
/* start from $ORIGIN@$ORIGIN, if supported */
}
return (0);
}
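
/*
 * Illustrative sketch (not part of the original file): walking the snapshot
 * chain backwards to find the snapshot whose guid matches the stream's
 * fromguid, as the loop in the check above does. The helper name is an
 * assumption; the dsl interfaces are real.
 */
static int
example_find_fromsnap(dsl_pool_t *dp, dsl_dataset_t *ds, uint64_t fromguid,
    uint64_t *snapobjp)
{
	uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;

	while (obj != 0) {
		dsl_dataset_t *snap;
		int error = dsl_dataset_hold_obj(dp, obj, FTAG, &snap);
		if (error != 0)
			return (error);
		if (dsl_dataset_phys(snap)->ds_guid == fromguid) {
			*snapobjp = obj;
			dsl_dataset_rele(snap, FTAG);
			return (0);
		}
		obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
		dsl_dataset_rele(snap, FTAG);
	}
	return (SET_ERROR(ENODEV));
}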
static int
dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
{
int error;
/* already checked */
/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));
/*
* The receiving code doesn't know how to translate a WRITE_EMBEDDED
* record to a plain WRITE record, so the pool must have the
* EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
* records. Same with WRITE_EMBEDDED records that use LZ4 compression.
*/
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));
/*
* The receiving code doesn't know how to translate large blocks
* to smaller ones, so the pool must have the LARGE_BLOCKS
* feature enabled if the stream has LARGE_BLOCKS.
*/
	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
		return (SET_ERROR(ENOTSUP));
if (error == 0) {
/* target fs already exists; recv into temp clone */
/* Can't recv a clone into an existing fs */
}
/* target fs does not exist; must be a full backup or clone */
/*
* If it's a non-clone incremental, we are missing the
* target fs, so fail the recv.
*/
drba->drba_origin))
/*
* If we're receiving a full send as a clone, and it doesn't
* contain all the necessary free records and freeobject
* records, reject it.
*/
!(flags & DRR_FLAG_FREERECORDS))
/* Open the parent of tofs */
if (error != 0)
return (error);
/*
* Check filesystem and snapshot limits before receiving. We'll
* recheck snapshot limits again at the end (we create the
* filesystems and increment those counts during begin_sync).
*/
if (error != 0) {
return (error);
}
if (error != 0) {
return (error);
}
if (error != 0) {
return (error);
}
if (!origin->ds_is_snapshot) {
}
fromguid != 0) {
}
}
error = 0;
}
return (error);
}
static void
dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
{
int error;
if (error == 0) {
/* create temporary clone */
if (drba->drba_snapobj != 0) {
}
if (drba->drba_snapobj != 0)
} else {
const char *tail;
}
/* Create new dataset. */
}
if (drrb->drr_fromguid != 0) {
}
}
}
}
}
/*
* If we actually created a non-clone, we need to create the
* objset in our new dataset.
*/
}
}
static int
dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
{
int error;
/* already checked */
/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));
/*
* The receiving code doesn't know how to translate a WRITE_EMBEDDED
* record to a plain WRITE record, so the pool must have the
* EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
* records. Same with WRITE_EMBEDDED records that use LZ4 compression.
*/
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));
/* 6 extra bytes for /%recv */
/* %recv does not exist; continue in tofs */
if (error != 0)
return (error);
}
/* check that ds is marked inconsistent */
if (!DS_IS_INCONSISTENT(ds)) {
}
/* check that there is resuming data, and that the toguid matches */
if (!dsl_dataset_is_zapified(ds)) {
}
}
/*
* Check if the receive is still running. If so, it will be owned.
* Note that nothing else can own the dataset (e.g. after the receive
* fails) because it will be marked inconsistent.
*/
if (dsl_dataset_has_owner(ds)) {
}
/* There should not be any snapshots of this fs yet. */
}
/*
* Note: resume point will be checked when we process the first WRITE
* record.
*/
/* check that the origin matches */
val = 0;
}
return (0);
}
static void
dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
{
/* 6 extra bytes for /%recv */
/* %recv does not exist; continue in tofs */
}
/* clear the inconsistent flag so that we can own it */
}
/*
* NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
* succeeds; otherwise we will leak the holds on the datasets.
*/
int
dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
{
} else {
}
return (dsl_sync_task(tofs,
} else {
return (dsl_sync_task(tofs,
}
}
struct receive_record_arg {
	dmu_replay_record_t header;
	void *payload; /* Pointer to a buffer containing the payload */
	/*
	 * If the record is a write, pointer to the arc_buf_t containing the
	 * payload.
	 */
	arc_buf_t *write_buf;
	int payload_size;
	uint64_t bytes_read; /* bytes read from stream when record created */
	boolean_t eos_marker; /* Marks the end of the stream */
	bqueue_node_t node;
};
struct receive_writer_arg {
	objset_t *os;
	boolean_t byteswap;
	bqueue_t q;

	/*
	 * These three args are used to signal to the main thread that we're
	 * done.
	 */
	kmutex_t mutex;
	kcondvar_t cv;
	boolean_t done;

	int err;
	/* A map from guid to dataset to help handle dedup'd streams. */
	avl_tree_t *guid_to_ds_map;
	boolean_t resumable;
	uint64_t last_object, last_offset;
	uint64_t bytes_read; /* bytes read when current record created */
};
struct objlist {
	list_t list; /* List of struct receive_objnode. */
	/*
	 * Last object looked up. Used to assert that objects are being looked
	 * up in ascending order.
	 */
	uint64_t last_lookup;
};
struct receive_objnode {
	list_node_t node;
	uint64_t object;
};
struct receive_arg {
	objset_t *os;
	vnode_t *vp; /* The vnode to read the stream from */
	uint64_t voff; /* The current offset in the stream */
	uint64_t bytes_read;
	/*
	 * A record that has had its payload read in, but hasn't yet been
	 * handed off to the worker thread.
	 */
	struct receive_record_arg *rrd;
	/* A record that has had its header read in, but not its payload. */
	struct receive_record_arg *next_rrd;
	zio_cksum_t cksum;
	zio_cksum_t prev_cksum;
	int err;
	boolean_t byteswap;
	/* Sorted list of objects not to issue prefetches for. */
	struct objlist ignore_objlist;
};
typedef struct guid_map_entry {
	uint64_t	guid;
	dsl_dataset_t	*gme_ds;
	avl_node_t	avlnode;
} guid_map_entry_t;

static int
guid_compare(const void *arg1, const void *arg2)
{
	const guid_map_entry_t *gmep1 = arg1;
	const guid_map_entry_t *gmep2 = arg2;

	if (gmep1->guid < gmep2->guid)
		return (-1);
	else if (gmep1->guid > gmep2->guid)
		return (1);
	return (0);
}
static void
free_guid_map_onexit(void *arg)
{
	avl_tree_t *ca = arg;
	void *cookie = NULL;
	guid_map_entry_t *gmep;

	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
		dsl_dataset_long_rele(gmep->gme_ds, gmep);
		dsl_dataset_rele(gmep->gme_ds, gmep);
		kmem_free(gmep, sizeof (guid_map_entry_t));
	}
	avl_destroy(ca);
	kmem_free(ca, sizeof (avl_tree_t));
}
static int
receive_read(struct receive_arg *ra, int len, void *buf)
{
	int done = 0;

	/*
	 * The code doesn't rely on this (lengths being multiples of 8). See
	 * comment in dump_bytes.
	 */
	ASSERT0(len % 8);

	while (done < len) {
		ssize_t resid;

		ra->err = vn_rdwr(UIO_READ, ra->vp,
		    (char *)buf + done, len - done,
		    ra->voff, UIO_SYSSPACE, FAPPEND,
		    RLIM64_INFINITY, CRED(), &resid);

		if (resid == len - done) {
			/*
			 * Note: ECKSUM indicates that the receive
			 * was interrupted and can potentially be resumed.
			 */
			ra->err = SET_ERROR(ECKSUM);
		}
		ra->voff += len - done - resid;
		done = len - resid;
		if (ra->err != 0)
			return (ra->err);
	}

	ra->bytes_read += len;

	ASSERT3U(done, ==, len);
	return (0);
}
static void
byteswap_record(dmu_replay_record_t *drr)
{
	drr->drr_type = BSWAP_32(drr->drr_type);
	drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);

	switch (drr->drr_type) {
case DRR_BEGIN:
break;
case DRR_OBJECT:
break;
case DRR_FREEOBJECTS:
break;
case DRR_WRITE:
break;
case DRR_WRITE_BYREF:
break;
case DRR_WRITE_EMBEDDED:
break;
case DRR_FREE:
break;
case DRR_SPILL:
break;
case DRR_END:
break;
}
}
}
static inline uint8_t
deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
{
	if (bonus_type == DMU_OT_SA) {
		return (1);
	} else {
		return (1 +
		    ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT));
	}
}
static void
save_resume_state(struct receive_writer_arg *rwa,
    uint64_t object, uint64_t offset, dmu_tx_t *tx)
{
return;
/*
* We use ds_resume_bytes[] != 0 to indicate that we need to
* update this on disk, so it must not be 0.
*/
/*
* We only resume from write records, which have a valid
* (non-meta-dnode) object number.
*/
/*
* For resuming to work correctly, we must receive records in order,
* sorted by object,offset. This is checked by the callers, but
* assert it here for good measure.
*/
}
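
/*
 * Illustrative sketch (not part of the original file) of what the comments
 * above imply save_resume_state() records: the (object, offset, bytes_read)
 * triple is stashed on the dataset, indexed by the open txg, so that the
 * sync task writes it to disk. Field names match dsl_dataset_t; the helper
 * name is an assumption.
 */
static void
example_save_resume_state(struct receive_writer_arg *rwa,
    uint64_t object, uint64_t offset, dmu_tx_t *tx)
{
	int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
	dsl_dataset_t *ds = dmu_objset_ds(rwa->os);

	ds->ds_resume_object[txgoff] = object;
	ds->ds_resume_offset[txgoff] = offset;
	ds->ds_resume_bytes[txgoff] = rwa->bytes_read;
}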
static int
receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
    void *data)
{
int err;
}
/*
* If we are losing blkptrs or changing the block size this must
* be a new file instance. We must clear out the previous file
* contents before we can change this type of metadata in the dnode.
*/
if (err == 0) {
int nblkptr;
drro->drr_bonuslen);
0, DMU_OBJECT_END);
if (err != 0)
}
}
if (err != 0) {
return (err);
}
if (object == DMU_NEW_OBJECT) {
/* currently free, want to be allocated */
/* currently allocated, but with different properties */
}
if (err != 0) {
}
drro->drr_bonuslen);
}
}
return (0);
}
/* ARGSUSED */
static int
receive_freeobjects(struct receive_writer_arg *rwa,
    struct drr_freeobjects *drrfo)
{
int next_err = 0;
int err;
continue;
if (err != 0)
return (err);
}
return (next_err);
return (0);
}
static int
receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
    arc_buf_t *abuf)
{
int err;
/*
* For resuming to work, records must be in increasing order
* by (object, offset).
*/
}
if (err != 0) {
return (err);
}
}
/* use the bonus buf to look up the dnode in dmu_assign_arcbuf */
/*
* Note: If the receive fails, we want the resume stream to start
* with the same record that we last successfully received (as opposed
* to the next record), so that we can verify that we are
* resuming from the correct location.
*/
return (0);
}
/*
* Handle a DRR_WRITE_BYREF record. This record is used in dedup'ed
* streams to refer to a copy of the data that is already on the
* system because it came in earlier in the stream. This function
* finds the earlier copy of the data, and uses that copy instead of
* data from the stream to fulfill this write.
*/
static int
receive_write_byref(struct receive_writer_arg *rwa,
    struct drr_write_byref *drrwbr)
{
int err;
/*
* If the GUID of the referenced dataset is different from the
* GUID of the target dataset, find the referenced dataset.
*/
}
} else {
}
if (err != 0)
return (err);
if (err != 0) {
return (err);
}
/* See comment in restore_write. */
return (0);
}
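
/*
 * Illustrative sketch (not part of the original file): the guid-map lookup
 * that receive_write_byref() performs when the record references a dataset
 * other than the one being received into, per the comment above. The
 * helper name is an assumption; avl_find() and the guid map are real.
 */
static int
example_lookup_byref_os(struct receive_writer_arg *rwa, uint64_t refguid,
    objset_t **osp)
{
	guid_map_entry_t gmesrch;
	guid_map_entry_t *gmep;
	avl_index_t where;

	gmesrch.guid = refguid;
	gmep = avl_find(rwa->guid_to_ds_map, &gmesrch, &where);
	if (gmep == NULL)
		return (SET_ERROR(EINVAL));
	*osp = gmep->gme_ds->ds_objset;
	return (0);
}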
static int
receive_write_embedded(struct receive_writer_arg *rwa,
    struct drr_write_embedded *drrwe, void *data)
{
int err;
	if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
		return (SET_ERROR(EINVAL));
if (err != 0) {
return (err);
}
/* See comment in restore_write. */
return (0);
}
static int
receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
    void *data)
{
int err;
return (err);
}
if (err != 0) {
return (err);
}
return (0);
}
/* ARGSUSED */
static int
receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
{
int err;
return (err);
}
/* used to destroy the drc_ds on error */
static void
dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
{
	char name[ZFS_MAX_DATASET_NAME_LEN];

	dsl_dataset_name(drc->drc_ds, name);
	dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);

	if (drc->drc_resumable) {
		/* wait for our resume state to be written to disk */
		txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
	} else {
		(void) dsl_destroy_head(name);
	}
}
static void
receive_cksum(struct receive_arg *ra, int len, void *buf)
{
	if (ra->byteswap) {
		fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
	} else {
		fletcher_4_incremental_native(buf, len, &ra->cksum);
	}
}
/*
* Read the payload into a buffer of size len, and update the current record's
* payload field.
* Allocate ra->next_rrd and read the next record's header into
* ra->next_rrd->header.
* Verify checksum of payload and next record.
*/
static int
receive_read_payload_and_next_header(struct receive_arg *ra, int len,
    void *buf)
{
int err;
if (len != 0) {
if (err != 0)
return (err);
/* note: rrd is NULL when reading the begin record's payload */
}
}
if (err != 0) {
return (err);
}
}
/*
* Note: checksum is of everything up to but not including the
* checksum itself.
*/
	ASSERT3U(offsetof(dmu_replay_record_t,
	    drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	if (!ZIO_CHECKSUM_IS_ZERO(cksump) &&
	    !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
		return (SET_ERROR(ECKSUM));
}
return (0);
}
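
/*
 * Illustrative sketch (not part of the original file): the rule stated in
 * the comment above. The receiver's running fletcher-4 covers everything
 * up to, but not including, the record's checksum field, and is compared
 * against what the sender stored there. The helper name is an assumption.
 */
static boolean_t
example_checksum_matches(const dmu_replay_record_t *drr,
    const zio_cksum_t *running)
{
	const zio_cksum_t *stored = &drr->drr_u.drr_checksum.drr_checksum;

	/* an all-zero stored checksum means "not checksummed"; accept it */
	if (ZIO_CHECKSUM_IS_ZERO(stored))
		return (B_TRUE);
	return (ZIO_CHECKSUM_EQUAL(*stored, *running));
}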
static void
objlist_create(struct objlist *list)
{
	list_create(&list->list, sizeof (struct receive_objnode),
	    offsetof(struct receive_objnode, node));
	list->last_lookup = 0;
}
static void
objlist_destroy(struct objlist *list)
{
	struct receive_objnode *n;

	while ((n = list_remove_head(&list->list)) != NULL) {
		kmem_free(n, sizeof (*n));
	}
	list_destroy(&list->list);
}
/*
* This function looks through the objlist to see if the specified object number
* is contained in the objlist. In the process, it will remove all object
* numbers in the list that are smaller than the specified object number. Thus,
* any lookup of an object number smaller than a previously looked up object
* number will always return false; therefore, all lookups should be done in
* ascending order.
*/
static boolean_t
objlist_exists(struct objlist *list, uint64_t object)
{
	struct receive_objnode *node = list_head(&list->list);

	ASSERT3U(object, >=, list->last_lookup);
	list->last_lookup = object;
	while (node != NULL && node->object < object) {
		VERIFY3P(node, ==, list_remove_head(&list->list));
		kmem_free(node, sizeof (*node));
		node = list_head(&list->list);
	}
	return (node != NULL && node->object == object);
}
/*
* The objlist is a list of object numbers stored in ascending order. However,
* the insertion of new object numbers does not seek out the correct location to
* store a new object number; instead, it appends it to the list for simplicity.
* Thus, any users must take care to only insert new object numbers in ascending
* order.
*/
static void
objlist_insert(struct objlist *list, uint64_t object)
{
	struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);

	node->object = object;
#ifdef ZFS_DEBUG
	struct receive_objnode *last_object = list_tail(&list->list);
	uint64_t last_objnum = (last_object != NULL ?
	    last_object->object : 0);
	ASSERT3U(node->object, >, last_objnum);
#endif
	list_insert_tail(&list->list, node);
}
/*
* Issue the prefetch reads for any necessary indirect blocks.
*
* We use the object ignore list to tell us whether or not to issue prefetches
* for a given object. We do this for both correctness (in case the blocksize
* of an object has changed) and performance (if the object doesn't exist, don't
* needlessly try to issue prefetches). We also trim the list as we go through
* the stream to prevent it from growing to an unbounded size.
*
* The object numbers within will always be in sorted order, and any write
* records we see will also be in sorted order, but they're not sorted with
* respect to each other (i.e. we can get several object records before
* receiving each object's write records). As a result, once we've reached a
* given object number, we can safely remove any reference to lower object
* numbers in the ignore list. In practice, we receive up to 32 object records
* before receiving write records, so the list can have up to 32 nodes in it.
*/
/* ARGSUSED */
static void
receive_read_prefetch(struct receive_arg *ra,
    uint64_t object, uint64_t offset, uint64_t length)
{
}
}
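
/*
 * Illustrative sketch (not part of the original file): how the ignore list
 * described above gates prefetching for a WRITE record, assuming the
 * objlist helpers defined earlier. dmu_prefetch() is the real DMU
 * interface, though the argument layout here is approximate.
 */
static void
example_prefetch_if_known(struct receive_arg *ra, uint64_t object,
    uint64_t offset, uint64_t length)
{
	if (objlist_exists(&ra->ignore_objlist, object)) {
		/* object is being (re)allocated by this stream; skip it */
		return;
	}
	dmu_prefetch(ra->os, object, 1, offset, length,
	    ZIO_PRIORITY_SYNC_READ);
}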
/*
* Read records off the stream, issuing any necessary prefetches.
*/
static int
receive_read_record(struct receive_arg *ra)
{
	int err;

	switch (ra->rrd->header.drr_type) {
case DRR_OBJECT:
{
if (err != 0) {
return (err);
}
/*
* See receive_read_prefetch for an explanation why we're
* storing this object in the ignore_obj_list.
*/
err = 0;
}
return (err);
}
case DRR_FREEOBJECTS:
{
return (err);
}
case DRR_WRITE:
{
if (DRR_WRITE_COMPRESSED(drrw)) {
} else {
}
if (err != 0) {
return (err);
}
return (err);
}
case DRR_WRITE_BYREF:
{
drrwb->drr_length);
return (err);
}
case DRR_WRITE_EMBEDDED:
{
if (err != 0) {
return (err);
}
drrwe->drr_length);
return (err);
}
case DRR_FREE:
{
/*
* It might be beneficial to prefetch indirect blocks here, but
* we don't really have the data to decide for sure.
*/
return (err);
}
case DRR_END:
{
return (0);
}
case DRR_SPILL:
{
buf);
if (err != 0)
return (err);
}
default:
}
}
/*
* Commit the records to the pool.
*/
static int
receive_process_record(struct receive_writer_arg *rwa,
    struct receive_record_arg *rrd)
{
	int err;

	/* Processing in order, therefore bytes_read should be increasing. */
	ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
	rwa->bytes_read = rrd->bytes_read;

	switch (rrd->header.drr_type) {
case DRR_OBJECT:
{
return (err);
}
case DRR_FREEOBJECTS:
{
}
case DRR_WRITE:
{
/* if receive_write() is successful, it consumes the arc_buf */
if (err != 0)
return (err);
}
case DRR_WRITE_BYREF:
{
}
case DRR_WRITE_EMBEDDED:
{
return (err);
}
case DRR_FREE:
{
}
case DRR_SPILL:
{
return (err);
}
default:
}
}
/*
 * dmu_recv_stream's worker thread; pull records off the queue, and then call
 * receive_process_record. When we're done, signal the main thread and exit.
 */
static void
receive_writer_thread(void *arg)
{
/*
* If there's an error, the main thread will stop putting things
* on the queue, but we need to clear everything in it before we
* can exit.
*/
}
}
}
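
/*
 * Illustrative sketch (not part of the original file): draining the queue
 * after an error, per the comment inside receive_writer_thread() above.
 * Every queued record must still be consumed and freed until the eos
 * marker arrives; only then may the writer signal the main thread.
 */
static void
example_drain_queue(struct receive_writer_arg *rwa,
    struct receive_record_arg *rrd)
{
	while (!rrd->eos_marker) {
		struct receive_record_arg *next = bqueue_dequeue(&rwa->q);
		kmem_free(rrd, sizeof (*rrd));
		rrd = next;
	}
	kmem_free(rrd, sizeof (*rrd));
}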
static int
resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
{
	uint64_t resume_obj, resume_off;

	if (nvlist_lookup_uint64(begin_nvl,
	    "resume_object", &resume_obj) != 0 ||
	    nvlist_lookup_uint64(begin_nvl,
	    "resume_offset", &resume_off) != 0) {
		return (SET_ERROR(EINVAL));
	}
if (resume_obj != val)
if (resume_off != val)
return (0);
}
/*
* Read in the stream's records, one by one, and apply them to the pool. There
* are two threads involved; the thread that calls this function will spin up a
* worker thread, read the records off the stream one by one, and issue
* prefetches for any necessary indirect blocks. It will then push the records
* onto an internal blocking queue. The worker thread will pull the records off
* the queue, and actually write the data into the DMU. This way, the worker
* thread doesn't have to wait for reads to complete, since everything it needs
* (the indirect blocks) will be prefetched.
*
* NB: callers *must* call dmu_recv_end() if this succeeds.
*/
int
dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
    int cleanup_fd, uint64_t *action_handlep)
{
int err = 0;
int featureflags;
}
/* these were verified in dmu_recv_begin */
/*
* Open the objset we are modifying.
*/
/* if this stream is dedup'ed, set up the avl tree for guid mapping */
if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
if (cleanup_fd == -1) {
goto out;
}
cleanup_fd = -1;
goto out;
}
if (*action_handlep == 0) {
sizeof (guid_map_entry_t),
goto out;
} else {
(void **)&rwa.guid_to_ds_map);
goto out;
}
}
if (payloadlen != 0)
if (err != 0) {
if (payloadlen != 0)
goto out;
}
if (payloadlen != 0) {
if (err != 0)
goto out;
}
if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
if (err != 0)
goto out;
}
/*
* We're reading rwa.err without locks, which is safe since we are the
* only reader, and the worker thread is the only writer. It's ok if we
* miss a write for an iteration or two of the loop, since the writer
* thread will keep freeing records we send it until we send it an eos
* marker.
*
* We can leave this loop in 3 ways: First, if rwa.err is
* non-zero. In that case, the writer thread will free the rrd we just
* pushed. Second, if we're interrupted; in that case, either it's the
* first loop and ra.rrd was never allocated, or it's later, and ra.rrd
* has been handed off to the writer thread who will free it. Finally,
* if receive_read_record fails or we're at the end of the stream, then
* we free ra.rrd and exit.
*/
break;
}
/* Allocates and loads header into ra.next_rrd */
break;
}
}
}
bqueue_destroy(&rwa.q);
if (err == 0)
out:
if (err != 0) {
/*
* Clean up references. If receive is not resumable,
* destroy what we created, so we don't leave it in
* the inconsistent state.
*/
}
return (err);
}
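
/*
 * Illustrative sketch (not part of the original file): the reader half of
 * the two-thread hand-off described above dmu_recv_stream(), reduced to its
 * skeleton. receive_read_record() is the real producer step; the loop shape
 * and the unlocked peek at rwa->err follow the comment inside the function.
 */
static void
example_reader_loop(struct receive_arg *ra, struct receive_writer_arg *rwa)
{
	int err = 0;

	while (err == 0 && rwa->err == 0) {
		struct receive_record_arg *rrd;

		err = receive_read_record(ra);	/* fills in ra->rrd */
		if (err != 0 || ra->rrd == NULL)
			break;
		rrd = ra->rrd;
		ra->rrd = NULL;
		/* hand the fully-read record to the writer thread */
		bqueue_enqueue(&rwa->q, rrd, sizeof (*rrd));
	}
}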
static int
dmu_recv_end_check(void *arg, dmu_tx_t *tx)
{
int error;
if (error != 0)
return (error);
/*
* We will destroy any snapshots in tofs (i.e. before
* origin_head) that are after the origin (which is
		 * the snap before drc_ds, because drc_ds cannot
* have any snaps of its own).
*/
while (obj !=
&snap);
if (error != 0)
break;
if (error == 0) {
}
if (error != 0)
break;
}
if (error != 0) {
return (error);
}
}
if (error != 0) {
return (error);
}
if (error != 0)
return (error);
} else {
}
return (error);
}
static void
dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
{
&origin_head));
/*
* Destroy any snapshots of drc_tofs (origin_head)
* after the origin (the snap before drc_ds).
*/
while (obj !=
&snap));
}
}
origin_head, tx);
/* set snapshot's creation time and guid */
} else {
/* set snapshot's creation time and guid */
}
}
/*
* Release the hold from dmu_recv_begin. This must be done before
* we return to open context, so that when we free the dataset's dnode,
* we can evict its bonus buffer.
*/
}
static int
{
int err;
if (err != 0)
return (err);
if (err == 0) {
} else {
}
return (err);
}
static int
dmu_recv_existing_end(dmu_recv_cookie_t *drc)
{
#ifdef _KERNEL
/*
* We will be destroying the ds; make sure its origin is unmounted if
* necessary.
*/
#endif
}
static int
dmu_recv_new_end(dmu_recv_cookie_t *drc)
{
}
int
dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
{
int error;
else
if (error != 0) {
}
return (error);
}
/*
* Return TRUE if this objset is currently being received into.
*/
boolean_t
dmu_objset_is_receiving(objset_t *os)
{
	return (os->os_dsl_dataset != NULL &&
	    os->os_dsl_dataset->ds_owner == dmu_recv_tag);
}