back to topotato report
topotato coverage report
Current view: top level - lib - workqueue.c (source / functions) Hit Total Coverage
Test: test_pim6_bootstrap.py::PIM6Bootstrap Lines: 107 149 71.8 %
Date: 2023-02-16 02:07:22 Functions: 15 19 78.9 %

          Line data    Source code
       1             : /*
       2             :  * Quagga Work Queue Support.
       3             :  *
       4             :  * Copyright (C) 2005 Sun Microsystems, Inc.
       5             :  *
       6             :  * This file is part of GNU Zebra.
       7             :  *
       8             :  * Quagga is free software; you can redistribute it and/or modify it
       9             :  * under the terms of the GNU General Public License as published by the
      10             :  * Free Software Foundation; either version 2, or (at your option) any
      11             :  * later version.
      12             :  *
      13             :  * Quagga is distributed in the hope that it will be useful, but
      14             :  * WITHOUT ANY WARRANTY; without even the implied warranty of
      15             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      16             :  * General Public License for more details.
      17             :  *
      18             :  * You should have received a copy of the GNU General Public License along
      19             :  * with this program; see the file COPYING; if not, write to the Free Software
      20             :  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
      21             :  */
      22             : 
      23             : #include <zebra.h>
      24             : #include "thread.h"
      25             : #include "memory.h"
      26             : #include "workqueue.h"
      27             : #include "linklist.h"
      28             : #include "command.h"
      29             : #include "log.h"
      30             : 
/* Memory type declarations for queue, item and name string allocations. */
DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue");
DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_ITEM, "Work queue item");
DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_NAME, "Work queue name string");

/* master list of work_queues */
static struct list _work_queues;
/* pointer primarily to avoid an otherwise harmless warning on
 * ALL_LIST_ELEMENTS_RO
 */
static struct list *work_queues = &_work_queues;

/* lower bound for cycles.granularity; see work_queue_run() */
#define WORK_QUEUE_MIN_GRANULARITY 1
      43             : 
      44           3 : static struct work_queue_item *work_queue_item_new(struct work_queue *wq)
      45             : {
      46           3 :         struct work_queue_item *item;
      47           3 :         assert(wq);
      48             : 
      49           3 :         item = XCALLOC(MTYPE_WORK_QUEUE_ITEM, sizeof(struct work_queue_item));
      50             : 
      51           3 :         return item;
      52             : }
      53             : 
      54           3 : static void work_queue_item_free(struct work_queue_item *item)
      55             : {
      56           6 :         XFREE(MTYPE_WORK_QUEUE_ITEM, item);
      57           3 :         return;
      58             : }
      59             : 
      60           3 : static void work_queue_item_remove(struct work_queue *wq,
      61             :                                    struct work_queue_item *item)
      62             : {
      63           3 :         assert(item && item->data);
      64             : 
      65             :         /* call private data deletion callback if needed */
      66           3 :         if (wq->spec.del_item_data)
      67           0 :                 wq->spec.del_item_data(wq, item->data);
      68             : 
      69           3 :         work_queue_item_dequeue(wq, item);
      70             : 
      71           3 :         work_queue_item_free(item);
      72             : 
      73           3 :         return;
      74             : }
      75             : 
      76             : /* create new work queue */
      77           1 : struct work_queue *work_queue_new(struct thread_master *m,
      78             :                                   const char *queue_name)
      79             : {
      80           1 :         struct work_queue *new;
      81             : 
      82           1 :         new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue));
      83             : 
      84           1 :         new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name);
      85           1 :         new->master = m;
      86           1 :         SET_FLAG(new->flags, WQ_UNPLUGGED);
      87             : 
      88           1 :         STAILQ_INIT(&new->items);
      89             : 
      90           1 :         listnode_add(work_queues, new);
      91             : 
      92           1 :         new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
      93             : 
      94             :         /* Default values, can be overridden by caller */
      95           1 :         new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
      96           1 :         new->spec.yield = THREAD_YIELD_TIME_SLOT;
      97           1 :         new->spec.retry = WORK_QUEUE_DEFAULT_RETRY;
      98             : 
      99           1 :         return new;
     100             : }
     101             : 
     102           1 : void work_queue_free_and_null(struct work_queue **wqp)
     103             : {
     104           1 :         struct work_queue *wq = *wqp;
     105             : 
     106           1 :         THREAD_OFF(wq->thread);
     107             : 
     108           1 :         while (!work_queue_empty(wq)) {
     109           0 :                 struct work_queue_item *item = work_queue_last_item(wq);
     110             : 
     111           0 :                 work_queue_item_remove(wq, item);
     112             :         }
     113             : 
     114           1 :         listnode_delete(work_queues, wq);
     115             : 
     116           1 :         XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
     117           1 :         XFREE(MTYPE_WORK_QUEUE, wq);
     118             : 
     119           1 :         *wqp = NULL;
     120           1 : }
     121             : 
/* Report whether the queue's processing task is currently scheduled
 * on the thread master.
 */
bool work_queue_is_scheduled(struct work_queue *wq)
{
	return thread_is_scheduled(wq->thread);
}
     126             : 
     127           3 : static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
     128             : {
     129             :         /* if appropriate, schedule work queue thread */
     130           3 :         if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) &&
     131           3 :             !thread_is_scheduled(wq->thread) && !work_queue_empty(wq)) {
     132             :                 /* Schedule timer if there's a delay, otherwise just schedule
     133             :                  * as an 'event'
     134             :                  */
     135           3 :                 if (delay > 0) {
     136           3 :                         thread_add_timer_msec(wq->master, work_queue_run, wq,
     137             :                                               delay, &wq->thread);
     138           3 :                         thread_ignore_late_timer(wq->thread);
     139             :                 } else
     140           0 :                         thread_add_event(wq->master, work_queue_run, wq, 0,
     141             :                                          &wq->thread);
     142             : 
     143             :                 /* set thread yield time, if needed */
     144           3 :                 if (thread_is_scheduled(wq->thread) &&
     145           3 :                     wq->spec.yield != THREAD_YIELD_TIME_SLOT)
     146           0 :                         thread_set_yield_time(wq->thread, wq->spec.yield);
     147           3 :                 return 1;
     148             :         } else
     149             :                 return 0;
     150             : }
     151             : 
     152           3 : void work_queue_add(struct work_queue *wq, void *data)
     153             : {
     154           3 :         struct work_queue_item *item;
     155             : 
     156           3 :         assert(wq);
     157             : 
     158           3 :         item = work_queue_item_new(wq);
     159             : 
     160           3 :         item->data = data;
     161           3 :         work_queue_item_enqueue(wq, item);
     162             : 
     163           3 :         work_queue_schedule(wq, wq->spec.hold);
     164             : 
     165           3 :         return;
     166             : }
     167             : 
/* Move an item to the tail of its queue (dequeue, then re-enqueue). */
static void work_queue_item_requeue(struct work_queue *wq,
				    struct work_queue_item *item)
{
	work_queue_item_dequeue(wq, item);

	/* attach to end of list */
	work_queue_item_enqueue(wq, item);
}
     176             : 
/* CLI command "show work-queues": dump per-queue statistics (item count,
 * hold time, run/yield totals, cycle counters and average) for every
 * queue on the global list.  'P' in the first column marks a plugged queue.
 */
DEFUN (show_work_queues,
       show_work_queues_cmd,
       "show work-queues",
       SHOW_STR
       "Work Queue information\n")
{
	struct listnode *node;
	struct work_queue *wq;

	/* two header rows, aligned with the per-queue format below */
	vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ",
		"Q. Runs", "Yields", "Cycle Counts   ");
	vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items",
		"Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.",
		"Name");

	for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
		vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
			(CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
			work_queue_item_count(wq), wq->spec.hold, wq->runs,
			wq->yields, wq->cycles.best, wq->cycles.granularity,
			wq->cycles.total,
			/* avoid divide-by-zero before the first run */
			(wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
				   : 0,
			wq->name);
	}

	return CMD_SUCCESS;
}
     205             : 
/* Register the "show work-queues" command with the CLI. */
void workqueue_cmd_init(void)
{
	install_element(VIEW_NODE, &show_work_queues_cmd);
}
     210             : 
/* 'plug' a queue: Stop it from being scheduled,
 * ie: prevent the queue from draining.
 */
void work_queue_plug(struct work_queue *wq)
{
	/* cancel any pending task; clearing WQ_UNPLUGGED stops
	 * work_queue_schedule() from arming a new one */
	THREAD_OFF(wq->thread);

	UNSET_FLAG(wq->flags, WQ_UNPLUGGED);
}
     220             : 
/* unplug queue, schedule it again, if appropriate
 * Ie: Allow the queue to be drained again
 */
void work_queue_unplug(struct work_queue *wq)
{
	SET_FLAG(wq->flags, WQ_UNPLUGGED);

	/* if thread isn't already waiting, add one */
	work_queue_schedule(wq, wq->spec.hold);
}
     231             : 
     232             : /* timer thread to process a work queue
     233             :  * will reschedule itself if required,
     234             :  * otherwise work_queue_item_add
     235             :  */
     236           3 : void work_queue_run(struct thread *thread)
     237             : {
     238           3 :         struct work_queue *wq;
     239           3 :         struct work_queue_item *item, *titem;
     240           3 :         wq_item_status ret = WQ_SUCCESS;
     241           3 :         unsigned int cycles = 0;
     242           3 :         char yielded = 0;
     243             : 
     244           3 :         wq = THREAD_ARG(thread);
     245             : 
     246           3 :         assert(wq);
     247             : 
     248             :         /* calculate cycle granularity:
     249             :          * list iteration == 1 run
     250             :          * listnode processing == 1 cycle
     251             :          * granularity == # cycles between checks whether we should yield.
     252             :          *
     253             :          * granularity should be > 0, and can increase slowly after each run to
     254             :          * provide some hysteris, but not past cycles.best or 2*cycles.
     255             :          *
     256             :          * Best: starts low, can only increase
     257             :          *
     258             :          * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
     259             :          *              if we run to end of time slot, can increase otherwise
     260             :          *              by a small factor.
     261             :          *
     262             :          * We could use just the average and save some work, however we want to
     263             :          * be
     264             :          * able to adjust quickly to CPU pressure. Average wont shift much if
     265             :          * daemon has been running a long time.
     266             :          */
     267           3 :         if (wq->cycles.granularity == 0)
     268           0 :                 wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
     269             : 
     270          35 :         STAILQ_FOREACH_SAFE (item, &wq->items, wq, titem) {
     271          32 :                 assert(item->data);
     272             : 
     273             :                 /* dont run items which are past their allowed retries */
     274          32 :                 if (item->ran > wq->spec.max_retries) {
     275           0 :                         work_queue_item_remove(wq, item);
     276           0 :                         continue;
     277             :                 }
     278             : 
     279             :                 /* run and take care of items that want to be retried
     280             :                  * immediately */
     281          32 :                 do {
     282          32 :                         ret = wq->spec.workfunc(wq, item->data);
     283          32 :                         item->ran++;
     284             :                 } while ((ret == WQ_RETRY_NOW)
     285          32 :                          && (item->ran < wq->spec.max_retries));
     286             : 
     287          32 :                 switch (ret) {
     288           0 :                 case WQ_QUEUE_BLOCKED: {
     289             :                         /* decrement item->ran again, cause this isn't an item
     290             :                          * specific error, and fall through to WQ_RETRY_LATER
     291             :                          */
     292           0 :                         item->ran--;
     293             :                 }
     294           0 :                 case WQ_RETRY_LATER: {
     295           0 :                         goto stats;
     296             :                 }
     297          29 :                 case WQ_REQUEUE: {
     298          29 :                         item->ran--;
     299          29 :                         work_queue_item_requeue(wq, item);
     300             :                         /* If a single node is being used with a meta-queue
     301             :                          * (e.g., zebra),
     302             :                          * update the next node as we don't want to exit the
     303             :                          * thread and
     304             :                          * reschedule it after every node. By definition,
     305             :                          * WQ_REQUEUE is
     306             :                          * meant to continue the processing; the yield logic
     307             :                          * will kick in
     308             :                          * to terminate the thread when time has exceeded.
     309             :                          */
     310          29 :                         if (titem == NULL)
     311          32 :                                 titem = item;
     312             :                         break;
     313             :                 }
     314           3 :                 case WQ_RETRY_NOW:
     315             :                 /* a RETRY_NOW that gets here has exceeded max_tries, same as
     316             :                  * ERROR */
     317             :                 /* fallthru */
     318             :                 case WQ_SUCCESS:
     319             :                 default: {
     320           3 :                         work_queue_item_remove(wq, item);
     321           3 :                         break;
     322             :                 }
     323             :                 }
     324             : 
     325             :                 /* completed cycle */
     326          32 :                 cycles++;
     327             : 
     328             :                 /* test if we should yield */
     329          32 :                 if (!(cycles % wq->cycles.granularity)
     330          18 :                     && thread_should_yield(thread)) {
     331           0 :                         yielded = 1;
     332           0 :                         goto stats;
     333             :                 }
     334             :         }
     335             : 
     336           3 : stats:
     337             : 
     338             : #define WQ_HYSTERESIS_FACTOR 4
     339             : 
     340             :         /* we yielded, check whether granularity should be reduced */
     341           3 :         if (yielded && (cycles < wq->cycles.granularity)) {
     342           0 :                 wq->cycles.granularity =
     343             :                         ((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY);
     344             :         }
     345             :         /* otherwise, should granularity increase? */
     346           3 :         else if (cycles >= (wq->cycles.granularity)) {
     347           2 :                 if (cycles > wq->cycles.best)
     348           1 :                         wq->cycles.best = cycles;
     349             : 
     350             :                 /* along with yielded check, provides hysteresis for granularity
     351             :                  */
     352           2 :                 if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
     353           2 :                               * WQ_HYSTERESIS_FACTOR))
     354           0 :                         wq->cycles.granularity *=
     355             :                                 WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
     356           2 :                 else if (cycles
     357           2 :                          > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
     358           1 :                         wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
     359             :         }
     360             : #undef WQ_HYSTERIS_FACTOR
     361             : 
     362           3 :         wq->runs++;
     363           3 :         wq->cycles.total += cycles;
     364           3 :         if (yielded)
     365           0 :                 wq->yields++;
     366             : 
     367             :         /* Is the queue done yet? If it is, call the completion callback. */
     368           3 :         if (!work_queue_empty(wq)) {
     369           0 :                 if (ret == WQ_RETRY_LATER ||
     370           0 :                     ret == WQ_QUEUE_BLOCKED)
     371           0 :                         work_queue_schedule(wq, wq->spec.retry);
     372             :                 else
     373           0 :                         work_queue_schedule(wq, 0);
     374             : 
     375           3 :         } else if (wq->spec.completion_func)
     376           0 :                 wq->spec.completion_func(wq);
     377           3 : }

Generated by: LCOV version v1.16-topotato