strsubr.c revision d3e55dcdc881b833a707e39ae1e12d8d5d35ad2d
* qrunflag was used previously to control background scheduling of queues. It
* is not used anymore, but kept here in case some module still wants to access
* it via qready() and setqsched macros.
* Most of the streams scheduling is done via task queues. Task queues may fail
* for non-sleep dispatches, so there are two backup threads servicing failed
* requests for queues and syncqs. Both of these threads also service failed
* freebs dispatch requests. Queues are put in the list specified by `qhead'
* and `qtail' pointers, syncqs use `sqhead' and `sqtail' pointers and freebs
* requests are put into `freebs_list' which has no tail pointer. All three
* lists are protected by a single `service_queue' lock and use
* `services_to_run' condition variable for signaling background threads. Use of
* a single lock should not be a problem because it is only used under heavy
* loads when task queues start to fail and at that time it may be a good idea
* to throttle scheduling requests.
* NOTE: queues and syncqs should be scheduled by two separate threads because
* queue servicing may be blocked waiting for a syncq which may also be
* scheduled for background execution. This may create a deadlock when only one
* thread is used for both.
* List of queues scheduled for background processing due to lack of resources
* in the task queues. Protected by service_queue lock.
* Backup threads for servicing queues and syncqs.
* Bufcalls related variables.
* run_queues is no longer used, but is kept in case some 3rd-party
* sq_max_size is the depth of the syncq (in number of messages) before
* qfill_syncq() starts QFULL'ing destination queues. As its primary
* consumer - IP - is no longer D_MTPERMOD, but there may be other
* choose a large number as the default value. For potential
* performance gain, this value is tunable in /etc/system.
* the number of ciputctrl structures per syncq and stream we create when
* if n_ciputctrl is < min_n_ciputctrl don't even create ciputctrl_cache.
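The backup-thread scheme described above (one lock protecting head/tail work lists, a `services_to_run' condition variable, and background threads that drain the lists) can be modeled in user space with pthreads. This is a simplified sketch, not the kernel code: `svc_item_t`, `svc_enqueue`, and `svc_thread` are illustrative names, and only a single list is shown where the kernel keeps three.

```c
#include <pthread.h>
#include <stdlib.h>

/* A queued piece of failed-dispatch work. */
typedef struct svc_item {
	struct svc_item *si_next;
	int si_id;
} svc_item_t;

static pthread_mutex_t service_queue = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t services_to_run = PTHREAD_COND_INITIALIZER;
static svc_item_t *qhead, *qtail;	/* head/tail list, as in the text */
static int svc_done, svc_stop;

/* Enqueue at the tail under the single lock and wake the backup thread. */
void
svc_enqueue(svc_item_t *el)
{
	pthread_mutex_lock(&service_queue);
	el->si_next = NULL;
	if (qtail != NULL)
		qtail->si_next = el;
	else
		qhead = el;
	qtail = el;
	pthread_cond_signal(&services_to_run);
	pthread_mutex_unlock(&service_queue);
}

/* Backup thread: sleep until work arrives, then drain the list. */
void *
svc_thread(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&service_queue);
	for (;;) {
		while (qhead == NULL && !svc_stop)
			pthread_cond_wait(&services_to_run, &service_queue);
		if (qhead == NULL && svc_stop)
			break;
		svc_item_t *el = qhead;
		qhead = el->si_next;
		if (qhead == NULL)
			qtail = NULL;
		svc_done++;		/* "service" the dequeued item */
		free(el);
	}
	pthread_mutex_unlock(&service_queue);
	return (NULL);
}
```

As the comment notes, contention on the single lock only matters when task queues are already failing, at which point serializing the requests doubles as throttling.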
* ========================
* For drivers/modules that use PERMOD or outer syncqs we keep a list of
* perdm structures, new entries being added (and new syncqs allocated) when
* setq() encounters a module/driver with a streamtab that it hasn't seen
* The reason for this mechanism is that some modules and drivers share a
* common streamtab and it is necessary for those modules and drivers to also
* share a common PERMOD syncq.
* perdm_list --> dm_str == streamtab_1
*                dm_next --> dm_str == streamtab_2
* The dm_ref field is incremented for each new driver/module that takes
* a reference to the perdm structure and hence shares the syncq.
* References are held in the fmodsw_impl_t structure for each STREAMS module
* or the dev_impl array (indexed by device major number) for each driver.
* perdm_list -> [dm_ref == 1] -> [dm_ref == 2] -> [dm_ref == 1] -> NULL
*               dev_impl: ...|x|y|...  module A  module B
* When a module/driver is unloaded the reference count is decremented and,
* when it falls to zero, the perdm structure is removed from the list and
* the syncq is freed (see rele_dm()).
/* global esballoc throttling queue */
* esballoc tunable parameters.
* routines to handle esballoc queuing.
* Qinit structure and Module_info structures
* for passthru read and write queues
* Special form of assertion: verify that X implies Y i.e. when X is true Y
* must also be true.
* Logical equivalence. Verify that both X and Y are either TRUE or FALSE.
* Verify correctness of list head/tail pointers.
* Enqueue a list element `el' at the end of a list denoted by `head' and `tail'
* pointers using a `link' field.
* Dequeue the first element of the list denoted by `head' and `tail' pointers
* using a `link' field and put result into `el'.
* Remove `el' from the list using `chase' and `curr' pointers and return result
/* Handling of delayed messages on the inner syncq. */
* DEBUG versions should use function versions (to simplify tracing) and
* non-DEBUG kernels should use macro versions.
* Put a queue on the syncq list of queues.
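The perdm sharing scheme above can be sketched in user space. `hold_dm()` and `rele_dm()` follow the names used in the comment; the streamtab is opaque and the shared syncq is reduced to a plain allocation, so this is an illustration of the reference-counting discipline, not the kernel implementation (which also synchronizes against `perdm_lock`).

```c
#include <stdlib.h>

struct streamtab;			/* opaque here */

typedef struct perdm {
	struct perdm *dm_next;
	struct streamtab *dm_str;
	unsigned dm_ref;
	void *dm_sq;			/* stands in for the shared syncq */
} perdm_t;

static perdm_t *perdm_list;

/* Find the perdm for `str', creating it on first use. */
perdm_t *
hold_dm(struct streamtab *str)
{
	perdm_t *p;

	for (p = perdm_list; p != NULL; p = p->dm_next) {
		if (p->dm_str == str) {		/* already present */
			p->dm_ref++;
			return (p);
		}
	}
	p = calloc(1, sizeof (*p));
	p->dm_str = str;
	p->dm_ref = 1;
	p->dm_sq = malloc(1);		/* allocate the shared "syncq" */
	p->dm_next = perdm_list;
	perdm_list = p;
	return (p);
}

/* Drop a reference; unlink and free the perdm and its syncq at zero. */
void
rele_dm(perdm_t *dmp)
{
	perdm_t **prev;

	if (--dmp->dm_ref > 0)
		return;
	for (prev = &perdm_list; *prev != dmp; prev = &(*prev)->dm_next)
		;
	*prev = dmp->dm_next;
	free(dmp->dm_sq);
	free(dmp);
}
```

Two drivers with the same streamtab thus end up with the same perdm, and hence the same PERMOD syncq, which is exactly the property the comment motivates.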
/* The queue should not be linked anywhere */ \
/* Head and tail may only be NULL simultaneously */ \
/* Queue may be only enqueued on its syncq */ \
/* Check the correctness of SQ_MESSAGES flag */ \
/* Sanity check first/last elements of the list */ \
* Sanity check of priority field: empty queue should \ * and nqueues equal to zero. \ /* Sanity check of sq_nqueues field */ \
* Put this queue in priority order: higher \ * priority gets closer to the head. \ * Remove a queue from the syncq list /* Check that the queue is actually in the list */ \
/* First queue on list, make head q_sqnext */ \
/* Make prev->next == next */ \
/* Last queue on list, make tail sqprev */ \
/* Make next->prev == prev */ \
/* clear out references on this queue */ \
/* If there is nothing queued, clear SQ_MESSAGES */ \
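The head/tail list manipulation that the macros above perform (and that the generic ENQUEUE/DEQUEUE macros described earlier express over `head'/`tail'/`link') can be sketched as below. This simplified version keeps only the core invariant that head and tail are NULL simultaneously, omitting the priority ordering and the SQ_MESSAGES flag handling.

```c
#include <stddef.h>

/* Add `el' at the tail of the list denoted by `head'/`tail'. */
#define	ENQUEUE(el, head, tail, link) {			\
	(el)->link = NULL;				\
	if ((head) == NULL) {				\
		(head) = (el);				\
	} else {					\
		(tail)->link = (el);			\
	}						\
	(tail) = (el);					\
}

/* Remove the first element of the list into `el' (NULL if empty). */
#define	DEQUEUE(el, head, tail, link) {			\
	(el) = (head);					\
	if ((el) != NULL) {				\
		(head) = (head)->link;			\
		if ((head) == NULL)			\
			(tail) = NULL;			\
		(el)->link = NULL;			\
	}						\
}

/* A minimal list element for exercising the macros. */
typedef struct node {
	struct node *n_next;
	int n_val;
} node_t;
```

The real macros add sanity ASSERTs (element not already linked, head/tail consistency) which the DEBUG kernel checks on every operation.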
/* Hide the definition from the header file. */ * Put a message on the queue's syncq. for (i = 0; i < nlocks; i++) { \
for (i = 0; i < nlocks; i++) { \
* Run service procedures for all queues in the stream head. * Init routine run from main at boot time. panic("strinit: no memory for streams taskq!");
* TPI support routine initialisation. * Handle to have autopush and persistent link information per * Note: uses shutdown hook instead of destroy hook so that the * persistent links can be torn down before the destroy hooks * in the TCP/IP stack are called. /* Have to hold sd_lock to prevent siglist from changing */ * Send the "sevent" set of signals to a process. * This might send more than one signal if the process is registered * for multiple events. The caller should pass in an sevent that only * includes the events for which the process has registered. "strsendsig:proc %p info %p",
proc, info);
"strsendsig:proc %p info %p", proc, info);
"strsendsig:proc %p info %p", proc, info);
"strsendsig:proc %p info %p", proc, info);
"strsendsig:proc %p info %p", proc, info);
"strsendsig:proc %p info %p", proc, info);
* registered on the given signal list that want a signal for at * least one of the specified events. * Must be called with exclusive access to siglist (caller holding sd_lock). * sd_lock and the ioctl code maintains a PID_HOLD on the pid structure * while it is in the siglist. * For performance reasons (MP scalability) the code drops pidlock * when sending signals to a single process. * When sending to a process group the code holds * pidlock to prevent the membership in the process group from changing * while walking the p_pglink list. /* pid was released but still on event list */ * XXX This unfortunately still generates * a signal when a fd is closed but * Send to process group. Hold pidlock across * Attach a stream device or module. * qp is a read queue; the new queue goes in so its next * read ptr is the argument, and the write queue corresponding * to the argument points to this queue. Return 0 on success, * or a non-zero errno on failure. * stash away a pointer to the module structure so we can /* create perdm_t if needed */ /* setq might sleep in allocator - avoid holding locks. */ * Before calling the module's open routine, set up the q_next * pointer for inserting a module in the middle of a stream. * Note that we can always set _QINSERTING and set up q_next * pointer for both inserting and pushing a module. Then there * is no need for the is_insert parameter. In insertq(), called * by qprocson(), assume that q_next of the new module always points * to the correct queue and use it for insertion. Everything should * work out fine. But in the first release of _I_INSERT, we * distinguish between inserting and pushing to make sure that * pushing a module follows the same code path as before. * If there is an outer perimeter get exclusive access during * the open procedure. Bump up the reference count on the queue. * Handle second open of stream. For modules, set the * last argument to MODOPEN and do not pass any open flags. 
* Ignore dummydev since this is not the first open. * successful open should have done qprocson() * Detach a stream module or device. * If clmode == 1 then the module or driver was opened and its * close routine must be called. If clmode == 0, the module * or driver was never opened or the open failed, and so its close * Make sure that all the messages on the write side syncq are * processed and nothing is left. Since we are closing, no new * messages may appear there. * Check that qprocsoff() was actually called. * Allow any threads blocked in entersq to proceed and discover * Note: This assumes that all users of entersq check QWCLOSE. * Currently runservice is the only entersq that can happen * after removeq has finished. * Removeq will have discarded all messages destined to the closing * pair of queues from the syncq. * NOTE: Calling a function inside an assert is unconventional. * However, it does not cause any problem since flush_syncq() does * not change any state except when it returns non-zero i.e. * when the assert will trigger. /* release any fmodsw_impl_t structure held on behalf of the queue */ /* freeq removes us from the outer perimeter if any */ /* Prevent service procedures from being called */ /* allow service procedures to be called again */ * Only reset QENAB if the queue was removed from the runlist. * A queue goes through 3 stages: * It is on the service list and QENAB is set. * It is removed from the service list but QENAB is still set. * QENAB gets changed to QINSERVICE. * QINSERVICE is reset (when the service procedure is done) * Thus we can not reset QENAB unless we actually removed it from the service * wait for any pending service processing to complete. * The removal of queues from the runlist is not atomic with the * clearing of the QENABLED flag and setting the INSERVICE flag. 
* consequently it is possible for remove_runlist in strclose * to not find the queue on the runlist but for it to be QENABLED * and not yet INSERVICE -> hence wait_svc needs to check QENABLED * Wait until the syncqs associated with the queue * disappear from the background processing list. * This only needs to be done for non-PERMOD perimeters since * for PERMOD perimeters the syncq may be shared and will only be freed * If for PERMOD perimeters the queue was on the syncq list, removeq() * should call propagate_syncq() or drain_syncq() for it. Both of these * functions remove the queue from its syncq list, so sqthread will not * try to access the queue. * Disable rsq and wsq and wait for any background processing of * Put ioctl data from userland buffer `arg' into the mblk chain `bp'. * `flag' must always contain either K_TO_K or U_TO_K; STR_NOSIG may * also be set, and is passed through to allocb_cred_wait(). * Returns errno on failure, zero on success. * strdoioctl validates ioc_count, so if this assert fails it * cannot be due to user error. * Copy ioctl data to user-land. Return non-zero errno on failure, * Allocate a linkinfo entry given the write queue of the * bottom module of the top stream and the write queue of the * stream head of the bottom stream. ASSERT(lnk_id != 0);
/* this should never wrap in practice */ * Check for a potential linking cycle. * Return 1 if a link will result in a cycle, * if the lower stream is a pipe/FIFO, return, since link * If ep->me_nodep is a FIFO (me_nodep == NULL), * ignore the edge and move on. ep->me_nodep gets * set to NULL in mux_addedge() if it is a FIFO. * Find linkinfo entry corresponding to the parameters. * Given a queue ptr, follow the chain of q_next pointers until you reach the * last queue on the chain and return it. * wait for the syncq count to drop to zero. * sq could be either outer or inner. * Wait while there are any messages for the queue in its syncq. * Test for invalid upper stream * Test for invalid lower stream. * The check for the v_type != VFIFO and having a major * number not >= devcnt is done to avoid problems with * adding mux_node entry past the end of mux_nodes[]. * For FIFO's we don't add an entry so this isn't a * STRPLUMB protects plumbing changes and should be set before * link_addpassthru()/link_rempassthru() are called, so it is set here * and cleared at the end of mlink when the passthru queue is removed. * Setting of STRPLUMB prevents reopens of the stream while the passthru * queue is in place (it is not a proper module and doesn't have open * STPLEX prevents any threads from entering the stream from above. It * can't be set before the call to link_addpassthru() because putnext * from below may cause stream head I/O routines to be called and these * routines assert that STPLEX is not set. After link_addpassthru() * nothing may come from below since the pass queue syncq is blocked. * Note also that STPLEX should be cleared before the call to * link_rempassthru() since when messages start flowing to the stream * head (e.g. because of message propagation from the pass queue) stream * head I/O routines may be called with the STPLEX flag set. * When STPLEX is set, nothing may come into the stream from above and * it is safe to do a setq which will change the stream head.
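The linking-cycle check sketched here is a toy version of the mechanism the comment describes: linking a lower stream under an upper one is illegal if the upper stream is already reachable from the lower one in the directed graph of existing mux links. The kernel walks mux_nodes[]/me_nodep edge lists (skipping FIFO endpoints, whose me_nodep is NULL); a small fixed adjacency matrix stands in here, and `NNODES` is an arbitrary bound for the sketch.

```c
#define	NNODES	8

/* edges[a][b] != 0 means stream a is already linked over stream b. */
static int edges[NNODES][NNODES];

/* Return 1 if `target' is reachable from `from' along existing links. */
static int
reachable(int from, int target)
{
	if (from == target)
		return (1);
	for (int i = 0; i < NNODES; i++)
		if (edges[from][i] && reachable(i, target))
			return (1);
	return (0);
}

/* Return 1 if adding the link up->lo would create a cycle. */
int
linkcycle(int up, int lo)
{
	return (reachable(lo, up));
}

/* Record a link once it has been validated (cf. mux_addedge()). */
void
mux_addedge(int up, int lo)
{
	edges[up][lo] = 1;
}
```

Because edges are only added after `linkcycle()` rejects cycles, the graph stays acyclic and the recursive walk terminates.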
So, the * correct sequence of actions is: * 1) Set STRPLUMB * 2) Call link_addpassthru() * 3) Set STPLEX * 4) Call setq and update the stream state * 5) Clear STPLEX * 6) Call link_rempassthru() * 7) Clear STRPLUMB * The same sequence applies to the munlink() code. * Add passthru queue below lower mux. This will block * There may be messages in the streamhead's syncq due to messages * that arrived before link_addpassthru() was done. To avoid * background processing of the syncq happening simultaneously with * setq processing, we disable the streamhead syncq and wait until the * existing background thread finishes working on it. /* setq might sleep in allocator - avoid holding locks. */ /* Note: we are holding muxifier here. */ /* create perdm_t if needed */ * XXX Remove any "odd" messages from the queue. * Keep only M_DATA, M_PROTO, M_PCPROTO. * Restore the stream head queue and then remove * the passq. Turn off STPLEX before we turn on * the stream by removing the passq. /* Wakeup anyone waiting for STRPLUMB to clear. */ * if we've made it here the linkage is all set up so we should also * set up the layered driver linkages * Mark the upper stream as having dependent links * so that strclose can clean it up. * Wake up any other processes that may have been * waiting on the lower stream. These will all /* The passthru module is removed so we may release STRPLUMB */ * Unlink a multiplexor link. Stp is the controlling stream for the * link, and linkp points to the link's entry in the linkinfo list. * The muxifier lock must be held on entry and is dropped on exit. * NOTE: Currently it is assumed that the mux would process all the messages * sitting on its queue before ACKing the UNLINK. It is the responsibility * of the mux to handle all the messages that arrive before UNLINK. * If the mux has to send down messages on its lower stream before * ACKing I_UNLINK, then it *should* know to handle messages even * after the UNLINK is acked (actually it should be able to handle them until we * re-block the read side of the pass queue here).
If the mux does not * open up the lower stream, any messages that arrive during UNLINK * will be put in the stream head. In the case of the lower stream opening * up, some messages might land in the stream head depending on when * the message arrived and when the read side of the pass queue was * Add passthru queue below lower mux. This will block * If there was an error and this is not called via strclose, * return to the user. Otherwise, pretend there was no error "unlink ioctl, closing anyway (%d)\n", error);
* We go ahead and drop muxifier here--it's a nasty global lock that * can slow others down. It's okay to do so since attempts to mlink() this * stream will be stopped because STPLEX is still set in the stdata * structure, and munlink() is stopped because mux_rmvedge() and * lbfree() have removed it from mux_nodes[] and linkinfo_list, * respectively. Note that we defer the closef() of fpdown until * after we drop muxifier since strclose() can call munlinkall(). * Get rid of outstanding service procedure runs, before we make * it a stream head, since a stream head doesn't have any service * Since we don't disable the syncq for QPERMOD, we wait for whatever * is queued up to be finished. The mux should take care that nothing is * sent down to this queue. We should do it now as we're going to block * passyncq if it was unblocked. * Messages could be flowing from underneath. We will * block the read side of the passq. This would be * sufficient for QPAIR and QPERQ muxes to ensure * that no data is flowing up into this queue * and hence no thread active in this instance of * lower mux. But for QPERMOD and QMTOUTPERIM there * are the module and outer syncqs respectively. We will wait for them to drain. * Because passq is blocked messages end up in the syncq * And qfill_syncq could possibly end up setting QFULL * which will access the rq->q_flag. Hence, we have to * acquire the QLOCK in setq. * XXX Messages can also flow from top into this * queue though the unlink is over (Ex. some instance * in putnext() called from top that has still not * accessed this queue. And also putq(lowerq) ?). * Solution: How about blocking the l_qtop queue? * Do we really care about such pure D_MP muxes? * We have to just wait for the outer sq_count * to drop to zero. As this does not prevent new * messages from entering the outer perimeter, this * is subject to starvation. * NOTE: Because of blocksq above, messages could * be in the inner syncq only because of some * thread holding the outer perimeter exclusively.
* Hence it would be sufficient to wait for the * exclusive holder of the outer perimeter to drain * the inner and outer syncqs. But we will not depend * on this feature and hence check the inner syncqs * There could be messages destined for * this queue. Let the exclusive holder * We haven't taken care of the QPERMOD case yet. QPERMOD is a special * case as we don't disable its syncq or remove it off the syncq * flush_syncq changes state only when there are some messages to * free, i.e. when it returns a non-zero value. * Nobody else should know about this queue now. * If the mux did not process the messages before * acking the I_UNLINK, free them now. * Convert the mux lower queue into a stream head queue. * Turn off STPLEX before we turn on the stream by removing the passq. * Now it is a proper stream, so STPLEX is cleared. But STRPLUMB still * needs to be set to prevent reopen() of the stream - such a reopen may * try to call the non-existent pass queue open routine and panic. /* clean up the layered driver linkages */ * Now all plumbing changes are finished and STRPLUMB is no * Unlink all multiplexor links for which stp is the controlling stream. * Return 0, or a non-zero errno on failure. * munlink() releases the muxifier lock. * A multiplexor link has been made. Add an * edge to the directed graph. * Save the dev_t for the purposes of str_stack_shutdown. * str_stack_shutdown assumes that the device allows reopen, since * this dev_t is the one after any cloning by xx_open(). * Would prefer finding the dev_t from before any cloning, * but specfs doesn't retain that. * A multiplexor link has been removed. Remove the * edge in the directed graph. ASSERT(0);
/* should not reach here */ * Translate the device flags (from conf.h) to the corresponding * qflag and sq_flag (type) values. /* Inner perimeter presence and scope */ /* Inner perimeter modifiers */ * The code in putnext assumes that it has the * highest concurrency by not checking sq_count. * Thus _D_MTOCSHARED can only be supported when * The code in putnext assumes that it has the * highest concurrency by not checking sq_count. * Thus _D_MTCBSHARED can only be supported when * The code in putnext assumes that it has the * highest concurrency by not checking sq_count. * Thus _D_MTSVCSHARED can only be supported when * D_MTPUTSHARED is set. Also _D_MTSVCSHARED is * supported only for QPERMOD. /* Default outer perimeter concurrency */ /* Outer perimeter modifiers */ /* Synchronous Streams extended qinit structure */ * Private flag used by a transport module to indicate * to sockfs that it supports direct-access mode without * having to go through STREAMS. /* Reject unless the module is fully-MT (no perimeter) */ "stropen: bad MT flags (0x%x) in driver '%s'",
* Set the interface values for a pair of queues (qinit structure, * packet sizes, water marks). * setq assumes that the caller does not have a claim (entersq or claimq) * Create syncqs based on qflag and sqtype. Set the SQ_TYPES_IN_FLAGS * bits in sq_flag based on the sqtype. * We are making sq_svcflags zero, * resetting SQ_DISABLED in case it was set by * wait_svc() in the munlink path. * We need to acquire the lock here for the mlink and munlink case, * where canputnext, backenable, etc can access the q_flag. /* Allocate a separate syncq for the write side */ * Assert that we do have an inner perimeter syncq and that it * does not have an outer perimeter associated with it. * Initialize struio() types. if (p->
dm_str ==
str) {
/* already present */ * Wait for any background processing that relies on the * syncq to complete before it is freed. * Make a protocol message given control and data buffers. * n.b., this can block; be careful of what locks you hold when calling it. * If sd_maxblk is less than *iosize this routine can fail part way through * (due to an allocation failure). In this case on return *iosize will contain * the amount that was consumed. Otherwise *iosize will not be modified * i.e. it will contain the amount that was consumed. /* Create control part, if any */ /* Create data part, if any */ * Make the control part of a protocol message given a control buffer. * n.b., this can block; be careful of what locks you hold when calling it. * Create control part of message, if any. * blocks by increasing the size to something more usable. * Range checking has already been done; simply try * to allocate a message block for the ctl part. * Make a protocol message given data buffers. * n.b., this can block; be careful of what locks you hold when calling it. * If sd_maxblk is less than *iosize this routine can fail part way through * (due to an allocation failure). In this case on return *iosize will contain * the amount that was consumed. Otherwise *iosize will not be modified * i.e. it will contain the amount that was consumed. * Create data part of message, if any. * Setup the stream uio portion of the * dblk for subsequent use by struioget(). * Wait for a buffer to become available. Return non-zero errno * if not able to wait, 0 if buffer is probably there. * This function waits for a read or write event to happen on a stream. * fmode can specify FNDELAY and/or FNONBLOCK. * The timeout is in ms with -1 meaning infinite. * The flag values work as follows: * READWAIT Check for read side errors, send M_READ * GETWAIT Check for read side errors, no M_READ * WRITEWAIT Check for write side errors. * NOINTR Do not return error if nonblocking or timeout. 
* STR_NOERROR Ignore all errors except STPLEX. * STR_NOSIG Ignore/hold signals during the duration of the call. * STR_PEEK Pass through the strgeterr(). * A strwakeq() is pending, no need to sleep. * Check for errors before going to sleep since the * caller might not have checked this while holding * If any module downstream has requested read notification * by setting SNDMREAD flag using M_SETOPTS, send a message * Send the number of bytes requested by the * read as the argument to M_READ. * If any data arrived due to inline processing * of putnext(), don't sleep. "strwaitq sleeps (2):%p, %X, %lX, %X, %p",
"strwaitq awakes(2):%X, %X, %X, %X, %X",
"strwaitq interrupt #2:%p, %X, %lX, %X, %p",
"strwaitq timeout:%p, %X, %lX, %X, %p",
* If the caller implements delayed errors (i.e. queued after data) * we can not check for errors here since data as well as an * error might have arrived at the stream head. We return to * have the caller check the read queue before checking for errors. * Perform job control discipline access checks. * Return 0 for success and the errno for failure. * If this is not the calling process's controlling terminal * or if the calling process is already in the foreground * Check to see if controlling terminal has been deallocated. }
else { /* mode == JCWRITE or JCSETP */ * We call cv_wait_sig_swap() to cause the appropriate * action for the jobcontrol signal to take place. * If the signal is being caught, we will take the * EINTR error return. Otherwise, the default action * of causing the process to stop will take place. * In this case, we rely on the periodic cv_broadcast() on * &lbolt_cv to wake us up to loop around and test again. * We can't get here if the signal is ignored or * if the current thread is blocking the signal. * Return size of message of block type (bp->b_datap->db_type) * Allocate a stream head. * Allocate a pair of queues and a syncq for the pair * Free a pair of queues and the "attached" syncq. * Discard any messages left on the syncq(s), remove the syncq(s) from the * outer perimeter, and free the syncq(s) if they are not the "attached" syncq. * If a previously dispatched taskq job is scheduled to run * sync_service() or a service routine is scheduled for the * queues about to be freed, wait here until all service is * done on the queue and all associated queues and syncqs. * Flush the queues before q_next is set to NULL. This is needed * in order to backenable any downstream queue before we go away. * Note: we are already removed from the stream so that the * backenabling will not cause any messages to be delivered to our
* count how many events are on the list * now so we can check to avoid looping * in low memory situations * get estimate of available memory from kmem_avail(). * awake all bufcall functions waiting for * memory whose request could be satisfied * by 'count' memory and let 'em fight for it. * too big, try again later - note * that nevent was decremented above * so we won't retry this one on this * actually run queue's service routine. "runservice starts:%p", q);
"runservice ends:(%p)", q);
* Wakeup thread waiting for the service procedure * to be run (strclose and qdetach). * Background processing of bufcalls. "streams_bufcall_service");
/* Wait for memory to become available */ /* Wait for new work to arrive */ * Background processing of streams background tasks which failed "streams_bkgrnd_service");
* Wait for work to arrive. * Handle all pending freebs requests to free memory. * Background processing of streams background tasks which failed "streams_sqbkgrnd_service");
* Wait for work to arrive. * Disable the syncq and wait for background syncq processing to complete. * If the syncq is placed on the sqhead/sqtail queue, try to remove it from the * Put a syncq on the list of syncq's to be serviced by the sqthread. * Add the argument to the end of the sqhead list and set the flag * indicating this syncq has been enabled. If it has already been * enabled, don't do anything. * This routine assumes that SQLOCK is held. * NOTE that the lock order is to have the SQLOCK first, * so if the service_syncq lock is held, we need to release it * before aquiring the SQLOCK (mostly relevant for the background * thread, and this seems to be common among the STREAMS global locks). * Note the the sq_svcflags are protected by the SQLOCK. * This is probably not important except for where I believe it * is being called. At that point, it should be held (and it * is a pain to release it just for this routine, so don't do * Do not put on list if background thread is scheduled or * Check whether we should enable sq at all. * Non PERMOD syncqs may be drained by at most one thread. * PERMOD syncqs may be drained by several threads but we limit the * total amount to the lesser of * Number of queues on the squeue and /* Attempt a taskq dispatch */ * This taskq dispatch failed, but a previous one may have succeeded. * Don't try to schedule on the background thread whilst there is * outstanding taskq processing. * System is low on resources and can't perform a non-sleeping * dispatch. Schedule the syncq for a background thread and mark the * syncq to avoid any further taskq dispatch attempts. * Note: fifo_close() depends on the mblk_t on the queue being freed * asynchronously. The asynchronous freeing of messages breaks the * recursive call chain of fifo_close() while there are I_SENDFD type of * messages refering other file pointers on the queue. 
Then when * closing pipes it can avoid stack overflow in case of daisy-chained * pipes, and also avoid deadlock in case of fifonode_t pairs (which * share the same fifolock_t). * Check data sanity. The dblock should have a non-empty free function. * It is better to panic here than later when the dblock is freed * asynchronously when the context is lost. panic("freebs_enqueue: dblock %p has a NULL free callback",
/* queue the new mblk on the esballoc queue */ /* If we're the first thread to reach the threshold, process */ * Detach the message chain for processing. * Process the message chain. * taskq callback routine to free esballoced mblk's * System is low on resources and can't perform a non-sleeping * dispatch. Schedule for a background thread. * Set the QBACK or QB_BACK flag in the given queue for * the given priority band. "setqback: can't allocate qband\n");
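The esballoc throttling described above (queue freed blocks globally, then detach and process the whole chain once a threshold is reached) can be modeled as below. This is a single-threaded sketch: `ESBQ_MAX` is an illustrative threshold name, the mblk is reduced to a link field, and the kernel hands the detached chain to a taskq (or the background thread) rather than draining it inline.

```c
#include <stdlib.h>

typedef struct mblk {
	struct mblk *b_next;
} mblk_t;

#define	ESBQ_MAX	4		/* illustrative threshold */

static mblk_t *esballoc_list;		/* global esballoc queue */
static int esballoc_cnt;
static int esballoc_freed;		/* blocks actually freed so far */

/* Run the "free function" for every block on a detached chain. */
static void
esballoc_process_chain(mblk_t *mp)
{
	while (mp != NULL) {
		mblk_t *next = mp->b_next;
		free(mp);
		esballoc_freed++;
		mp = next;
	}
}

void
freebs_enqueue(mblk_t *mp)
{
	/* queue the new mblk on the esballoc queue */
	mp->b_next = esballoc_list;
	esballoc_list = mp;

	/* If we're the first to reach the threshold, detach and process. */
	if (++esballoc_cnt >= ESBQ_MAX) {
		mblk_t *chain = esballoc_list;
		esballoc_list = NULL;
		esballoc_cnt = 0;
		esballoc_process_chain(chain);
	}
}
```

Detaching the whole chain before processing keeps the (in the kernel, lock-protected) list manipulation short and moves the potentially long free loop outside the critical section.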
* strsignal_nolock() posts a signal to the process(es) at the stream head. * It assumes that the stream head lock is already held, whereas strsignal() * acquires the lock first. This routine was created because a few callers * release the stream head lock before calling only to re-acquire it after * Backenable the first queue upstream from `q' with a service procedure. * our presence might not prevent other modules in our own * have a claim on the queue (some drivers do a getq on somebody * else's queue - they know that the queue itself is not going away * but the framework has to guarantee q_next in that stream.) /* find nearest back queue with service proc */ * backenable can be called either with no locks held * or with the stream frozen (the latter occurs when a module * calls rmvq with the stream frozen.) If the stream is frozen * by the caller the caller will hold all qlocks in the stream. * Note that a frozen stream doesn't freeze a mated stream, * so we explicitly check for that. * Return the appropriate errno when one of flags_to_check is set * in sd_flags. Uses the exported error routines if they are set. * Will return 0 if no error is set (or if the exported error routines * do not return an error). * If there is both a read and write error to check we prefer the read error. * Also, give preference to recorded errno's over the error functions. * The flags that are handled are: * STRDERR return sd_rerror (and clear if STRDERRNONPERSIST) * STWRERR return sd_werror (and clear if STWRERRNONPERSIST) * STRHUP return sd_werror * If the caller indicates that the operation is a peek a nonpersistent error * Read errors are non-persistent i.e. discarded once * returned to a non-peeking caller, * Write errors are non-persistent i.e. discarded once * returned to a non-peeking caller, /* sd_werror set when STRHUP */ * for twisted streams also * Complete the plumbing operation associated with stream `stp'.
* This describes how the STREAMS framework handles synchronization * The key interfaces for open and close are qprocson and qprocsoff, * respectively. While the close case in general is harder, both open * and close have significant similarities. * During close the STREAMS framework has to both ensure that there * are no stale references to the queue pair (and syncq) that * are being closed and also provide the guarantees that are documented * If there are stale references to the queue that is closing it can * result in kernel memory corruption or kernel panics. * Note that it is up to the module/driver to ensure that it itself * does not have any stale references to the closing queues once its close * routine returns. This includes: * associated with the queues. For timeout and bufcall callbacks the * module/driver also has to ensure (or wait for) any callbacks that * - If the module/driver is using esballoc it has to ensure that any * esballoc free functions do not refer to a queue that has closed. * (Note that in general the close routine can not wait for the esballoc'ed * messages to be freed since that can cause a deadlock.) * - Cancelling any interrupts that refer to the closing queues and * also ensuring that there are no interrupts in progress that will * refer to the closing queues once the close routine returns. * - For multiplexors removing any driver global state that refers to * the closing queue and also ensuring that there are no threads in * the multiplexor that has picked up a queue pointer but not yet * In addition, a driver/module can only reference the q_next pointer * in its open, close, put, or service procedures or in a * stream. Thus it can not reference the q_next pointer in an interrupt * routine or a timeout, bufcall or esballoc callback routine. Likewise * it can not reference q_next of a different queue e.g. in a mux that * passes messages from one queue's put/service procedure to another queue.
* In all the cases when the driver/module can not access the q_next * field it must use the *next* versions e.g. canputnext instead of * canput(q->q_next) and putnextctl instead of putctl(q->q_next, ...). * Assuming that the driver/module conforms to the above constraints * the STREAMS framework has to avoid stale references to q_next for all * the framework internal cases which include (but are not limited to): * - Messages on a syncq that have a reference to the queue through b_queue. * - Messages on an outer perimeter (syncq) that have a reference to the * - Threads that use q_nfsrv (e.g. canput) to find a queue. * Note that only canput and bcanput use q_nfsrv without any locking. * The STREAMS framework providing the qprocsoff(9F) guarantees means that * after qprocsoff returns, the framework has to ensure that no threads can * enter the put or service routines for the closing read or write-side queue. * In addition to preventing "direct" entry into the put procedures * the framework also has to prevent messages being drained from * the syncq or the outer perimeter. * XXX Note that currently qdetach relies on D_MTOCEXCL as the only * mechanism to prevent qwriter(PERIM_OUTER) from running after * qprocsoff has returned. * Note that if a module/driver uses put(9F) on one of its own queues * it is up to the module/driver to ensure that the put() doesn't * get called when the queue is closing. * The framework aspects of the above "contract" are implemented by * qprocsoff, removeq, and strlock: * - qprocsoff (disable_svc) sets QWCLOSE to prevent runservice from * entering the service procedures. * - strlock acquires the sd_lock and sd_reflock to prevent putnext, * canputnext, backenable etc. from dereferencing the q_next that will * - strlock waits for sd_refcnt to be zero to wait for e.g.
any canputnext * - optionally for every syncq in the stream strlock acquires all the * sq_lock's and waits for all sq_counts to drop to a value that indicates * that no thread executes in the put or service procedures and that no * thread is draining into the module/driver. This ensures that no * currently executing hence no such thread can end up with the old stale * - qdetach (wait_svc) makes sure that any scheduled or running threads * have either finished or observed the QWCLOSE flag and gone away. * Get all the locks necessary to change q_next. * Wait for sd_refcnt to reach 0 and, if sqlist is present, wait for the * sq_count of each syncq in the list to drop to sq_rmqcount, indicating that * the only threads inside the syncq are threads currently calling removeq(). * Since threads calling removeq() are in the process of removing their queues * from the stream, we do not need to worry about them accessing a stale q_next * pointer and thus we do not need to wait for them to exit (in fact, waiting * for them can cause deadlock). * This routine is subject to starvation since it does not set any flag to * prevent threads from entering a module in the stream (i.e. sq_count can * increase on some syncq while it is waiting on some other syncq.) * Assumes that only one thread attempts to call strlock for a given * stream. If this is not the case the two threads would deadlock. * This assumption is guaranteed since strlock is only called by insertq * and removeq and streams plumbing changes are single-threaded for * a given stream using the STWOPEN, STRCLOSE, and STRPLUMB flags. * For pipes, it is not difficult to atomically designate a pair of streams * to be mated. Once mated atomically by the framework the twisted pair remain * configured that way until dismantled atomically by the framework.
* When plumbing takes place on a twisted stream it is necessary to ensure that * this operation is done exclusively on the twisted stream since two such * operations, each initiated on different ends of the pipe will deadlock * waiting for each other to complete. * On entry, no locks should be held. * The locks acquired and held by strlock depend on a few factors. * - If sqlist is non-NULL all the syncq locks in the sqlist will be acquired * and held on exit and all sq_count are at an acceptable level. * - In all cases, sd_lock and sd_reflock are acquired and held on exit with * Wait for any claimstr to go away. * Note that the selection of locking order is not * important, just that they are always acquired in * the same order. To assure this, we choose this * order based on the value of the pointer, and since * the pointer will not change for the life of this * pair, we will always grab the locks in the same * order (and hence, prevent deadlocks). /* Failed - drop all locks that we have acquired so far */ * The wait loop below may starve when there are many threads * claiming the syncq. This is especially a problem with permod * syncqs (IP). To lessen the impact of the problem we increment * sq_needexcl and clear fastbits so that putnexts will slow * down and call sqenable instead of draining right away. * Drop all the locks that strlock acquired. * When the module has a service procedure, we need to check whether the next * module that has a service procedure is in flow control to trigger * Given two read queues, insert a new single one after another. * This routine acquires all the necessary locks in order to change * q_next and related pointers using strlock(). * It depends on the stream head ensuring that there are no concurrent * insertq or removeq on the same stream. The stream head ensures this * using the flags STWOPEN, STRCLOSE, and STRPLUMB. * Note that no syncq locks are held during the q_next change.
This is * applied to all streams since, unlike removeq, there is no problem of stale * pointers when adding a module to the stream. Thus drivers/modules that do a * canput(rq->q_next) would never get a closed/freed queue pointer even if we * applied this optimization to all streams. * set_nfsrv_ptr() needs to know if this is an insertion or not, * so only reset this flag after calling it. /* The QEND flag might have to be updated for the upstream guy */ * If this was a module insertion, bump the push count. /* check if the write Q needs backenable */ /* check if the read Q needs backenable */ * Given a read queue, unlink it from any neighbors. * This routine acquires all the necessary locks in order to * change q_next and related pointers and also guard against * stale references (e.g. through q_next) to the queue that * is being removed. It also plays part of the role in ensuring * that the module's/driver's put procedure doesn't get called * after qprocsoff returns. * Removeq depends on the stream head ensuring that there are * no concurrent insertq or removeq on the same stream. The * stream head ensures this using the flags STWOPEN, STRCLOSE and * The set of locks needed to remove the queue is different in * Acquire sd_lock, sd_reflock, and all the syncq locks in the stream after * waiting for the syncq reference count to drop to 0 indicating that no * non-close threads are present anywhere in the stream. This ensures that any * module/driver can reference q_next in its open, close, put, or service * The sq_rmqcount counter tracks the number of threads inside removeq(). * strlock() ensures that there are either no threads executing inside the * perimeter or there is only a thread calling qprocsoff(). * strlock() compares the value of sq_count with the number of threads inside * removeq() and waits until sq_count is equal to sq_rmqcount. We need to wake * up any threads waiting in strlock() when the sq_rmqcount increases. "removeq:%p %p",
qp,
wqp);
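The pointer-value lock ordering described earlier for mated (twisted) stream pairs - always acquire the lower-addressed lock first so both ends agree on the order - can be sketched in userland with pthreads. The helper names are illustrative, not the kernel's:

```c
#include <pthread.h>
#include <stdint.h>

/* Of two locks, the one that must be taken first: the lower address.
 * Both ends of a mated pair compute the same answer. */
pthread_mutex_t *
lock_first(pthread_mutex_t *a, pthread_mutex_t *b)
{
	return (((uintptr_t)a < (uintptr_t)b) ? a : b);
}

/*
 * Acquire both locks of a mated pair in address order, so that two
 * threads starting from opposite ends of the pipe cannot deadlock.
 */
void
lock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
{
	if (a == b) {			/* same lock: take it once */
		pthread_mutex_lock(a);
	} else if (lock_first(a, b) == a) {
		pthread_mutex_lock(a);
		pthread_mutex_lock(b);
	} else {
		pthread_mutex_lock(b);
		pthread_mutex_lock(a);
	}
}

void
unlock_pair(pthread_mutex_t *a, pthread_mutex_t *b)
{
	pthread_mutex_unlock(a);	/* unlock order is immaterial */
	if (a != b)
		pthread_mutex_unlock(b);
}
```

Since the pointers never change for the life of the pair, every acquisition uses the same order, which is all that deadlock avoidance requires.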
* For queues using Synchronous streams, we must wait for all threads in * rwnext() to drain out before proceeding. /* First, we need to wake up any threads blocked in rwnext() */ /* The QEND flag might have to be updated for the upstream guy */ * Move any messages destined for the put procedures to the next * syncq in line. Otherwise free them. * Quick check to see whether there are any messages or events. * If this was a module removal, decrement the push count. * Make sure any messages that were propagated are drained. * Also clear any QFULL bit caused by messages that were propagated. * For the driver calling qprocsoff, propagate_syncq * frees all the messages instead of putting them in * We come here for any pop of a module except for the * case of the driver being removed. We don't call emptysq * if we did not move any messages. This will avoid holding * PERMOD syncq locks in emptysq * Prevent further entry by setting a flag (like SQ_FROZEN, SQ_BLOCKED or * If maxcnt is not -1 it assumes that caller has "maxcnt" claim(s) on the * sync queue and waits until sq_count reaches maxcnt. * if maxcnt is -1 there's no need to grab sq_putlocks since the caller * does not care about putnext threads that are in the middle of calling put * This routine is used for both inner and outer syncqs. * SQ_FROZEN will be set if there is a frozen stream that has a * queue which also refers to this "shared" syncq. * SQ_BLOCKED will be set if there is an "off" queue which also * refers to this "shared" syncq. * Reset a flag that was set with blocksq. * Can not use this routine to reset SQ_WRITER. * If "isouter" is set then the syncq is assumed to be an outer perimeter * and drain_syncq is not called. Instead we rely on the qwriter_outer thread * to handle the queued qwriter operations. * no need to grab sq_putlocks here. See comment in strsubr.h that explains when /* drain_syncq drops SQLOCK */ * Reset a flag that was set with blocksq. * Does not drain the syncq. Use emptysq() for that.
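The "move messages to the next syncq in line, otherwise free them" behavior described above can be modeled with a plain singly-linked list. This is a userland sketch only; `msg_t`, `msglist_t`, and `propagate_or_free` are invented stand-ins for the mblk chain and syncq lists:

```c
#include <stdlib.h>

/* Toy message carrying a b_next link and a destination queue pointer,
 * modeled loosely on the mblk b_next/b_queue fields. */
typedef struct msg {
	struct msg *b_next;
	void *b_queue;		/* destination queue */
} msg_t;

typedef struct msglist {
	msg_t *head;
	msg_t *tail;
} msglist_t;

/*
 * Propagate every message on src to the next list in line, retargeting
 * b_queue to newq; when next is NULL (the driver-close case), the
 * whole chain is freed instead.
 */
void
propagate_or_free(msglist_t *src, msglist_t *next, void *newq)
{
	msg_t *mp = src->head;

	src->head = src->tail = NULL;
	if (next == NULL) {		/* driver: nothing left downstream */
		while (mp != NULL) {
			msg_t *nxt = mp->b_next;
			free(mp);
			mp = nxt;
		}
		return;
	}
	if (mp == NULL)
		return;
	for (msg_t *m = mp; m != NULL; m = m->b_next)
		m->b_queue = newq;	/* change the q values */
	if (next->head == NULL)		/* attach chain to the end */
		next->head = mp;
	else
		next->tail->b_next = mp;
	msg_t *t = mp;
	while (t->b_next != NULL)
		t = t->b_next;
	next->tail = t;
}
```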
* Returns 1 if SQ_QUEUED is set. Otherwise 0. * no need to grab sq_putlocks here. See comment in strsubr.h that explains when * Empty all the messages on a syncq. * no need to grab sq_putlocks here. See comment in strsubr.h that explains when * To prevent potential recursive invocation of drain_syncq we * do not call drain_syncq if count is non-zero. /* drain_syncq() drops SQLOCK */ * Ordered insert while removing duplicates. * Walk the write side queues until we hit either the driver * or a twist in the stream (_SAMESTR will return false in both * these cases) then turn around and walk the read side queues * back up to the stream head. * Allocate and build a list of all syncqs in a stream and the syncq(s) * associated with the "q" parameter. The resulting list is sorted in a * canonical order and is free of duplicates. * Assumes the passed queue is a _RD(q). * Allocate 2 syncql_t's for each pushed module. Note that * the sqlist_t structure already has 4 syncql_t's built in: * 2 for the stream head, and 2 for the driver/other stream head. * Free the list created by sqlist_alloc() * Prevent any new entries into any syncq in this stream. * Get a sorted list with all the duplicates removed containing * all the syncqs referenced by this stream. * Release the block on new entries into this stream * Get a sorted list with all the duplicates removed containing * all the syncqs referenced by this stream. * Have to drop the SQ_FROZEN flag on all the syncqs before * starting to drain them; otherwise the draining might * cause a freezestr in some module on the stream (which * Check if anyone has frozen this stream with freezestr * Obsoleted interface. Should not be used. * Enter a perimeter. c_inner and c_outer specify which concurrency bits * Wait if SQ_QUEUED is set to preserve ordering between messages and qwriter * calls and the running of open, close and service procedures.
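The "ordered insert while removing duplicates" used to build the canonical syncq list can be sketched with a sorted singly-linked list keyed on pointer value. The types and function name below are illustrative, not the kernel's `sqlist` structures:

```c
#include <stddef.h>
#include <stdint.h>

/* Toy list node holding one syncq pointer. */
typedef struct sqnode {
	struct sqnode *next;
	void *sq;
} sqnode_t;

/*
 * Insert node (carrying node->sq) into *headp, keeping ascending
 * address order; a duplicate sq is simply not linked in.
 * Returns 1 if inserted, 0 if it was a duplicate.
 */
int
sqlist_insert_sketch(sqnode_t **headp, sqnode_t *node)
{
	sqnode_t **pp = headp;

	/* walk to the first entry not below node->sq */
	while (*pp != NULL && (uintptr_t)(*pp)->sq < (uintptr_t)node->sq)
		pp = &(*pp)->next;
	if (*pp != NULL && (*pp)->sq == node->sq)
		return (0);		/* duplicate: skip */
	node->next = *pp;
	*pp = node;
	return (1);
}
```

Sorting by address gives the canonical order the comment mentions: every caller building a list for the same stream produces the same sequence, so locks taken in list order are always taken in the same order.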
* if c_inner bit is set no need to grab sq_putlocks since we don't care * if other threads have entered or are entering put entry point. * if c_inner bit is set it might have been possible to use * open/close path for IP) but since the count may need to be decremented in * qwait() we wouldn't know which counter to decrement. Currently counter is * selected by current cpu_seqid and current CPU can change at any moment. XXX * in the future we might use curthread id bits to select the counter and this * would stay constant across routine calls. * Increment ref count to keep closes out of this queue. /* Make sure all putcounts now use slowlock. */ * Wait until we can enter the inner perimeter. * If we want exclusive access we wait until sq_count is 0. * We have to do this before entering the outer perimeter in order * to preserve put/close message ordering. /* Check if we need to enter the outer perimeter */ * We have to enter the outer perimeter exclusively before * we can increment sq_count to avoid deadlock. This implies * that we have to re-check sq_flags and sq_count. * is it possible to have c_inner set when c_outer is not set? * there should be no need to recheck sq_putcounts * because outer_enter() has already waited for them to clear * after setting SQ_WRITER. * SUMCHECK_SQ_PUTCOUNTS should return the sum instead * of doing an ASSERT internally. Others should do * ASSERT(SUMCHECK_SQ_PUTCOUNTS(sq) == 0); * without the need to #ifdef DEBUG it. * Leave a syncq. Announce to the framework that closes may proceed. * c_inner and c_outer specify which concurrency bits * must never be called from driver or module put entry point. * no need to grab sq_putlocks here. See comment in strsubr.h that explains when * decrement ref count, drain the syncq if possible, and wake up * The syncq needs to be drained. "Exit" the syncq * before calling drain_syncq. /* Check if we need to exit the outer perimeter */ /* XXX will this ever be true?
 */ /* Check if we need to exit the outer perimeter */ * Prevent q_next from changing in this stream by incrementing sq_count. * no need to grab sq_putlocks here. See comment in strsubr.h that explains when * no need to grab sq_putlocks here. See comment in strsubr.h that explains when * To prevent potential recursive invocation of * drain_syncq we do not call drain_syncq if count is * Prevent q_next from changing in this stream by incrementing sd_refcnt. /* Outer perimeter code */ * The outer syncq uses the fields and flags in the syncq slightly * differently from the inner syncqs. * sq_count Incremented when there are pending or running * writers at the outer perimeter to prevent the set of * inner syncqs that belong to the outer perimeter from * sq_head/tail List of deferred qwriter(OUTER) operations. * SQ_BLOCKED Set to prevent traversing of sq_next, sq_prev while * inner syncqs are added to or removed from the * SQ_WRITER A thread is currently traversing all the inner syncqs * setting the SQ_WRITER flag. * Get write access at the outer perimeter. * Note that read access is done by entersq, putnext, and put by simply * incrementing sq_count in the inner syncq. * Waits until "flags" is no longer set in the outer to prevent multiple * threads from having write access at the same time. SQ_WRITER has to be part * until the outer_exit is finished. * outer_enter is vulnerable to starvation since it does not prevent new * threads from entering the inner syncqs while it is waiting for sq_count to * Set SQ_WRITER on all the inner syncqs while holding * the SQLOCK on the outer syncq. This ensures that the changing * of SQ_WRITER is atomic under the outer SQLOCK. * Get everybody out of the syncqs sequentially. * Note that we don't actually need to acquire the PUTLOCKS, since * we have already cleared the fastbit, and set QWRITER.
By * definition, the count can not increase since putnext will * take the slowlock path (and the purpose of acquiring the * putlocks was to make sure it didn't increase while we were * Note that we still acquire the PUTLOCKS to be safe. * Verify that none of the flags got set while we * were waiting for the sq_counts to drop. * If this happens we exit and retry entering the * Drop the write access at the outer perimeter. * Read access is dropped implicitly (by putnext, put, and leavesq) by * Atomically (from the perspective of threads calling become_writer) * drop the write access at the outer perimeter by holding * SQLOCK(outer) across all the dropsq calls and the resetting of * This defines a locking order between the outer perimeter * SQLOCK and the inner perimeter SQLOCKs. * sq_onext is stable since sq_count has not yet been decreased. * Reset the SQ_WRITER flags in all syncqs. * After dropping SQ_WRITER on the outer syncq we empty all the * Add another syncq to an outer perimeter. * Block out all other access to the outer perimeter while it is being * Assumes that the caller has *not* done an outer_enter. * Vulnerable to starvation in blocksq. /* Get exclusive access to the outer perimeter list */ * Remove a syncq from an outer perimeter. * Block out all other access to the outer perimeter while it is being * Assumes that the caller has *not* done an outer_enter. * Vulnerable to starvation in blocksq. /* Get exclusive access to the outer perimeter list */ * Queue a deferred qwriter(OUTER) callback for this outer perimeter. * If this is the first callback for this outer perimeter then add * this outer perimeter to the list of outer perimeters that * the qwriter_outer_thread will process. * Increments sq_count in the outer syncq to prevent the membership * of the outer perimeter (in terms of inner syncqs) from changing while * the callback is pending. * Try and upgrade to write access at the outer perimeter.
If this can * not be done without blocking then queue the callback to be done * by the qwriter_outer_thread. * This routine can only be called from put or service procedures plus * asynchronous callback routines that have properly entered the * queue (with entersq). Thus qwriter(OUTER) assumes the caller has one claim * on the syncq associated with q. panic(
"qwriter(PERIM_OUTER): no outer perimeter");
* If some thread is traversing sq_next, or if we are blocked by * outer_insert or outer_remove, or if we already have queued * callbacks, then queue this callback for later processing. * Also queue the qwriter for an interrupt thread in order * to reduce the time spent running at high IPL. * to identify there are events. * Queue the become_writer request. * The queueing is atomic under SQLOCK(outer) in order * to synchronize with outer_exit. * queue_writer will drop the outer SQLOCK /* Must set SQ_WRITER on inner perimeter */ * The outer could have been SQ_BLOCKED thus * SQ_WRITER might not be set on the inner. * We are half-way to exclusive access to the outer perimeter. * while the inner syncqs are traversed. * Check if we can run the function immediately. Mark all * syncqs with the writer flag to prevent new entries into * put and service procedures. * Set SQ_WRITER on all the inner syncqs while holding * the SQLOCK on the outer syncq. This ensures that the changing * of SQ_WRITER is atomic under the outer SQLOCK. * Some other thread has a read claim on the outer perimeter. * Queue the callback for deferred processing. * queue_writer will set SQ_QUEUED before we drop SQ_WRITER * so that other qwriter(OUTER) calls will queue their * callbacks as well. queue_writer increments sq_count so we * decrement to compensate for our increment. * Dropping SQ_WRITER enables the writer thread to work * on this outer perimeter. /* queue_writer dropped the lock */ /* Can run it immediately */ * Dequeue all writer callbacks from the outer perimeter and run them. * queues cannot be placed on the queuelist on the outer * Drop the message if the queue is closing. * Make sure that the queue is "claimed" when the callback * is run in order to satisfy various ASSERTs. * The list of messages on the inner syncq is effectively hashed * by destination queue. These destination queues are doubly * linked lists (hopefully) in priority order.
Messages are then * Additional messages are linked together by the b_next/b_prev * elements in the mblk, with (similar to putq()) the first message * having a NULL b_prev and the last message having a NULL b_next. * Events, such as qwriter callbacks, are put onto a list in FIFO * order referenced by sq_evhead and sq_evtail. This is a singly * linked list, and messages here MUST be processed in the order queued. * Run the events on the syncq event list (sq_evhead). * Assumes there is only one claim on the syncq, it is * already exclusive (SQ_EXCL set), and the SQLOCK held. * Messages here are processed in order, with the SQ_EXCL bit * held all the way through till the last message is processed. * We need to process all of the events on this list. It * is possible that new events will be added while we are * away processing a callback, so on every loop, we start * back at the beginning of the list. * We have to reaccess sq_evhead since there is a * possibility of a new entry while we were running * Messages from the event queue must be taken off in * re-read the flags, since they could have changed. * Put messages on the event list. * If we can go exclusive now, do so and process the event list, otherwise * let the last claim service this list (or wake the sqthread). * This procedure assumes SQLOCK is held. To run the event list, it * must be called with no claims. * This is a callback. Add it to the list of callbacks * and see about upgrading. * We have set SQ_EVENTS, so threads will have to * unwind out of the perimeter, and new entries will * not grab a putlock. But we still need to know * how many threads have already made a claim to the * syncq, so grab the putlocks, and sum the counts. * If there are no claims on the syncq, we can upgrade * to exclusive, and run the event list. * NOTE: We hold the SQLOCK, so we can just grab the * We have no claim, so we need to check if there * are no others, then we can upgrade.
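The upgrade check described above - sum the per-CPU putcounts plus sq_count under the putlocks, and go exclusive only if the total is zero - can be sketched as follows. The structure, the CPU count, and `try_upgrade` are illustrative names, and the locking is elided:

```c
/* Illustrative per-CPU claim counts, as used by CIPUT syncqs: a fast-path
 * claim may live in any CPU's slot, so going exclusive requires summing
 * every slot plus the slow-path sq_count. */
#define NCPU_SKETCH 4

typedef struct cip_sync {
	int sq_count;			/* slow-path claims */
	int sq_putcount[NCPU_SKETCH];	/* fast-path per-CPU claims */
	int sq_excl;			/* SQ_EXCL analog */
} cip_sync_t;

/*
 * Upgrade to exclusive iff no thread holds any claim.
 * Returns 1 on success (sq_excl set), 0 if claims are outstanding.
 * In the kernel this sum is taken with the putlocks held.
 */
int
try_upgrade(cip_sync_t *sq)
{
	int sum = sq->sq_count;

	for (int i = 0; i < NCPU_SKETCH; i++)
		sum += sq->sq_putcount[i];
	if (sum != 0)
		return (0);	/* other threads inside: cannot upgrade */
	sq->sq_excl = 1;
	return (1);
}
```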
 * There are currently no claims on * the syncq by this thread (at least on this entry). The thread that has * the claim should drain syncq. * Can't upgrade - other threads inside. * Need to set SQ_EXCL and make a claim on the syncq. /* Process the events list */ * We don't need to acquire the putlocks to release * SQ_EXCL, since we are exclusive, and hold the SQLOCK. * sq_run_events should have released SQ_EXCL * If anything happened while we were running the * events (or was there before), we need to process * them now. We shouldn't be exclusive since we * released the perimeter above (plus, we asserted * Perform delayed processing. The caller has to make sure that it is safe * to enter the syncq (e.g. by checking that none of the SQ_STAYAWAY bits are * Assume that the caller has NO claims on the syncq. However, a claim * on the syncq does not indicate that a thread is draining the syncq. * There may be more claims on the syncq than there are threads draining * (i.e. #_threads_draining <= sq_count) * drain_syncq has to terminate when one of the SQ_STAYAWAY bits gets set * in order to preserve qwriter(OUTER) ordering constraints. * sq_putcount only needs to be checked when dispatching the queued * writer call for CIPUT sync queue, but this is handled in sq_run_events. "drain_syncq start:%p",
sq);
 * If SQ_EXCL is set, someone else is processing this syncq - let him * This routine can be called by a background thread if * it was scheduled by a hi-priority thread. So, if there are * no messages queued, return (remember, we have the SQLOCK, * and it cannot change until we release it). Wake up any waiters also. * If this is not a concurrent put perimeter, we need to * become exclusive to drain. Also, if not CIPUT, we would * not have acquired a putlock, so we don't need to check * the putcounts. If not entering with a claim, we test * This is where we make a claim to the syncq. * This can either be done by incrementing a putlock, or * the sq_count. But since we already have the SQLOCK * here, we just bump the sq_count. * Note that after we make a claim, we need to let the code * fall through to the end of this routine to clean itself * up. A return in the while loop will put the syncq in a * If we are told to stayaway or went exclusive, * If there are events to run, do so. * We have one claim to the syncq, so if there are * more than one, other threads are running. /* Can't upgrade - other threads inside */ * we have the only claim, run the events, * sq_run_events will clear the SQ_EXCL flag. * If this is a CIPUT perimeter, we need * to drop the SQ_EXCL flag so we can properly * continue draining the syncq. * And go back to the beginning just in case * anything changed while we were away. * Find the queue that is not draining. * q_draining is protected by QLOCK which we do not hold. * But if it was set, then a thread was draining, and if it gets * cleared, then it was because the thread has successfully * drained the syncq, or a GOAWAY state occurred. For the GOAWAY * state to happen, a thread needs the SQLOCK which we hold, and * if there was such a flag, we would have already seen it. * We have a queue to work on, and we hold the * SQLOCK and one claim, call qdrain_syncq.
 * This means we need to release the SQLOCK and * acquire the QLOCK (OK since we have a claim). * Note that qdrain_syncq will actually dequeue * this queue from the sq_head list when it is * convinced all the work is done and release * the QLOCK before returning. /* The queue is drained */ * NOTE: After this point qp should not be used since it may be * sq->sq_head cannot change because we hold the * sqlock. However, a thread CAN decide that it is no longer * going to drain that queue. However, this should be due to * a GOAWAY state, and we should see that here. * This loop is not very efficient. One solution may be adding a second * pointer to the "draining" queue, but it is difficult to do when * queues are inserted in the middle due to priority ordering. Another * possibility is to yank the queue out of the sq list and put it onto * the "draining list" and then put it back if it can't be drained. /* Drop SQ_EXCL for non-CIPUT perimeters */ /* Wake up any waiters. */ "drain_syncq end:%p",
sq);
 * qdrain_syncq can be called (currently) from only one of two places: * putnext (or some variation of it). * If called from drain_syncq, we found it in the list * of queues needing service, so there is work to be done (or it * wouldn't be on the list). * If called from some putnext variation, it was because the * perimeter is open, but messages are blocking a putnext and * there is not a thread working on it. Now a thread could start * working on it while we are getting ready to do so ourselves, but * the thread would set the q_draining flag, and we can spin out. * As for qwait(_sig), I think I shall let it continue to call * drain_syncq directly (after all, it will get here eventually). * qdrain_syncq has to terminate when: * - one of the SQ_STAYAWAY bits gets set to preserve qwriter(OUTER) ordering * - SQ_EVENTS gets set to preserve qwriter(INNER) ordering * Will release QLOCK before returning "drain_syncq start:%p",
sq);
 * For non-CIPUT perimeters, we should be called with the * exclusive bit set already. For CIPUT perimeters we * will be doing a concurrent drain, so it better not be set. * All outer pointers are set, or none of them are * This is OK without the putlocks, because we have one * claim either from the sq_count, or a putcount. We could * get an erroneous value from other counts, but ours won't * change, so one way or another, we will have at least a * The first thing to do here, is find out if a thread is already * draining this queue or the queue is closing. If so, we are done, * just return. Also, if there are no messages, we are done as well. * Note that we check the q_sqhead since there is a window of * opportunity for us to enter here because Q_SQQUEUED was set, but is * If the perimeter is exclusive, there is nothing we can * Note that there is nothing to prevent this case from changing * right after this check, but the spin-out will catch it. /* Tell other threads that we are draining this queue */ * Because we can enter this routine just because * a putnext is blocked, we need to spin out if * the perimeter wants to go exclusive as well * as just blocked. We need to spin out also if * events are queued on the syncq. * Don't check for SQ_EXCL, because non-CIPUT * perimeters would set it, and it can't become * exclusive while we hold a claim. * Since we are in qdrain_syncq, we already know the queue, * but for sanity, we want to check this against the qp that * was passed in by bp->b_queue. * We would have the following check in the DEBUG code: * if (bp->b_prev != NULL) { * ASSERT(bp->b_prev == (void (*)())q->q_qinfo->qi_putp); * This can't be done, however, since IP modifies qinfo * structure at run-time (switching between IPv4 qinfo and IPv6 * qinfo), invalidating the check. * So the assignment to func is left here, but the ASSERT itself * is removed until the whole issue is resolved.
 * We should decrement q_syncqmsgs only after executing the * put procedure to avoid a possible race with putnext(). * Even though putnext() sees Q_SQQUEUED is set, there is * an optimization which allows putnext to call the put * procedure directly if (q_syncqmsgs == 0) and thus * a message reordering could otherwise occur. * Clear QFULL in the next service procedure queue if * this is the last message destined for that queue. * It would make better sense to have some sort of * tunable for the low water mark, but these semantics * are not yet defined. So, alas, we use a constant. * Always clear SQ_EXCL when CIPUT in order to handle * The putp() can call qwriter and get exclusive access * IFF this is the only claim. So, we need to test for * this possibility so we can acquire the mutex and clear * We should either have no queues on the syncq, or we were * told to goaway by a waiter (which we will wake up at the * Remove the q from the syncq list if all the messages are * Since the queue is removed from the list, reset its priority. * Remember, the q_draining flag is used to let another * thread know that there is a thread currently draining * the messages for a queue. Since we are now done with * this queue (even if there may be messages still there), * we need to clear this flag so some thread will work
sq);
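The flow-control behavior surrounding these two routines - set QFULL once more than sq_max_size messages are queued on the syncq for a destination, clear it when the last one drains, and disable the mechanism entirely when sq_max_size is 0 - can be sketched with simple counters. `fcq_t`, `fc_enqueue`, and `fc_dequeue` are illustrative names, not the kernel's:

```c
/* Toy flow-control state for one destination queue. */
typedef struct fcq {
	int q_syncqmsgs;	/* messages queued on the syncq for this queue */
	int qfull;		/* QFULL analog on the next service queue */
} fcq_t;

/* Enqueue one message; set QFULL once the count exceeds sq_max_size.
 * sq_max_size == 0 disables flow control entirely. */
void
fc_enqueue(fcq_t *q, int sq_max_size)
{
	q->q_syncqmsgs++;
	if (sq_max_size != 0 && q->q_syncqmsgs > sq_max_size)
		q->qfull = 1;
}

/* Dequeue one message; clear QFULL when the last queued message for
 * this destination is consumed (the constant low-water mark the
 * comment above laments). */
void
fc_dequeue(fcq_t *q)
{
	if (--q->q_syncqmsgs == 0)
		q->qfull = 0;
}
```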
/* END OF QDRAIN_SYNCQ */ * This is the mate to qdrain_syncq, except that it is putting the * message onto the queue instead of draining. Since the * message is destined for the queue that is selected, there is * no need to identify the function because the message is * intended for the put routine for the queue. But this * routine will do it anyway just in case (but only for debug kernels). * After the message is enqueued on the syncq, it calls putnext_tail() * which will schedule a background thread to actually process the message. * Assumes that there is a claim on the syncq (sq->sq_count > 0) and * SQLOCK(sq) and QLOCK(q) are not held. * Set QFULL in next service procedure queue (that cares) if not * already set and if there are already more messages on the syncq * than sq_max_size. If sq_max_size is 0, no flow control will be * The fq here is the next queue with a service procedure. * This is where we would fail canputnext, so this is where we * LOCKING HIERARCHY: In the case when fq != q we need to * a) Take QLOCK(fq) to set QFULL flag and * b) Take sd_reflock in the case of the hot stream to update * We already have QLOCK at this point. To avoid cross-locks with * freezestr() which grabs all QLOCKs and with strlock() which grabs * both SQLOCK and sd_reflock, we need to drop respective locks first. * to trace the queue that the message is intended for. Note * that the original use was to identify the queue and function * to call on the drain. In the new syncq, we have the context * of the queue that we are draining, so call its putproc and * don't rely on the saved values. But for debug this is still * Enqueue the message on the list. * SQPUT_MP() accesses q_syncqmsgs. We are already holding QLOCK to * protect it. So it's OK to acquire SQLOCK after SQPUT_MP(). * And queue on syncq for scheduling, if not already queued. * Note that we need the SQLOCK for this, and for testing flags * at the end to see if we will drain. So grab it now, and
So grab it now, and * release it before we call qdrain_syncq or return. * All of these conditions MUST be true! * SQLOCK is still held, so sq_count can be safely decremented. /* Should not reference sq or q after this point. */ * Remove all messages from a syncq (if qp is NULL) or remove all messages * that would be put into qp by drain_syncq. * Used when deleting the syncq (qp == NULL) or when detaching * Return non-zero if one or more messages were freed. * no need to grab sq_putlocks here. See comment in strsubr.h that explains when * NOTE: This function assumes that it is called from the close() context and * that all the queues in the syncq are going aay. For this reason it doesn't * currently valid, but it is useful to rethink this function to behave properly * Before we leave, we need to make sure there are no * events listed for this queue. All events for this queue /* Delete this message */ * Update sq_evtail if the last element * - match qp if qp is set, remove it's messages * Yank the messages as a list off the queue * We do not have QLOCK(q) here (which is safe due to * assumptions mentioned above). To obtain the lock we * need to release SQLOCK which may allow lots of things * to change upon us. This place requires more analysis. * Free each of the messages. * Now remove the queue from the syncq. * If qp was specified, we are done with it and are * going to drop SQLOCK(sq) and return. We wakeup syncq * waiters while we still have the SQLOCK. /* Drop SQLOCK across clr_qfull */ * We avoid doing the test that drain_syncq does and * unconditionally clear qfull for every flushed * message. Since flush_syncq is only called during * close this should not be a problem. * The head was removed by SQRM_Q above. * reread the new head and flush it. * Propagate all messages from a syncq to the next syncq that are associated * with the specified queue. 
If the queue is attached to a driver or if the * messages have been added due to a qwriter(PERIM_INNER), free the messages. * Assumes that the stream is strlock()'ed. We don't come here if there * are no messages to propagate. * NOTE: If the queue is attached to a driver, all the messages are freed * as there is no point in propagating the messages from the driver syncq * to the closing stream head, which will in turn get freed later. * As entersq() does not increment the sq_count for * the write side, check sq_count for non-QPERQ * propagate_syncq() can be called because of either messages on the * queue syncq or because of events on the queue syncq. Do actual * message propagations if there are any messages. * Walk the list of messages, and free them if this is a driver, * otherwise reset the b_prev and b_queue value to the new putp. * Afterward, we will just add the head to the end of the next * syncq, and point the tail to the end of this one. /* Change the q values for this message */ * Attach the list of messages to the end of the new queue (if there * is a list of messages). * When messages are moved from a high priority queue to * another queue, the destination queue priority is * Before we leave, we need to make sure there are no * events listed for this queue. All events for this queue /* Delete this message */ * Update sq_evtail if the last element /* Wake up any waiter before leaving. */ * Try to upgrade to exclusive access at the inner perimeter. If this * cannot be done without blocking, then the request will be queued on the syncq * and drain_syncq will run it later. * This routine can only be called from put or service procedures plus * asynchronous callback routines that have properly entered the * queue (with entersq()). Thus qwriter_inner assumes the caller has one claim * on the syncq associated with q. * Can upgrade. This case also handles nested qwriter calls * (when the qwriter callback function calls qwriter). In that * case SQ_EXCL is already set.
* Assumes that leavesq, putnext, and drain_syncq will reset * until putnext, leavesq, or drain_syncq drops it. * That way we handle nested qwriter(INNER) without dropping * SQ_EXCL until the outermost qwriter callback routine is done. * Synchronous callback support functions * Allocate a callback parameter structure. * Assumes that the caller initializes the flags and the id. * Acquires SQLOCK(sq) if non-NULL is returned. * Only try the tryhard allocation if the caller is ready to panic. "callbparams_free: not found\n"));
"callbparams_free_id: not found\n"));
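The qwriter_inner upgrade logic described earlier (run the writer immediately when the caller holds the only claim and nobody is exclusive, otherwise queue the request for drain_syncq to run later) can be sketched with a toy perimeter. This is a user-space illustration only: `toy_syncq_t`, `try_writer`, and `toy_cb` are invented stand-ins, not the kernel syncq API, and the real code also handles nested qwriter calls and the SQ_QUEUED bookkeeping.

```c
#include <stddef.h>

/* Toy stand-in for an inner perimeter; not the kernel syncq. */
typedef struct toy_syncq {
	int sq_count;              /* outstanding claims, including ours */
	int sq_excl;               /* set while someone is exclusive */
	void (*deferred)(void *);  /* at most one queued writer, for brevity */
	void *deferred_arg;
} toy_syncq_t;

/* Test hook: counts how many times the callback actually ran. */
static int toy_runs;
static void toy_cb(void *arg) { (void)arg; toy_runs++; }

/*
 * Caller holds one claim (sq_count >= 1).  If ours is the only claim
 * and nobody is exclusive, run func() exclusively now; otherwise
 * defer it, the way qwriter(PERIM_INNER) queues work for the drain.
 * Returns 1 if func ran immediately, 0 if it was deferred.
 */
static int
try_writer(toy_syncq_t *sq, void (*func)(void *), void *arg)
{
	if (sq->sq_count == 1 && !sq->sq_excl) {
		sq->sq_excl = 1;	/* upgrade to exclusive */
		func(arg);
		sq->sq_excl = 0;
		return (1);
	}
	sq->deferred = func;		/* run later by the drain */
	sq->deferred_arg = arg;
	return (0);
}
```

The design point the comments make is visible here: with more than one claim outstanding, blocking for exclusivity could deadlock, so the request is parked instead of waited on.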
* Callback wrapper function used by once-only callbacks that can be * cancelled (qtimeout and qbufcall). * Contains an inline version of entersq(sq, SQ_CALLBACK) that can be * cancelled by the qun* functions. /* Cannot handle exclusive entry at the outer perimeter */ /* timeout has been cancelled */ * We drop the lock only for leavesq to re-acquire it. * A possible optimization is an inline of leavesq. * no need to grab sq_putlocks here. See comment in strsubr.h that * explains when sq_putlocks are used. * sq_count (or one of the sq_putcounts) has already been * decremented by the caller, and if SQ_QUEUED, we need to call * drain_syncq (the global syncq drain). * If putnext_tail is called with the SQ_EXCL bit set, we are in * one of two states: either we are in a non-CIPUT perimeter and need to clear * the bit, or we went exclusive in the put procedure. In any case, * we want to clear the bit now, and it is probably easier to do * this at the beginning of this function (remember, we hold * the SQLOCK). Lastly, if there are other messages queued * on the syncq (and not for our destination), enable the syncq /* Clear SQ_EXCL if set in passflags */ * We have cleared SQ_EXCL if we were asked to, and started * the wakeup process for waiters. If there are no writers * then we need to drain the syncq if we were told to, or * enable the background thread to do it. /* drain_syncq will take care of events in the list */ /* Drop the SQLOCK on exit */ "putnext_end:(%p, %p, %p) done",
NULL, qp, sq);
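The putnext_tail() decision described above (clear SQ_EXCL when the caller passes it, then either drain the syncq in line or hand it to the background thread when messages remain) can be sketched with toy flags. The `TOY_*` names and the `can_drain_inline` parameter are invented for this illustration; the real flags and the full wakeup/writer logic live in the kernel headers.

```c
/* Toy flag values; the real SQ_* flags are kernel-private. */
#define	TOY_SQ_EXCL	0x01
#define	TOY_SQ_QUEUED	0x02

enum toy_action { TOY_NOOP, TOY_DRAIN_NOW, TOY_BGTHREAD };

/*
 * Mirror of the putnext_tail() tail decision: drop SQ_EXCL if the
 * caller asked us to, then service remaining queued messages either
 * in line or via the background thread.
 */
static enum toy_action
toy_putnext_tail(unsigned *sq_flags, unsigned passflags, int can_drain_inline)
{
	if (passflags & TOY_SQ_EXCL)
		*sq_flags &= ~TOY_SQ_EXCL;	/* leave exclusive mode */

	if (!(*sq_flags & TOY_SQ_QUEUED))
		return (TOY_NOOP);		/* nothing left to service */

	return (can_drain_inline ? TOY_DRAIN_NOW : TOY_BGTHREAD);
}
```

Clearing the bit first, as the comment suggests, keeps the later branches simple: every path below the clear sees a consistent flag word.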
/* Fast check if there is any work to do before getting the lock. */ * Do not reset QFULL (and backenable) if the q_count is the reason * If the queue is empty, i.e. q_mblkcnt is zero, the queue cannot be full. * If both q_count and q_mblkcnt are less than the hiwat mark, * A little more confusing, how about this way: * if someone wants to write, * both counts are less than the lowat mark * Set the forward service procedure pointer. * Called at insert-time to cache a queue's next forward service procedure in * q_nfsrv; used by canput() and canputnext(). If the queue to be inserted * has a service procedure, then q_nfsrv points to itself. If the queue to be * inserted does not have a service procedure, then q_nfsrv points to the next * queue forward that has a service procedure. If the queue is at the logical * end of the stream (driver for the write side, stream head for the read side) * and does not have a service procedure, then q_nfsrv also points to itself. * Insert the driver, initialize the driver and stream head. * _I_INSERT does not allow inserting a driver. Make sure * that it is not an insertion. * Set up the read side q_nfsrv pointer. This MUST be done * before setting the write side, because the setting of * the write side for a fifo may depend on it. * Suppose we have a fifo that only has pipemod pushed. * pipemod has no read or write service procedures, so * nfsrv for both pipemod queues points to prev_rq (the * stream read head). Now push bufmod (which has only a * read service procedure). Doing the write side first, * wnew->q_nfsrv is set to pipemod's writeq nfsrv, which * is WRONG; the next queue forward from wnew with a * service procedure will be rnew, not the stream read head. * Since the downstream queue (which in the case of a fifo * is the read queue rnew) can affect upstream queues, it * needs to be done first.
Setting up the read side first * sets nfsrv for both pipemod queues to rnew, and then * when the write side is set up, wnew->q_nfsrv will also * use _OTHERQ() because, if this is a pipe, the next * module may have been pushed from the other end and * q_next could be a read queue. /* set up write side q_nfsrv pointer */ * For insertion, we need to update the nfsrv of the modules * above which do not have a service routine. * fifo, wnew/rnew will also be the middle of * a fifo and wnew's nfsrv is the same as rnew's. * Reset the forward service procedure pointer; called at remove-time. /* Reset the write side q_nfsrv pointer for _I_REMOVE */ /* reset the read side q_nfsrv pointer */ /* Note that rqp->q_next cannot be NULL */ * This routine should be called after all stream geometry changes to update * the stream head cached struio() rd/wr queue pointers. Note: it must be called * with the stream streamlock()ed. * Note: this only enables Synchronous STREAMS for a side of a Stream which has * an explicit synchronous barrier module queue. That is, a queue that * has specified a struio() type. * Not a stream head, but a mux, so no Synchronous STREAMS. * Scan the write queue(s) while synchronous * until we find a qinfo uio type specified. * Scan the read queue(s) while synchronous * until we find a qinfo uio type specified. * pass_wput unblocks the passthru queues, so that * messages can arrive at the mux's lower read queue, before * Create a new queue, block it, and then insert it * below the stream head on the lower stream. * This prevents any messages from arriving during the setq * been acked or nacked or if a message is generated and sent * down the mux's write put procedure. * After the new queue is inserted, all messages coming from below are * blocked. The call to strlock will ensure that all activity in the stream head * read queue syncq is stopped (sq_count drops to zero). /* setq might sleep in allocator - avoid holding locks.
*/ * Use strlock() to wait for the stream head sq_count to drop to zero * since we are going to change q_ptr in the stream head. Note that * insertq() doesn't wait for any syncq counts to drop to zero. * Let messages flow up into the mux by removing * Wait for the condition variable pointed to by `cvp' to be signaled, * or for `tim' milliseconds to elapse, whichever comes first. If `tim' * is negative, then there is no time limit. If `nosigs' is non-zero, * then the wait will be non-interruptible. * Returns >0 if signaled, 0 if interrupted, or -1 upon timeout. * convert milliseconds to clock ticks * Wait until the stream head can determine if it is at the mark, but * don't wait forever, to prevent a race condition between the "mark" state * in the stream head and any mark state in the caller/user of this routine. * This is used by sockets, and for a socket it would be incorrect * to return a failure for SIOCATMARK when there is no data in the receive * queue and the marked urgent data is traveling up the stream. * This routine waits until the mark is known by waiting for one of these * events: * The stream head read queue becoming non-empty (including an EOF). * The STRATMARK flag being set (due to a MSGMARKNEXT message). * The STRNOTATMARK flag being set (which indicates that the transport * has sent a MSGNOTMARKNEXT message to indicate that it is not at * the mark). * The routine returns 1 if the stream is at the mark; 0 if it can * be determined that the stream is not at the mark. * If the wait times out and it can't determine * whether or not the stream might be at the mark, the routine will return -1. * Note: This routine should only be used when a mark is pending, i.e., * in the socket case the SIGURG has been posted. * Note2: This cannot wake up just because synchronous streams indicate * that data is available, since it is not possible to use the synchronous * streams interfaces to determine the b_flag value for the data queued below. /* Wait for 100 milliseconds for any state change.
*/ * Set a read side error. If persist is set, change the socket error * to persistent. If errfunc is set, install the function as the exported * Set a write side error. If persist is set, change the socket error * Make the stream return 0 (EOF) when all data has been read. * No effect on the write side. /* Used within the framework when the queue is already locked */ * Do not place on the run queue if already enabled or closing. * Mark the queue enabled and place it on the run list if it is not already being * serviced. If it is serviced, the runservice() function will detect * that QENAB is set and call the service procedure before clearing it. /* Record the time of qenable */ * Put the queue in the stp list and schedule it for background * processing if it is not already scheduled or if the stream head does not * intend to process it in the foreground later by setting * If there is already something on the list, stp flags should show * If no one will drain this stream, we are the first producer and * need to schedule it for the background thread. * No one will service this stream later, so we have to * Task queue dispatch failed, so fail over to the backup * It is safe to clear the STRS_SCHEDULED flag because it * was set by this thread above. * Failover scheduling is protected by the service_queue * lock. * Wake up the background queue processing thread. * The queue in the list should have the * QENAB flag set and should not have the * QINSERVICE flag set. QINSERVICE is * set when the queue is dequeued, and * qenable_locked doesn't enqueue a * queue with QINSERVICE set. /* if we came here from the background thread, clear the flag */ /* let drain_syncq know that it's being called in the background */ * Note that SQ_WRITER is used on the outer perimeter * to signal that a qwriter(OUTER) is either waiting to run * or is actually running a function. * All inner syncqs are empty and have SQ_WRITER set * to block entering the outer perimeter. * We do not need to explicitly call write_now since * outer_exit does it for us.
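The millisecond-to-tick conversion that str_cv_wait performs before its timed condition-variable wait can be sketched as a round-up division. This is a user-space sketch: `msec_to_ticks` is an invented name, and the tick rate `hz` is passed in as a parameter here rather than read from the kernel's global.

```c
/*
 * Convert a timeout in milliseconds to clock ticks, rounding up so
 * that a small positive timeout never collapses to zero ticks.
 * `hz' is the number of clock ticks per second.
 */
static long
msec_to_ticks(long msec, long hz)
{
	return ((msec * hz + 999) / 1000);
}
```

Rounding up matters: with hz = 100, a 1 ms request still waits at least one tick instead of returning immediately.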
* Background processing of the stream queue list. * Foreground processing of the stream queue list. * We are going to drain this stream queue list, so qenable_locked will * not schedule it until we finish. * Help the backup background thread to drain the qhead/qtail list. * Replace the cred currently in the mblk with a different one. /* Associate values for M_DATA type */ /* get hardware checksum attribute */ * Checksum buffer *bp for len bytes with psum partial checksum, * or 0 if none, and return the 16-bit partial checksum. * Bp is 16-bit aligned and len is a multiple of the 16-bit word size. * Bp isn't 16-bit aligned. * Normalize psum to 16 bits before returning the new partial * checksum. The max psum value before normalization is 0x3FDFE.
psum >>
16) + (
psum &
0xFFFF));
/* NOTE: Do not add code after this point. */ * Replacement for the QLOCK macro for those that can't use it. * Initialize the STR stack instance, which tracks autopush and persistent * links. * Set up mux_node structures. * Note: run at zone shutdown and not destroy, so that the PLINKs are * gone by the time other cleanup happens from the destroy callbacks. /* Undo all the I_PLINKs for this zone */ /* Close layered handles */ * Free the structure; str_stack_shutdown did the other cleanup work.