stream.c revision 08ee25ae754a3b9ee3786f304f066ae14dc6e379
/*
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 */

/*
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};
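The table above feeds allocb(): a request is satisfied from the smallest cache whose dblk size can hold it, falling through to a plain allocation past the largest entry. A hedged userland sketch of that lookup, using the 64-bit sizes above (`pick_dblk_cache` is an illustrative name, and the real kernel indexes a precomputed table and accounts for dblk header overhead):

```c
#include <stddef.h>

/* 64-bit dblk cache sizes from the table above */
static const size_t dblk_cache_sizes[] = {
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456
};
#define	NCACHES	(sizeof (dblk_cache_sizes) / sizeof (dblk_cache_sizes[0]))

/*
 * Return the smallest cache size that can hold 'size' bytes, or 0 if
 * the request is larger than the biggest cache and must fall through
 * to a direct allocation.
 */
size_t
pick_dblk_cache(size_t size)
{
	size_t i;

	for (i = 0; i < NCACHES; i++) {
		if (dblk_cache_sizes[i] >= size)
			return (dblk_cache_sizes[i]);
	}
	return (0);	/* larger than the biggest cache */
}
```

Note how the size spacing widens with the buffer size; that is the internal/external fragmentation trade-off the comment describes.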
	/*
	 * We are in the middle of a page, dblk should
	 * be allocated on the same page.
	 */

	/*
	 * buf size is a multiple of page size, dblk and
	 * buffer are allocated separately.
	 */

	/* Initialize Multidata caches */

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */

	/*
	 * If the data is wanted and it will fit where it is, no
	 * work is needed.
	 */

	/* XXX other mp state could be copied too, db_flags ... ? */

	/* set credp and projid to be 'unspecified' before returning to cache */

	/* Reset the struioflag and the checksum flag fields */

	/* and the COOKED flag */

	/*
	 * atomic_add_32_nv() just decremented db_ref, so we no longer
	 * have a reference to the dblk, which means another thread
	 * could free it.  Therefore we cannot examine the dblk to
	 * determine whether ours was the last reference.  Instead,
	 * we extract the new and minimum reference counts from rtfu.
	 * Note that all we're really saying is "if (ref != refmin)".
	 */

	/*
	 * First-dup optimization.  The enabling assumption is that there
	 * can never be a race (in correct code) to dup the first copy
	 * of a message.  Therefore we don't need to do it atomically.
	 */

	/* If db_ref is maxed out we can't dup this message anymore */

	/* set credp and projid to be 'unspecified' before returning to cache */

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 *
 * Note that this is structured to allow the common case (i.e.
 * STREAMS flowtracing disabled) to call gesballoc() with tail
 * call optimization.
 */

/*
 * Same as esballoc() but sleeps waiting for memory.
 *
 * Note that this is structured to allow the common case (i.e.
 * STREAMS flowtracing disabled) to call gesballoc() with tail
 * call optimization.
 */

/*
 * Note that this is structured to allow the common case (i.e.
 * STREAMS flowtracing disabled) to call gesballoc() with tail
 * call optimization.
 */

/*
 * Note that this is structured to allow the common case (i.e.
 * STREAMS flowtracing disabled) to call gesballoc() with tail
 * call optimization.
 */

/*
 * Note that this is structured to allow the common case (i.e.
 * STREAMS flowtracing disabled) to call gesballoc() with tail
 * call optimization.
 */

	/* set credp and projid to be 'unspecified' before returning to cache */

	/* set credp and projid to be 'unspecified' before returning to cache */

/*
 * This routine is consolidation private for STREAMS internal use.
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */

	FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (ulong_t)mp);
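The dblk_decref() comment above states a general rule for reference counting: once a thread has dropped its reference it may not touch the object again, so "was ours the last reference?" must be decided from the value the atomic operation hands back, never by re-reading the object. A minimal userland model with C11 atomics (the struct and names are illustrative, not the kernel's dblk_t):

```c
#include <stdatomic.h>
#include <stdint.h>

/* Illustrative refcounted object, standing in for a dblk. */
typedef struct {
	atomic_uint_fast32_t ref;
} refobj_t;

/*
 * Drop one reference.  Returns 1 if the caller held the last one.
 * The decision uses only the value fetch_sub returns; the object
 * must not be examined after the decrement.
 */
int
refobj_decref(refobj_t *op)
{
	/* fetch_sub returns the value *before* the decrement */
	uint_fast32_t oldref = atomic_fetch_sub(&op->ref, 1);

	return (oldref == 1);
}

/* Demo: take an object with two refs through both decrements. */
int
refobj_demo(void)
{
	refobj_t r;

	atomic_init(&r.ref, 2);
	if (refobj_decref(&r) != 0)
		return (-1);		/* first drop must not be last */
	return (refobj_decref(&r));	/* second drop is last: 1 */
}
```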
/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */

/*
 * Allocates an iocblk (M_IOCTL) block.  Properly sets the credentials,
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */

	/* Allocate enough space for any of the ioctl related messages. */

	/* Set the mblk_t information and ptrs correctly. */

/*
 * Test if a block of the given size can be allocated with a request of
 * priority 'pri'.  'pri' is no longer used, but is retained for
 * compatibility.
 */

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */

static long bid = 1;	/* always odd to save checking for zero */

	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped
	 * there should be no references to bcp since it may be freed by
	 * runbufcalls().  Since the bcp_id field is returned, we save its
	 * value in id before dropping the lock.
	 */

	/* add newly allocated stream event to existing list */

/*
 * Cancel a bufcall request.
 */

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * is dup'd.
 */

/*
 * Copy data from message and data block to newly allocated message and
 * data block.  Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */

	/* See comments below on potential issues. */

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc-to-free data.
	 * So I guess the real answer is to provide a tunable.
	 */

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */

/*
 * link a message block to tail of message
 */

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */

/*
 * Concatenate and align first len bytes of common
 * message type.  Len == -1 means concat everything.
 * Returns 1 on success, 0 on failure.
 * After the pullup, mp points to the pulled up data.
 */
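The `static long bid = 1` above seeds bufcall ids; stepping by two keeps every id odd, so 0 is never a valid id and callers can use 0 as "no bufcall outstanding" with no extra check. A hedged userland sketch (`next_bufcall_id` is an illustrative name; the kernel does this under strbcall_lock):

```c
/*
 * Hand out bufcall-style ids.  Starting at 1 and stepping by 2 keeps
 * every id odd, so 0 can never be handed out and needs no special
 * zero check.  (A single static stands in for the kernel's locked
 * global here; this sketch is not thread-safe.)
 */
long
next_bufcall_id(void)
{
	static long bid = 1;	/* always odd to save checking for zero */
	long id = bid;

	bid += 2;		/* stays odd, even after wrap */
	return (id);
}
```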
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */

	/*
	 * If the length is less than that of the first mblk,
	 * we want to pull up the message into an aligned mblk.
	 * Though not part of the spec, some callers assume it.
	 */

	*bp = *mp;	/* swap mblks so bp heads the old msg... */

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	ASSERT(n >= 0);
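The two statements above are the heart of the pullupmsg() swap: copying the whole mblk header (`*bp = *mp`) lets the caller's pointer keep heading the message while the old leading mblk, now reachable through bp, carries whatever wasn't pulled up. A toy model with a simplified mblk (the struct fields here are a pared-down illustration, not the kernel's mblk_t):

```c
#include <stddef.h>

/* Toy mblk: just enough fields to show the header swap. */
typedef struct mblk {
	struct mblk *b_cont;	/* next mblk in the message */
	char *b_rptr;		/* first byte of data */
	char *b_wptr;		/* one past the last byte of data */
} mblk_t;

/*
 * Model of the swap: make 'mp' (the caller's handle) describe the
 * new pulled-up buffer, and chain the old contents on behind it.
 */
void
swap_heads(mblk_t *mp, mblk_t *bp, char *newbuf, size_t len)
{
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_rptr = newbuf;
	mp->b_wptr = newbuf + len;
	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */
}

/* Demo: after the swap, mp heads the new buffer and bp the old data. */
int
swap_heads_demo(void)
{
	static char olddata[4] = "abc";
	static char newbuf[8];
	mblk_t mp = { NULL, olddata, olddata + 3 };
	mblk_t bp;

	swap_heads(&mp, &bp, newbuf, sizeof (newbuf));
	return (bp.b_rptr == olddata && mp.b_rptr == newbuf &&
	    mp.b_cont == &bp);
}
```

The payoff of the trick is that the caller's `mp` pointer stays valid; only the headers change hands.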
	/* allow zero-length mblk_t's */

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */

		/*
		 * If this is not the first zero length
		 * message, remove it.
		 */

		/* Find the last message of same type */

		/*
		 * If this is not the first message
		 * and we have taken away everything
		 * from this message, remove it.
		 */

/*
 * get number of data bytes in message
 */

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 *   q_count is the traditional byte count for messages that
 *   have been put on a queue.  Documentation tells us that
 *   we should not rely on that count, but some drivers/modules
 *   do.  What was needed, however, is a mechanism to prevent
 *   runaway streams from consuming all of the resources,
 *   and particularly to be able to flow control zero-length
 *   messages.  q_mblkcnt is used for this purpose.  It
 *   counts the number of mblk's that are being put on
 *   the queue.  The intention here is that each mblk should
 *   contain one byte of data and, for the purpose of
 *   flow-control, logically does.  A queue will become
 *   full when EITHER of these values (q_count and q_mblkcnt)
 *   reaches the highwater mark.  It will clear when BOTH
 *   of them drop below the highwater mark.  And it will
 *   backenable when BOTH of them drop below the lowwater
 *   mark.  With this, a driver/module can still find a
 *   reasonably accurate q_count, and the framework can
 *   still try and limit resource usage.
 */

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */

/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 */

	/* freezestr should allow its caller to call getq/putq */

	/* Get message byte count for q_count accounting */

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */

	/* Quick check without holding the lock. */

	/*
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL,
	 * in which case another backenable will take place when the queue
	 * drops below the low water mark.
	 */

	/* freezestr should allow its caller to call getq/putq */

	/* Have to drop the lock across strwakeq and backenable */

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, by utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */

	/*
	 * qbackenable can handle a frozen stream but not a "random"
	 * qlock being held.  Drop lock across qbackenable.
	 */

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */

	/* Don't drop lock on exit */

	if (mp->b_band != 0) {
		/* Adjust band pointers */
	}

	/* Remove the message from the list. */

	/* Get the size of the message for q_count accounting */

	if (mp->b_band == 0) {
		/* Perform q_count accounting */
	} else {
		/* Perform qb_count accounting */
	}

/*
 * If flag is set, remove all messages.  Otherwise, remove
 * only non-control messages.  If queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not a M_PCPROTO message should be
 * flushed if one exists on the queue.
 */

	unsigned char qbf[NBAND];	/* band flushing backenable flags */

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */

/*
 * The real flushing takes place in flushq_common.  This is done so that
 * a flag can be passed in which specifies whether or not M_PCPROTO
 * messages should be flushed.  Currently the only place that uses this
 * flag is the stream head.
 */

/*
 * Flush the queue of messages of the given priority band.
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered.  flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0 case.  That check does
 * not match the man page for flushband and was not in the strrput flush
 * code, hence it was removed.
 */

	/*
	 * rmvq_noenab() and freemsg() are called for each mblk that
	 * meets the criteria.  The loop is executed until the last
	 * mblk has been processed.
	 */

	/*
	 * If any mblk(s) has been freed, we know that qbackenable()
	 * will need to be called.
	 */

/*
 * Return 1 if the queue is not full.  If the queue is full, return
 * 0 (may not put message) and set QWANTW flag (caller wants to write
 * to the queue).
 */

	/* this is for loopback transports, they should not do a canput */

	/* Find next forward module that has a service procedure */

/*
 * This is the new canput for use with priority bands.  Return 1 if the
 * band is not full.  If the band is full, return 0 (may not put message)
 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
 * write to the queue).
 */

	/* Find next forward module that has a service procedure */

	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_IN,
	    "bcanput:%p %X %d", q, pri, 0);

		/* No band exists yet, so return success. */

		TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
		    "bcanput:%p %X %d", q, pri, 1);

		TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
		    "bcanput:%p %X %d", q, pri, 0);

	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
	    "bcanput:%p %X %d", q, pri, 1);
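The flow-control checks above rest on the q_count/q_mblkcnt note: a queue tracks both bytes and mblks, and goes full when EITHER counter reaches the high-water mark, which is what lets zero-length messages be flow controlled at all. A minimal userland model of that test (the struct and names are illustrative, not the kernel's queue_t):

```c
/* Illustrative flow-control state, standing in for part of a queue_t. */
typedef struct {
	unsigned q_count;	/* bytes on the queue */
	unsigned q_mblkcnt;	/* mblks on the queue, one "byte" each */
	unsigned q_hiwat;	/* high-water mark */
} fcq_t;

/* Full when EITHER counter has reached the high-water mark. */
int
fcq_full(const fcq_t *q)
{
	return (q->q_count >= q->q_hiwat || q->q_mblkcnt >= q->q_hiwat);
}

/*
 * Demo: ten zero-length messages never move q_count, but q_mblkcnt
 * still drives the queue full once it reaches the mark, so a runaway
 * stream of empty mblks is flow controlled.
 */
int
fcq_demo(void)
{
	fcq_t q = { 0, 0, 10 };
	int i;

	for (i = 0; i < 10; i++) {
		q.q_count += 0;		/* zero-length message */
		q.q_mblkcnt += 1;	/* but still one mblk */
	}
	return (fcq_full(&q));
}
```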
/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue).  This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
 */

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */

		bp->b_band = 0;	/* force to be correct */

		/*
		 * The qband structure for this priority band is
		 * not on the queue yet, so we have to allocate
		 * one on the fly.  It would be wasteful to
		 * associate the qband structures with every
		 * queue when the queues are allocated.  This is
		 * because most queues will only need the normal
		 * band of flow, which can be described entirely
		 * by the queue itself.
		 */

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */

	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */

	} else {	/* bp->b_band != 0 */

		/* Insert bp after the last message in this band. */

		/* Tack bp on end of queue. */

	/* Get message byte count for q_count accounting */

/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 */

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */

		bp->b_band = 0;	/* force to be correct */

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */

	} else if (qbp) {	/* bp->b_band != 0 */

		/* Insert bp before the first message in this band. */

		/* Tack bp on end of queue. */

	} else {	/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */

	/* Get message byte count for q_count accounting */

/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called with the stream frozen, by utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */

	/* Don't drop lock on exit */

		mp->b_band = 0;	/* force to be correct */

		    "insq: attempt to insert message out of order "

	/* Get mblk and byte count for q_count accounting */

	if (qbp) {	/* adjust qband pointers and count */

/*
 * Create and put a control message on queue.
 */

/*
 * Control message with a single-byte parameter
 */

/*
 * Return the queue upstream from this one
 */

/*
 * Send a block back up the queue in reverse from this
 * one (e.g. to respond to ioctls)
 */

/*
 * Streams Queue Scheduling
 *
 * Queues are enabled through qenable() when they have messages to
 * process.  They are serviced by queuerun(), which runs each enabled
 * queue's service procedure.  The call to queuerun() is processor
 * dependent - the general principle is that it be run whenever a queue
 * is enabled but before returning to user level.  For system calls,
 * the function runqueues() is called if their action causes a queue
 * to be enabled.  For device interrupts, queuerun() should be
 * called before returning from the last level of interrupt.  Beyond
 * this, no timing assumptions should be made about queue scheduling.
 */

/*
 * Enable a queue: put it on list of those whose service procedures are
 * ready to run and set up the scheduling mechanism.
 */

	/*
	 * The broadcast is done outside the mutex to avoid the woken
	 * thread contending with the mutex.  This is OK 'cos the queue
	 * has been enqueued on the runlist and flagged safely at this
	 * point.
	 */

/*
 * Return number of messages on queue
 */

/*
 * noenable - set queue so that putq() will not enable it.
 * enableok - set queue so that putq() can enable it.
 */

	/*
	 * Performance concern: strwrite looks at the module below
	 * the stream head for the maxpsz each time it does a write,
	 * so we now cache it at the stream head.  Check to see if this
	 * queue is sitting directly below the stream head.
	 */

	/*
	 * If the stream is not frozen drop the current QLOCK and
	 * acquire the sd_wrq QLOCK which protects sd_qn_*
	 */

	/*
	 * Performance concern: strwrite looks at the module below
	 * the stream head for the maxpsz each time it does a write,
	 * so we now cache it at the stream head.  Check to see if this
	 * queue is sitting directly below the stream head.
	 */
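The putq()/insq() ordering rules above (high priority first, then descending b_band, normal band at the tail, FIFO within a band) amount to an ordered insert into the singly linked b_next list. This is a simplified model: real queues also keep qband structures, b_prev pointers, and per-band counts, and the names here are illustrative:

```c
#include <stddef.h>

/* Toy message: band 0 is the normal band, larger bands sort first. */
typedef struct msg {
	struct msg *b_next;
	unsigned char b_band;
} msg_t;

/*
 * Insert 'bp' so the list stays sorted by descending band, with new
 * messages placed after existing messages of the same band (FIFO
 * within a band).  Returns the new head.
 */
msg_t *
band_insert(msg_t *head, msg_t *bp)
{
	msg_t **pp = &head;

	while (*pp != NULL && (*pp)->b_band >= bp->b_band)
		pp = &(*pp)->b_next;
	bp->b_next = *pp;
	*pp = bp;
	return (head);
}

/* Demo: insert bands 0, 2, 1, 2 and read back the resulting order. */
int
band_insert_demo(void)
{
	static msg_t m0 = { NULL, 0 }, m2a = { NULL, 2 };
	static msg_t m1 = { NULL, 1 }, m2b = { NULL, 2 };
	msg_t *head = NULL;

	head = band_insert(head, &m0);
	head = band_insert(head, &m2a);
	head = band_insert(head, &m1);
	head = band_insert(head, &m2b);
	/* expect: m2a, m2b, m1, m0 */
	return (head == &m2a && m2a.b_next == &m2b &&
	    m2b.b_next == &m1 && m1.b_next == &m0);
}
```

The same walk explains the insq() invariant `emp->b_prev->b_band >= mp->b_band >= emp->b_band`: an in-order list admits an insertion point only where the neighbors' bands bracket the new message's band.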
	/*
	 * If the stream is not frozen drop the current QLOCK and
	 * acquire the sd_wrq QLOCK which protects sd_qn_*
	 */

	/*
	 * QWANTWSYNC or QWANTR or QWANTW, a
	 * deferred wakeup will be done.  Also if strpoll() is in
	 * progress then a deferred pollwakeup will be done.
	 */

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable.  It doesn't matter even
	 * if we take the type from the previous plumbing; that's
	 * because if the plumbing has changed while we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit of
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock.
	 */

	/*
	 * Either this mblk has already been processed
	 * or there is no more room in this mblk (?).
	 */

	/*
	 * A fault has occurred and some bytes were moved to the
	 * current mblk, the uio_t has already been updated by
	 * the appropriate uio routine, so also update the mblk
	 * to reflect this in case this same mblk chain is used
	 * again (after the fault has been handled).
	 */

/*
 * Try to enter queue synchronously.  Any attempt to enter a closing
 * queue will fail.  The qp->q_rwcnt keeps track of the number of
 * successful entries so that removeq() will not try to close the queue
 * while a thread is inside the queue.
 */

/*
 * Decrease the count of threads running in sync stream queue and wake
 * up any threads blocked in removeq().
 */

/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) module's queue.  It is treated as a put entrypoint
 * for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues).  If it is a CIPUT sync queue, sq_count is incremented
 * and it does not matter if any regular put entrypoints have been
 * already entered.  We can't increment one of the sq_putcounts (instead
 * of sq_count) because qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */

	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK.  Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired.  In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call.  In this case sd_lock acts as the streamhead
	 * perimeter.
	 */

		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller
		 * handle it.
		 */

	/* If this queue is being closed, return. */

	/* Wait until we can enter the inner perimeter. */

		/*
		 * Stream plumbing changed while waiting for inner
		 * perimeter, so just return EINVAL and let the caller
		 * handle it.
		 */

	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 * for the write queue flow-control case.  All others (r/w queue
	 * with q_count > 0 (or q_first != 0)) are the responsibility of
	 * the queue's rw procedure.  This could be generalized here by
	 * running the queue's service procedure, but that wouldn't be
	 * the most efficient for all cases.
	 */

		/*
		 * Write queue may be flow controlled.  If so,
		 * mark the queue for wakeup when it's not.
		 */

	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */

	/*
	 * The only purpose of this ASSERT is to preserve calling stack
	 * in DEBUG kernel.
	 */

	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 */
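The synchronous-entry rule above (a thread may not enter a closing queue, and q_rwcnt counts threads inside so close can wait them out) can be modeled with a counter plus a closing flag. This is a single-threaded sketch with illustrative names; the kernel guards the real state with QLOCK and blocks in cv_wait rather than polling:

```c
/* Illustrative sync-entry state (the kernel guards this with QLOCK). */
typedef struct {
	int q_rwcnt;	/* threads currently inside the queue */
	int closing;	/* set once close has begun */
} sqentry_t;

/* Try to enter: refused (returns 0) once the queue is closing. */
int
sq_enter(sqentry_t *q)
{
	if (q->closing)
		return (0);
	q->q_rwcnt++;
	return (1);
}

/* Leave the queue; close may proceed once q_rwcnt reaches 0. */
void
sq_leave(sqentry_t *q)
{
	q->q_rwcnt--;
}

/* Close may start tearing down only when no thread is inside. */
int
sq_may_close(sqentry_t *q)
{
	q->closing = 1;		/* no new entries from here on */
	return (q->q_rwcnt == 0);
}

/* Demo: an in-flight thread delays close; new entries are refused. */
int
sq_demo(void)
{
	sqentry_t q = { 0, 0 };

	if (!sq_enter(&q))
		return (-1);
	if (sq_may_close(&q))	/* must wait: one thread inside */
		return (-2);
	if (sq_enter(&q))	/* must be refused: closing */
		return (-3);
	sq_leave(&q);
	return (q.q_rwcnt == 0);	/* now close can proceed */
}
```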
	/*
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is
	 * exclusive need to make a claim as they may need to drop SQLOCK
	 * in the has_writers case to avoid deadlocks.  If these threads
	 * are delayed or preempted, it is possible that the writer thread
	 * can find out that there are other claims, making the
	 * (sq_count == 0) test invalid.
	 */

/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) module's queue.  It is treated as a put entrypoint
 * for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues).  If it is a CIPUT sync queue, the regular sq_count is
 * incremented and it does not matter if any regular put entrypoints
 * have been already entered.
 */

	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */

	/* Wait until we can enter the inner perimeter. */

	/*
	 * The only purpose of this ASSERT is to preserve calling stack
	 * in DEBUG kernel.
	 */

	/*
	 * I am not certain the next comment is correct here.  I need to
	 * consider why the infonext is called, and if dropping SQ_EXCL
	 * unless non-CIPUT might cause other problems.  It just might
	 * be safer to drop it if !SQ_CIPUT because that is when we set it.
	 */

	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 *	ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is
	 * exclusive need to make a claim as they may need to drop SQLOCK
	 * in the has_writers case to avoid deadlocks.  If these threads
	 * are delayed or preempted, it is possible that the writer thread
	 * can find out that there are other claims, making the
	 * (sq_count == 0) test invalid.
	 */

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */

/* Called by create_putlock. */

	for (i = 0; i <= 1; i++) {

		/*
		 * putnext checks sq_ciputctrl without holding
		 * SQLOCK. if it is not NULL putnext assumes
		 * sq_nciputctrl is initialized. membar below
		 * insures that.
		 */

/*
 * syncq of q.  If stream argument is not 0, create per cpu
 * stream_putlocks starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry.  It should be called from driver/module open routine after
 * qprocson() call.  It is also called from nfs syscall where it is
 * known that stream is configured and won't change its geometry during
 * the create_putlock call.
 *
 * caller normally uses 0 value for the stream argument to speed up MT
 * putnext into the perimeter of q, for example because its perimeter is
 * per module.
 *
 * caller normally uses non 0 value for the stream argument to hint the
 * system that the stream of q is a very contended global system stream
 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * performance critical.
 *
 * Caller insures stream plumbing won't happen while we are here and
 * therefore q_next can be safely used.
 */

		/*
		 * putnext checks sd_ciputctrl without holding
		 * sd_lock. if it is not NULL putnext assumes
		 * sd_nciputctrl is initialized. membar below
		 * insures that.
		 */

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * Data currently recorded per event is a hrtime stamp, queue address,
 * event type, and a per type datum.  Much of the STREAMS framework is
 * instrumented for automatic flow tracing (when enabled).  Events can be
 * defined and used by STREAMS modules and drivers.
 *
 * str_ftevent() - Add a flow-trace event to a dblk.
 * str_ftfree() - Free flow-trace data
 *
 * fthdr_cache - pointer to the kmem cache for trace header.
 * ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */

		/*
		 * Tail doesn't have room, so need a new tail.
		 *
		 * To make this MT safe, first, allocate a new
		 * ftblk, and initialize it.  To make life a
		 * little easier, reserve the first slot (mostly
		 * by making ix = 1).  When we are finished with
		 * the initialization, CAS this pointer to the
		 * tail.  If this succeeds, this is the new
		 * "next" block.  Otherwise, another thread
		 * got here first, so free the block and start
		 * over.
		 */

	/* free up all flow data? */

	/*
	 * Just in case there is another thread about
	 * to get the next index, we need to make sure
	 * the value is there for it.
	 */

	/*
	 * It is possible that the module info is broken.
	 * Instead of panicking or doing other unmentionables,
	 * we shall put a dummy name as the mid, and continue.
	 */

	/*
	 * Clear out the hash, have the tail point to itself, and free
	 * any continuation blocks.
	 */
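The str_ftevent() tail-allocation comment above describes a classic lock-free publish: fully initialize the new block privately (reserving the first slot, hence ix = 1), then CAS it into the tail's next pointer; the loser of the race frees its block and uses the winner's. A single-threaded C11 sketch of just the CAS step (the structure and names are invented for illustration, not the kernel's ftblk_t):

```c
#include <stdatomic.h>
#include <stdlib.h>

/* Toy trace block: a next pointer is all the race needs. */
typedef struct ftblk {
	struct ftblk *_Atomic nxt;
} ftblk_t;

/*
 * Try to publish 'nbp' as tail->nxt.  Returns the block that actually
 * became the next block: ours on success, the winner's on failure (in
 * which case our fully initialized block is freed, as in the comment
 * above).
 */
ftblk_t *
ft_append(ftblk_t *tail, ftblk_t *nbp)
{
	ftblk_t *expected = NULL;

	if (atomic_compare_exchange_strong(&tail->nxt, &expected, nbp))
		return (nbp);	/* we won: nbp is the new next block */
	free(nbp);		/* another thread got here first */
	return (expected);	/* use the winner's block */
}

/* Demo: second append to the same tail loses and adopts the winner. */
int
ft_append_demo(void)
{
	ftblk_t tail = { NULL };
	ftblk_t *a = calloc(1, sizeof (ftblk_t));
	ftblk_t *b = calloc(1, sizeof (ftblk_t));
	ftblk_t *first, *second;

	first = ft_append(&tail, a);	/* wins: tail->nxt was NULL */
	second = ft_append(&tail, b);	/* loses: b is freed */
	return (first == a && second == a);
}
```

Initializing before publishing is what makes the CAS the linearization point: any thread that sees the new pointer sees a complete block.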