Lines Matching defs:pl

751 srlzer_enter(struct fmdump_pipeline *pl)
753 struct fmdump_srlzer *srlzer = pl->pl_srlzer;
759 srlzer_exit(struct fmdump_pipeline *pl)
761 struct fmdump_srlzer *srlzer = pl->pl_srlzer;
801 pipeline_stall(struct fmdump_pipeline *pl)
803 struct fmdump_srlzer *srlzer = pl->pl_srlzer;
806 (void) pthread_cond_wait(&pl->pl_cv, &srlzer->ds_lock);
810 pipeline_continue(struct fmdump_pipeline *pl)
812 struct fmdump_srlzer *srlzer = pl->pl_srlzer;
815 (void) pthread_cond_signal(&srlzer->ds_pipearr[pl->pl_srlzeridx].pl_cv);
827 pipeline_output(struct fmdump_pipeline *pl, const fmd_log_record_t *rp)
829 struct fmdump_srlzer *srlzer = pl->pl_srlzer;
832 int thisidx = pl->pl_srlzeridx;
865 if (wpl == pl)
875 pipeline_mark_consumed(struct fmdump_pipeline *pl)
877 struct fmdump_srlzer *srlzer = pl->pl_srlzer;
880 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_PROCESSING;
885 pipeline_done(struct fmdump_pipeline *pl)
887 struct fmdump_srlzer *srlzer = pl->pl_srlzer;
890 srlzer_enter(pl);
892 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_DONE;
898 srlzer_exit(pl);
902 pipeline_pollmode(struct fmdump_pipeline *pl)
904 struct fmdump_srlzer *srlzer = pl->pl_srlzer;
907 if (srlzer->ds_slot[pl->pl_srlzeridx].ss_state == FMDUMP_PIPE_POLLING)
910 srlzer_enter(pl);
912 srlzer->ds_slot[pl->pl_srlzeridx].ss_state = FMDUMP_PIPE_POLLING;
917 srlzer_exit(pl);
923 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg;
925 fmdump_warn("skipping record in %s: %s\n", pl->pl_processing,
935 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg;
938 fmd_log_rec_f *func = pl->pl_arg.da_fmt->do_func;
940 srlzer_enter(pl);
942 if (!pipeline_output(pl, rp))
943 pipeline_stall(pl);
945 rc = func(lp, rp, pl->pl_arg.da_fp);
946 pipeline_mark_consumed(pl);
948 srlzer_exit(pl);
954 pipeline_process(struct fmdump_pipeline *pl, char *logpath, boolean_t follow)
961 pl->pl_processing = logpath;
973 pl->pl_ops = logtypes[i].lt_ops;
974 pl->pl_arg.da_fmt =
975 &pl->pl_ops->do_formats[pl->pl_fmt];
980 if (pl->pl_ops == NULL) {
988 if (fmd_log_xiter(lp, FMD_LOG_XITER_REFS, pl->pl_arg.da_fc,
989 pl->pl_arg.da_fv, pipeline_cb, pipeline_err, (void *)pl,
999 pipeline_pollmode(pl);
1011 struct fmdump_pipeline *pl = (struct fmdump_pipeline *)arg;
1014 (void) pthread_mutex_lock(&pl->pl_lock);
1015 pl->pl_started = 1;
1016 (void) pthread_mutex_unlock(&pl->pl_lock);
1017 (void) pthread_cond_signal(&pl->pl_cv);
1019 for (ll = pl->pl_rotated; ll != NULL; ll = ll->next)
1020 pipeline_process(pl, ll->path, B_FALSE);
1022 pipeline_process(pl, pl->pl_logpath, pl->pl_follow);
1023 pipeline_done(pl);
1034 struct fmdump_pipeline *pipeline, *pl;
1094 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++) {
1095 (void) pthread_mutex_init(&pl->pl_lock, NULL);
1096 (void) pthread_cond_init(&pl->pl_cv, NULL);
1098 pl->pl_srlzer = &srlzer;
1099 pl->pl_srlzeridx = i;
1100 pl->pl_follow = opt_f ? B_TRUE : B_FALSE;
1101 pl->pl_fmt = fmt;
1102 pl->pl_arg.da_fv = fv;
1103 pl->pl_arg.da_fc = fc;
1104 pl->pl_arg.da_fp = stdout;
1106 (void) pthread_mutex_lock(&pl->pl_lock);
1108 if (pthread_create(&pl->pl_thr, NULL,
1109 pipeline_thr, (void *)pl) != 0)
1114 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++) {
1115 while (!pl->pl_started)
1116 (void) pthread_cond_wait(&pl->pl_cv, &pl->pl_lock);
1118 (void) pthread_mutex_unlock(&pl->pl_lock);
1121 for (i = 0, pl = &pipeline[0]; i < npipe; i++, pl++)
1122 (void) pthread_join(pl->pl_thr, NULL);