1N/A/*-
1N/A * See the file LICENSE for redistribution information.
1N/A *
1N/A * Copyright (c) 1996, 1997, 1998
1N/A * Sleepycat Software. All rights reserved.
1N/A */
1N/A/*
1N/A * Copyright (c) 1990, 1993, 1994, 1995, 1996
1N/A * Keith Bostic. All rights reserved.
1N/A */
1N/A/*
1N/A * Copyright (c) 1990, 1993, 1994, 1995
1N/A * The Regents of the University of California. All rights reserved.
1N/A *
1N/A * Redistribution and use in source and binary forms, with or without
1N/A * modification, are permitted provided that the following conditions
1N/A * are met:
1N/A * 1. Redistributions of source code must retain the above copyright
1N/A * notice, this list of conditions and the following disclaimer.
1N/A * 2. Redistributions in binary form must reproduce the above copyright
1N/A * notice, this list of conditions and the following disclaimer in the
1N/A * documentation and/or other materials provided with the distribution.
1N/A * 3. All advertising materials mentioning features or use of this software
1N/A * must display the following acknowledgement:
1N/A * This product includes software developed by the University of
1N/A * California, Berkeley and its contributors.
1N/A * 4. Neither the name of the University nor the names of its contributors
1N/A * may be used to endorse or promote products derived from this software
1N/A * without specific prior written permission.
1N/A *
1N/A * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
1N/A * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
1N/A * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
1N/A * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
1N/A * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
1N/A * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
1N/A * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
1N/A * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
1N/A * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
1N/A * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
1N/A * SUCH DAMAGE.
1N/A */
1N/A
1N/A#include "config.h"
1N/A
1N/A#ifndef lint
1N/Astatic const char sccsid[] = "@(#)bt_split.c 10.33 (Sleepycat) 10/13/98";
1N/A#endif /* not lint */
1N/A
1N/A#ifndef NO_SYSTEM_INCLUDES
1N/A#include <sys/types.h>
1N/A
1N/A#include <errno.h>
1N/A#include <limits.h>
1N/A#include <string.h>
1N/A#endif
1N/A
1N/A#include "db_int.h"
1N/A#include "db_page.h"
1N/A#include "btree.h"
1N/A
1N/Astatic int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
1N/Astatic int __bam_page __P((DBC *, EPG *, EPG *));
1N/Astatic int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *));
1N/Astatic int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
1N/Astatic int __bam_root __P((DBC *, EPG *));
1N/Astatic int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
1N/A
1N/A/*
1N/A * __bam_split --
1N/A * Split a page.
1N/A *
1N/A * PUBLIC: int __bam_split __P((DBC *, void *));
1N/A */
1N/Aint
1N/A__bam_split(dbc, arg)
1N/A DBC *dbc;
1N/A void *arg;
1N/A{
1N/A BTREE *t;
1N/A CURSOR *cp;
1N/A DB *dbp;
1N/A enum { UP, DOWN } dir;
1N/A int exact, level, ret;
1N/A
1N/A dbp = dbc->dbp;
1N/A cp = dbc->internal;
1N/A
1N/A /*
1N/A * The locking protocol we use to avoid deadlock to acquire locks by
1N/A * walking down the tree, but we do it as lazily as possible, locking
1N/A * the root only as a last resort. We expect all stack pages to have
1N/A * been discarded before we're called; we discard all short-term locks.
1N/A *
1N/A * When __bam_split is first called, we know that a leaf page was too
1N/A * full for an insert. We don't know what leaf page it was, but we
1N/A * have the key/recno that caused the problem. We call XX_search to
1N/A * reacquire the leaf page, but this time get both the leaf page and
1N/A * its parent, locked. We then split the leaf page and see if the new
1N/A * internal key will fit into the parent page. If it will, we're done.
1N/A *
1N/A * If it won't, we discard our current locks and repeat the process,
1N/A * only this time acquiring the parent page and its parent, locked.
1N/A * This process repeats until we succeed in the split, splitting the
1N/A * root page as the final resort. The entire process then repeats,
1N/A * as necessary, until we split a leaf page.
1N/A *
1N/A * XXX
1N/A * A traditional method of speeding this up is to maintain a stack of
1N/A * the pages traversed in the original search. You can detect if the
1N/A * stack is correct by storing the page's LSN when it was searched and
1N/A * comparing that LSN with the current one when it's locked during the
1N/A * split. This would be an easy change for this code, but I have no
1N/A * numbers that indicate it's worthwhile.
1N/A */
1N/A t = dbp->internal;
1N/A for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
1N/A /*
1N/A * Acquire a page and its parent, locked.
1N/A */
1N/A if ((ret = (dbp->type == DB_BTREE ?
1N/A __bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) :
1N/A __bam_rsearch(dbc,
1N/A (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
1N/A return (ret);
1N/A
1N/A /*
1N/A * Split the page if it still needs it (it's possible another
1N/A * thread of control has already split the page). If we are
1N/A * guaranteed that two items will fit on the page, the split
1N/A * is no longer necessary.
1N/A */
1N/A if (t->bt_ovflsize * 2 <=
1N/A (db_indx_t)P_FREESPACE(cp->csp[0].page)) {
1N/A __bam_stkrel(dbc, 1);
1N/A return (0);
1N/A }
1N/A ret = cp->csp[0].page->pgno == PGNO_ROOT ?
1N/A __bam_root(dbc, &cp->csp[0]) :
1N/A __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
1N/A BT_STK_CLR(cp);
1N/A
1N/A switch (ret) {
1N/A case 0:
1N/A /* Once we've split the leaf page, we're done. */
1N/A if (level == LEAFLEVEL)
1N/A return (0);
1N/A
1N/A /* Switch directions. */
1N/A if (dir == UP)
1N/A dir = DOWN;
1N/A break;
1N/A case DB_NEEDSPLIT:
1N/A /*
1N/A * It's possible to fail to split repeatedly, as other
1N/A * threads may be modifying the tree, or the page usage
1N/A * is sufficiently bad that we don't get enough space
1N/A * the first time.
1N/A */
1N/A if (dir == DOWN)
1N/A dir = UP;
1N/A break;
1N/A default:
1N/A return (ret);
1N/A }
1N/A }
1N/A /* NOTREACHED */
1N/A}
1N/A
1N/A/*
1N/A * __bam_root --
1N/A * Split the root page of a btree.
1N/A */
1N/Astatic int
1N/A__bam_root(dbc, cp)
1N/A DBC *dbc;
1N/A EPG *cp;
1N/A{
1N/A DB *dbp;
1N/A PAGE *lp, *rp;
1N/A db_indx_t split;
1N/A int ret;
1N/A
1N/A dbp = dbc->dbp;
1N/A
1N/A /* Yeah, right. */
1N/A if (cp->page->level >= MAXBTREELEVEL) {
1N/A ret = ENOSPC;
1N/A goto err;
1N/A }
1N/A
1N/A /* Create new left and right pages for the split. */
1N/A lp = rp = NULL;
1N/A if ((ret = __bam_new(dbc, TYPE(cp->page), &lp)) != 0 ||
1N/A (ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
1N/A goto err;
1N/A P_INIT(lp, dbp->pgsize, lp->pgno,
1N/A PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
1N/A cp->page->level, TYPE(cp->page));
1N/A P_INIT(rp, dbp->pgsize, rp->pgno,
1N/A ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID,
1N/A cp->page->level, TYPE(cp->page));
1N/A
1N/A /* Split the page. */
1N/A if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
1N/A goto err;
1N/A
1N/A /* Log the change. */
1N/A if (DB_LOGGING(dbc)) {
1N/A DBT __a;
1N/A DB_LSN __lsn;
1N/A memset(&__a, 0, sizeof(__a));
1N/A __a.data = cp->page;
1N/A __a.size = dbp->pgsize;
1N/A ZERO_LSN(__lsn);
1N/A if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
1N/A &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
1N/A PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &__lsn,
1N/A &__a)) != 0)
1N/A goto err;
1N/A LSN(lp) = LSN(rp) = LSN(cp->page);
1N/A }
1N/A
1N/A /* Clean up the new root page. */
1N/A if ((ret = (dbp->type == DB_RECNO ?
1N/A __ram_root(dbc, cp->page, lp, rp) :
1N/A __bam_broot(dbc, cp->page, lp, rp))) != 0)
1N/A goto err;
1N/A
1N/A /* Adjust any cursors. Do it last so we don't have to undo it. */
1N/A __bam_ca_split(dbp, cp->page->pgno, lp->pgno, rp->pgno, split, 1);
1N/A
1N/A /* Success -- write the real pages back to the store. */
1N/A (void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
1N/A (void)__BT_TLPUT(dbc, cp->lock);
1N/A (void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
1N/A (void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
1N/A
1N/A return (0);
1N/A
1N/Aerr: if (lp != NULL)
1N/A (void)__bam_free(dbc, lp);
1N/A if (rp != NULL)
1N/A (void)__bam_free(dbc, rp);
1N/A (void)memp_fput(dbp->mpf, cp->page, 0);
1N/A (void)__BT_TLPUT(dbc, cp->lock);
1N/A return (ret);
1N/A}
1N/A
1N/A/*
1N/A * __bam_page --
1N/A * Split the non-root page of a btree.
1N/A */
1N/Astatic int
1N/A__bam_page(dbc, pp, cp)
1N/A DBC *dbc;
1N/A EPG *pp, *cp;
1N/A{
1N/A DB *dbp;
1N/A DB_LOCK tplock;
1N/A PAGE *lp, *rp, *tp;
1N/A db_indx_t split;
1N/A int ret;
1N/A
1N/A dbp = dbc->dbp;
1N/A lp = rp = tp = NULL;
1N/A ret = -1;
1N/A
1N/A /* Create new right page for the split. */
1N/A if ((ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
1N/A goto err;
1N/A P_INIT(rp, dbp->pgsize, rp->pgno,
1N/A ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->pgno,
1N/A ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->next_pgno,
1N/A cp->page->level, TYPE(cp->page));
1N/A
1N/A /* Create new left page for the split. */
1N/A if ((ret = __os_malloc(dbp->pgsize, NULL, &lp)) != 0)
1N/A goto err;
1N/A P_INIT(lp, dbp->pgsize, cp->page->pgno,
1N/A ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->prev_pgno,
1N/A ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
1N/A cp->page->level, TYPE(cp->page));
1N/A ZERO_LSN(lp->lsn);
1N/A
1N/A /*
1N/A * Split right.
1N/A *
1N/A * Only the indices are sorted on the page, i.e., the key/data pairs
1N/A * aren't, so it's simpler to copy the data from the split page onto
1N/A * two new pages instead of copying half the data to the right page
1N/A * and compacting the left page in place. Since the left page can't
1N/A * change, we swap the original and the allocated left page after the
1N/A * split.
1N/A */
1N/A if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
1N/A goto err;
1N/A
1N/A /*
1N/A * Fix up the previous pointer of any leaf page following the split
1N/A * page.
1N/A *
1N/A * !!!
1N/A * There are interesting deadlock situations here as we write-lock a
1N/A * page that's not in our direct ancestry. Consider a cursor walking
1N/A * through the leaf pages, that has the previous page read-locked and
1N/A * is waiting on a lock for the page we just split. It will deadlock
1N/A * here. If this is a problem, we can fail in the split; it's not a
1N/A * problem as the split will succeed after the cursor passes through
1N/A * the page we're splitting.
1N/A */
1N/A if (TYPE(cp->page) == P_LBTREE && rp->next_pgno != PGNO_INVALID) {
1N/A if ((ret = __bam_lget(dbc,
1N/A 0, rp->next_pgno, DB_LOCK_WRITE, &tplock)) != 0)
1N/A goto err;
1N/A if ((ret = memp_fget(dbp->mpf, &rp->next_pgno, 0, &tp)) != 0)
1N/A goto err;
1N/A }
1N/A
1N/A /* Insert the new pages into the parent page. */
1N/A if ((ret = __bam_pinsert(dbc, pp, lp, rp)) != 0)
1N/A goto err;
1N/A
1N/A /* Log the change. */
1N/A if (DB_LOGGING(dbc)) {
1N/A DBT __a;
1N/A DB_LSN __lsn;
1N/A memset(&__a, 0, sizeof(__a));
1N/A __a.data = cp->page;
1N/A __a.size = dbp->pgsize;
1N/A if (tp == NULL)
1N/A ZERO_LSN(__lsn);
1N/A if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
1N/A &cp->page->lsn, 0, dbp->log_fileid, PGNO(cp->page),
1N/A &LSN(cp->page), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp),
1N/A tp == NULL ? 0 : PGNO(tp),
1N/A tp == NULL ? &__lsn : &LSN(tp), &__a)) != 0)
1N/A goto err;
1N/A
1N/A LSN(lp) = LSN(rp) = LSN(cp->page);
1N/A if (tp != NULL)
1N/A LSN(tp) = LSN(cp->page);
1N/A }
1N/A
1N/A /* Copy the allocated page into place. */
1N/A memcpy(cp->page, lp, LOFFSET(lp));
1N/A memcpy((u_int8_t *)cp->page + HOFFSET(lp),
1N/A (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
1N/A __os_free(lp, dbp->pgsize);
1N/A lp = NULL;
1N/A
1N/A /* Finish the next-page link. */
1N/A if (tp != NULL)
1N/A tp->prev_pgno = rp->pgno;
1N/A
1N/A /* Adjust any cursors. Do so last so we don't have to undo it. */
1N/A __bam_ca_split(dbp, cp->page->pgno, cp->page->pgno, rp->pgno, split, 0);
1N/A
1N/A /* Success -- write the real pages back to the store. */
1N/A (void)memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY);
1N/A (void)__BT_TLPUT(dbc, pp->lock);
1N/A (void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
1N/A (void)__BT_TLPUT(dbc, cp->lock);
1N/A (void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
1N/A if (tp != NULL) {
1N/A (void)memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY);
1N/A (void)__BT_TLPUT(dbc, tplock);
1N/A }
1N/A return (0);
1N/A
1N/Aerr: if (lp != NULL)
1N/A __os_free(lp, dbp->pgsize);
1N/A if (rp != NULL)
1N/A (void)__bam_free(dbc, rp);
1N/A if (tp != NULL) {
1N/A (void)memp_fput(dbp->mpf, tp, 0);
1N/A if (ret == DB_NEEDSPLIT)
1N/A (void)__BT_LPUT(dbc, tplock);
1N/A else
1N/A (void)__BT_TLPUT(dbc, tplock);
1N/A }
1N/A (void)memp_fput(dbp->mpf, pp->page, 0);
1N/A if (ret == DB_NEEDSPLIT)
1N/A (void)__BT_LPUT(dbc, pp->lock);
1N/A else
1N/A (void)__BT_TLPUT(dbc, pp->lock);
1N/A (void)memp_fput(dbp->mpf, cp->page, 0);
1N/A if (ret == DB_NEEDSPLIT)
1N/A (void)__BT_LPUT(dbc, cp->lock);
1N/A else
1N/A (void)__BT_TLPUT(dbc, cp->lock);
1N/A return (ret);
1N/A}
1N/A
1N/A/*
1N/A * __bam_broot --
1N/A * Fix up the btree root page after it has been split.
1N/A */
1N/Astatic int
1N/A__bam_broot(dbc, rootp, lp, rp)
1N/A DBC *dbc;
1N/A PAGE *rootp, *lp, *rp;
1N/A{
1N/A BINTERNAL bi, *child_bi;
1N/A BKEYDATA *child_bk;
1N/A DB *dbp;
1N/A DBT hdr, data;
1N/A int ret;
1N/A
1N/A dbp = dbc->dbp;
1N/A
1N/A /*
1N/A * If the root page was a leaf page, change it into an internal page.
1N/A * We copy the key we split on (but not the key's data, in the case of
1N/A * a leaf page) to the new root page.
1N/A */
1N/A P_INIT(rootp, dbp->pgsize,
1N/A PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);
1N/A
1N/A memset(&data, 0, sizeof(data));
1N/A memset(&hdr, 0, sizeof(hdr));
1N/A
1N/A /*
1N/A * The btree comparison code guarantees that the left-most key on any
1N/A * level of the tree is never used, so it doesn't need to be filled in.
1N/A */
1N/A memset(&bi, 0, sizeof(bi));
1N/A bi.len = 0;
1N/A B_TSET(bi.type, B_KEYDATA, 0);
1N/A bi.pgno = lp->pgno;
1N/A if (F_ISSET(dbp, DB_BT_RECNUM)) {
1N/A bi.nrecs = __bam_total(lp);
1N/A RE_NREC_SET(rootp, bi.nrecs);
1N/A }
1N/A hdr.data = &bi;
1N/A hdr.size = SSZA(BINTERNAL, data);
1N/A if ((ret =
1N/A __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
1N/A return (ret);
1N/A
1N/A switch (TYPE(rp)) {
1N/A case P_IBTREE:
1N/A /* Copy the first key of the child page onto the root page. */
1N/A child_bi = GET_BINTERNAL(rp, 0);
1N/A
1N/A bi.len = child_bi->len;
1N/A B_TSET(bi.type, child_bi->type, 0);
1N/A bi.pgno = rp->pgno;
1N/A if (F_ISSET(dbp, DB_BT_RECNUM)) {
1N/A bi.nrecs = __bam_total(rp);
1N/A RE_NREC_ADJ(rootp, bi.nrecs);
1N/A }
1N/A hdr.data = &bi;
1N/A hdr.size = SSZA(BINTERNAL, data);
1N/A data.data = child_bi->data;
1N/A data.size = child_bi->len;
1N/A if ((ret = __db_pitem(dbc, rootp, 1,
1N/A BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
1N/A return (ret);
1N/A
1N/A /* Increment the overflow ref count. */
1N/A if (B_TYPE(child_bi->type) == B_OVERFLOW)
1N/A if ((ret = __db_ovref(dbc,
1N/A ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
1N/A return (ret);
1N/A break;
1N/A case P_LBTREE:
1N/A /* Copy the first key of the child page onto the root page. */
1N/A child_bk = GET_BKEYDATA(rp, 0);
1N/A switch (B_TYPE(child_bk->type)) {
1N/A case B_KEYDATA:
1N/A bi.len = child_bk->len;
1N/A B_TSET(bi.type, child_bk->type, 0);
1N/A bi.pgno = rp->pgno;
1N/A if (F_ISSET(dbp, DB_BT_RECNUM)) {
1N/A bi.nrecs = __bam_total(rp);
1N/A RE_NREC_ADJ(rootp, bi.nrecs);
1N/A }
1N/A hdr.data = &bi;
1N/A hdr.size = SSZA(BINTERNAL, data);
1N/A data.data = child_bk->data;
1N/A data.size = child_bk->len;
1N/A if ((ret = __db_pitem(dbc, rootp, 1,
1N/A BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
1N/A return (ret);
1N/A break;
1N/A case B_DUPLICATE:
1N/A case B_OVERFLOW:
1N/A bi.len = BOVERFLOW_SIZE;
1N/A B_TSET(bi.type, child_bk->type, 0);
1N/A bi.pgno = rp->pgno;
1N/A if (F_ISSET(dbp, DB_BT_RECNUM)) {
1N/A bi.nrecs = __bam_total(rp);
1N/A RE_NREC_ADJ(rootp, bi.nrecs);
1N/A }
1N/A hdr.data = &bi;
1N/A hdr.size = SSZA(BINTERNAL, data);
1N/A data.data = child_bk;
1N/A data.size = BOVERFLOW_SIZE;
1N/A if ((ret = __db_pitem(dbc, rootp, 1,
1N/A BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
1N/A return (ret);
1N/A
1N/A /* Increment the overflow ref count. */
1N/A if (B_TYPE(child_bk->type) == B_OVERFLOW)
1N/A if ((ret = __db_ovref(dbc,
1N/A ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
1N/A return (ret);
1N/A break;
1N/A default:
1N/A return (__db_pgfmt(dbp, rp->pgno));
1N/A }
1N/A break;
1N/A default:
1N/A return (__db_pgfmt(dbp, rp->pgno));
1N/A }
1N/A return (0);
1N/A}
1N/A
1N/A/*
1N/A * __ram_root --
1N/A * Fix up the recno root page after it has been split.
1N/A */
1N/Astatic int
1N/A__ram_root(dbc, rootp, lp, rp)
1N/A DBC *dbc;
1N/A PAGE *rootp, *lp, *rp;
1N/A{
1N/A DB *dbp;
1N/A DBT hdr;
1N/A RINTERNAL ri;
1N/A int ret;
1N/A
1N/A dbp = dbc->dbp;
1N/A
1N/A /* Initialize the page. */
1N/A P_INIT(rootp, dbp->pgsize,
1N/A PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);
1N/A
1N/A /* Initialize the header. */
1N/A memset(&hdr, 0, sizeof(hdr));
1N/A hdr.data = &ri;
1N/A hdr.size = RINTERNAL_SIZE;
1N/A
1N/A /* Insert the left and right keys, set the header information. */
1N/A ri.pgno = lp->pgno;
1N/A ri.nrecs = __bam_total(lp);
1N/A if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
1N/A return (ret);
1N/A RE_NREC_SET(rootp, ri.nrecs);
1N/A ri.pgno = rp->pgno;
1N/A ri.nrecs = __bam_total(rp);
1N/A if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
1N/A return (ret);
1N/A RE_NREC_ADJ(rootp, ri.nrecs);
1N/A return (0);
1N/A}
1N/A
1N/A/*
1N/A * __bam_pinsert --
1N/A * Insert a new key into a parent page, completing the split.
1N/A */
1N/Astatic int
1N/A__bam_pinsert(dbc, parent, lchild, rchild)
1N/A DBC *dbc;
1N/A EPG *parent;
1N/A PAGE *lchild, *rchild;
1N/A{
1N/A BINTERNAL bi, *child_bi;
1N/A BKEYDATA *child_bk, *tmp_bk;
1N/A BTREE *t;
1N/A DB *dbp;
1N/A DBT a, b, hdr, data;
1N/A PAGE *ppage;
1N/A RINTERNAL ri;
1N/A db_indx_t off;
1N/A db_recno_t nrecs;
1N/A u_int32_t n, nbytes, nksize;
1N/A int ret;
1N/A
1N/A dbp = dbc->dbp;
1N/A t = dbp->internal;
1N/A ppage = parent->page;
1N/A
1N/A /* If handling record numbers, count records split to the right page. */
1N/A nrecs = dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM) ?
1N/A __bam_total(rchild) : 0;
1N/A
1N/A /*
1N/A * Now we insert the new page's first key into the parent page, which
1N/A * completes the split. The parent points to a PAGE and a page index
1N/A * offset, where the new key goes ONE AFTER the index, because we split
1N/A * to the right.
1N/A *
1N/A * XXX
1N/A * Some btree algorithms replace the key for the old page as well as
1N/A * the new page. We don't, as there's no reason to believe that the
1N/A * first key on the old page is any better than the key we have, and,
1N/A * in the case of a key being placed at index 0 causing the split, the
1N/A * key is unavailable.
1N/A */
1N/A off = parent->indx + O_INDX;
1N/A
1N/A /*
1N/A * Calculate the space needed on the parent page.
1N/A *
1N/A * Prefix trees: space hack used when inserting into BINTERNAL pages.
1N/A * Retain only what's needed to distinguish between the new entry and
1N/A * the LAST entry on the page to its left. If the keys compare equal,
1N/A * retain the entire key. We ignore overflow keys, and the entire key
1N/A * must be retained for the next-to-leftmost key on the leftmost page
1N/A * of each level, or the search will fail. Applicable ONLY to internal
1N/A * pages that have leaf pages as children. Further reduction of the
1N/A * key between pairs of internal pages loses too much information.
1N/A */
1N/A switch (TYPE(rchild)) {
1N/A case P_IBTREE:
1N/A child_bi = GET_BINTERNAL(rchild, 0);
1N/A nbytes = BINTERNAL_PSIZE(child_bi->len);
1N/A
1N/A if (P_FREESPACE(ppage) < nbytes)
1N/A return (DB_NEEDSPLIT);
1N/A
1N/A /* Add a new record for the right page. */
1N/A memset(&bi, 0, sizeof(bi));
1N/A bi.len = child_bi->len;
1N/A B_TSET(bi.type, child_bi->type, 0);
1N/A bi.pgno = rchild->pgno;
1N/A bi.nrecs = nrecs;
1N/A memset(&hdr, 0, sizeof(hdr));
1N/A hdr.data = &bi;
1N/A hdr.size = SSZA(BINTERNAL, data);
1N/A memset(&data, 0, sizeof(data));
1N/A data.data = child_bi->data;
1N/A data.size = child_bi->len;
1N/A if ((ret = __db_pitem(dbc, ppage, off,
1N/A BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
1N/A return (ret);
1N/A
1N/A /* Increment the overflow ref count. */
1N/A if (B_TYPE(child_bi->type) == B_OVERFLOW)
1N/A if ((ret = __db_ovref(dbc,
1N/A ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
1N/A return (ret);
1N/A break;
1N/A case P_LBTREE:
1N/A child_bk = GET_BKEYDATA(rchild, 0);
1N/A switch (B_TYPE(child_bk->type)) {
1N/A case B_KEYDATA:
1N/A nbytes = BINTERNAL_PSIZE(child_bk->len);
1N/A nksize = child_bk->len;
1N/A if (t->bt_prefix == NULL)
1N/A goto noprefix;
1N/A if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
1N/A goto noprefix;
1N/A tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) - P_INDX);
1N/A if (B_TYPE(tmp_bk->type) != B_KEYDATA)
1N/A goto noprefix;
1N/A memset(&a, 0, sizeof(a));
1N/A a.size = tmp_bk->len;
1N/A a.data = tmp_bk->data;
1N/A memset(&b, 0, sizeof(b));
1N/A b.size = child_bk->len;
1N/A b.data = child_bk->data;
1N/A nksize = t->bt_prefix(&a, &b);
1N/A if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
1N/A nbytes = n;
1N/A else
1N/Anoprefix: nksize = child_bk->len;
1N/A
1N/A if (P_FREESPACE(ppage) < nbytes)
1N/A return (DB_NEEDSPLIT);
1N/A
1N/A memset(&bi, 0, sizeof(bi));
1N/A bi.len = nksize;
1N/A B_TSET(bi.type, child_bk->type, 0);
1N/A bi.pgno = rchild->pgno;
1N/A bi.nrecs = nrecs;
1N/A memset(&hdr, 0, sizeof(hdr));
1N/A hdr.data = &bi;
1N/A hdr.size = SSZA(BINTERNAL, data);
1N/A memset(&data, 0, sizeof(data));
1N/A data.data = child_bk->data;
1N/A data.size = nksize;
1N/A if ((ret = __db_pitem(dbc, ppage, off,
1N/A BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
1N/A return (ret);
1N/A break;
1N/A case B_DUPLICATE:
1N/A case B_OVERFLOW:
1N/A nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);
1N/A
1N/A if (P_FREESPACE(ppage) < nbytes)
1N/A return (DB_NEEDSPLIT);
1N/A
1N/A memset(&bi, 0, sizeof(bi));
1N/A bi.len = BOVERFLOW_SIZE;
1N/A B_TSET(bi.type, child_bk->type, 0);
1N/A bi.pgno = rchild->pgno;
1N/A bi.nrecs = nrecs;
1N/A memset(&hdr, 0, sizeof(hdr));
1N/A hdr.data = &bi;
1N/A hdr.size = SSZA(BINTERNAL, data);
1N/A memset(&data, 0, sizeof(data));
1N/A data.data = child_bk;
1N/A data.size = BOVERFLOW_SIZE;
1N/A if ((ret = __db_pitem(dbc, ppage, off,
1N/A BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
1N/A return (ret);
1N/A
1N/A /* Increment the overflow ref count. */
1N/A if (B_TYPE(child_bk->type) == B_OVERFLOW)
1N/A if ((ret = __db_ovref(dbc,
1N/A ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
1N/A return (ret);
1N/A break;
1N/A default:
1N/A return (__db_pgfmt(dbp, rchild->pgno));
1N/A }
1N/A break;
1N/A case P_IRECNO:
1N/A case P_LRECNO:
1N/A nbytes = RINTERNAL_PSIZE;
1N/A
1N/A if (P_FREESPACE(ppage) < nbytes)
1N/A return (DB_NEEDSPLIT);
1N/A
1N/A /* Add a new record for the right page. */
1N/A memset(&hdr, 0, sizeof(hdr));
1N/A hdr.data = &ri;
1N/A hdr.size = RINTERNAL_SIZE;
1N/A ri.pgno = rchild->pgno;
1N/A ri.nrecs = nrecs;
1N/A if ((ret = __db_pitem(dbc,
1N/A ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
1N/A return (ret);
1N/A break;
1N/A default:
1N/A return (__db_pgfmt(dbp, rchild->pgno));
1N/A }
1N/A
1N/A /* Adjust the parent page's left page record count. */
1N/A if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
1N/A /* Log the change. */
1N/A if (DB_LOGGING(dbc) &&
1N/A (ret = __bam_cadjust_log(dbp->dbenv->lg_info,
1N/A dbc->txn, &LSN(ppage), 0, dbp->log_fileid,
1N/A PGNO(ppage), &LSN(ppage), (u_int32_t)parent->indx,
1N/A -(int32_t)nrecs, (int32_t)0)) != 0)
1N/A return (ret);
1N/A
1N/A /* Update the left page count. */
1N/A if (dbp->type == DB_RECNO)
1N/A GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
1N/A else
1N/A GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
1N/A }
1N/A
1N/A return (0);
1N/A}
1N/A
1N/A/*
1N/A * __bam_psplit --
1N/A * Do the real work of splitting the page.
1N/A */
1N/Astatic int
1N/A__bam_psplit(dbc, cp, lp, rp, splitret)
1N/A DBC *dbc;
1N/A EPG *cp;
1N/A PAGE *lp, *rp;
1N/A db_indx_t *splitret;
1N/A{
1N/A DB *dbp;
1N/A PAGE *pp;
1N/A db_indx_t half, nbytes, off, splitp, top;
1N/A int adjust, cnt, isbigkey, ret;
1N/A
1N/A dbp = dbc->dbp;
1N/A pp = cp->page;
1N/A adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
1N/A
1N/A /*
1N/A * If we're splitting the first (last) page on a level because we're
1N/A * inserting (appending) a key to it, it's likely that the data is
1N/A * sorted. Moving a single item to the new page is less work and can
1N/A * push the fill factor higher than normal. If we're wrong it's not
1N/A * a big deal, we'll just do the split the right way next time.
1N/A */
1N/A off = 0;
1N/A if (NEXT_PGNO(pp) == PGNO_INVALID &&
1N/A ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
1N/A (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
1N/A off = NUM_ENT(cp->page) - adjust;
1N/A else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
1N/A off = adjust;
1N/A
1N/A if (off != 0)
1N/A goto sort;
1N/A
1N/A /*
1N/A * Split the data to the left and right pages. Try not to split on
1N/A * an overflow key. (Overflow keys on internal pages will slow down
1N/A * searches.) Refuse to split in the middle of a set of duplicates.
1N/A *
1N/A * First, find the optimum place to split.
1N/A *
1N/A * It's possible to try and split past the last record on the page if
1N/A * there's a very large record at the end of the page. Make sure this
1N/A * doesn't happen by bounding the check at the next-to-last entry on
1N/A * the page.
1N/A *
1N/A * Note, we try and split half the data present on the page. This is
1N/A * because another process may have already split the page and left
1N/A * it half empty. We don't try and skip the split -- we don't know
1N/A * how much space we're going to need on the page, and we may need up
1N/A * to half the page for a big item, so there's no easy test to decide
1N/A * if we need to split or not. Besides, if two threads are inserting
1N/A * data into the same place in the database, we're probably going to
1N/A * need more space soon anyway.
1N/A */
1N/A top = NUM_ENT(pp) - adjust;
1N/A half = (dbp->pgsize - HOFFSET(pp)) / 2;
1N/A for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
1N/A switch (TYPE(pp)) {
1N/A case P_IBTREE:
1N/A if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
1N/A nbytes +=
1N/A BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
1N/A else
1N/A nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
1N/A break;
1N/A case P_LBTREE:
1N/A if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
1N/A nbytes +=
1N/A BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
1N/A else
1N/A nbytes += BOVERFLOW_SIZE;
1N/A
1N/A ++off;
1N/A if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
1N/A nbytes +=
1N/A BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
1N/A else
1N/A nbytes += BOVERFLOW_SIZE;
1N/A break;
1N/A case P_IRECNO:
1N/A nbytes += RINTERNAL_SIZE;
1N/A break;
1N/A case P_LRECNO:
1N/A nbytes += BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
1N/A break;
1N/A default:
1N/A return (__db_pgfmt(dbp, pp->pgno));
1N/A }
1N/Asort: splitp = off;
1N/A
1N/A /*
1N/A * Splitp is either at or just past the optimum split point. If
1N/A * it's a big key, try and find something close by that's not.
1N/A */
1N/A if (TYPE(pp) == P_IBTREE)
1N/A isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
1N/A else if (TYPE(pp) == P_LBTREE)
1N/A isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
1N/A else
1N/A isbigkey = 0;
1N/A if (isbigkey)
1N/A for (cnt = 1; cnt <= 3; ++cnt) {
1N/A off = splitp + cnt * adjust;
1N/A if (off < (db_indx_t)NUM_ENT(pp) &&
1N/A ((TYPE(pp) == P_IBTREE &&
1N/A B_TYPE(GET_BINTERNAL(pp,off)->type) == B_KEYDATA) ||
1N/A B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
1N/A splitp = off;
1N/A break;
1N/A }
1N/A if (splitp <= (db_indx_t)(cnt * adjust))
1N/A continue;
1N/A off = splitp - cnt * adjust;
1N/A if (TYPE(pp) == P_IBTREE ?
1N/A B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
1N/A B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
1N/A splitp = off;
1N/A break;
1N/A }
1N/A }
1N/A
1N/A /*
1N/A * We can't split in the middle a set of duplicates. We know that
1N/A * no duplicate set can take up more than about 25% of the page,
1N/A * because that's the point where we push it off onto a duplicate
1N/A * page set. So, this loop can't be unbounded.
1N/A */
1N/A if (F_ISSET(dbp, DB_AM_DUP) && TYPE(pp) == P_LBTREE &&
1N/A pp->inp[splitp] == pp->inp[splitp - adjust])
1N/A for (cnt = 1;; ++cnt) {
1N/A off = splitp + cnt * adjust;
1N/A if (off < NUM_ENT(pp) &&
1N/A pp->inp[splitp] != pp->inp[off]) {
1N/A splitp = off;
1N/A break;
1N/A }
1N/A if (splitp <= (db_indx_t)(cnt * adjust))
1N/A continue;
1N/A off = splitp - cnt * adjust;
1N/A if (pp->inp[splitp] != pp->inp[off]) {
1N/A splitp = off + adjust;
1N/A break;
1N/A }
1N/A }
1N/A
1N/A
1N/A /* We're going to split at splitp. */
1N/A if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
1N/A return (ret);
1N/A if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
1N/A return (ret);
1N/A
1N/A *splitret = splitp;
1N/A return (0);
1N/A}
1N/A
1N/A/*
1N/A * __bam_copy --
1N/A * Copy a set of records from one page to another.
1N/A *
1N/A * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
1N/A */
1N/Aint
1N/A__bam_copy(dbp, pp, cp, nxt, stop)
1N/A DB *dbp;
1N/A PAGE *pp, *cp;
1N/A u_int32_t nxt, stop;
1N/A{
1N/A db_indx_t nbytes, off;
1N/A
1N/A /*
1N/A * Copy the rest of the data to the right page. Nxt is the next
1N/A * offset placed on the target page.
1N/A */
1N/A for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
1N/A switch (TYPE(pp)) {
1N/A case P_IBTREE:
1N/A if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
1N/A nbytes =
1N/A BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
1N/A else
1N/A nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
1N/A break;
1N/A case P_LBTREE:
1N/A /*
1N/A * If we're on a key and it's a duplicate, just copy
1N/A * the offset.
1N/A */
1N/A if (off != 0 && (nxt % P_INDX) == 0 &&
1N/A pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
1N/A cp->inp[off] = cp->inp[off - P_INDX];
1N/A continue;
1N/A }
1N/A /* FALLTHROUGH */
1N/A case P_LRECNO:
1N/A if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
1N/A nbytes =
1N/A BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
1N/A else
1N/A nbytes = BOVERFLOW_SIZE;
1N/A break;
1N/A case P_IRECNO:
1N/A nbytes = RINTERNAL_SIZE;
1N/A break;
1N/A default:
1N/A return (__db_pgfmt(dbp, pp->pgno));
1N/A }
1N/A cp->inp[off] = HOFFSET(cp) -= nbytes;
1N/A memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
1N/A }
1N/A return (0);
1N/A}