Line data Source code
1 : /* Thread management routine
2 : * Copyright (C) 1998, 2000 Kunihiro Ishiguro <kunihiro@zebra.org>
3 : *
4 : * This file is part of GNU Zebra.
5 : *
6 : * GNU Zebra is free software; you can redistribute it and/or modify it
7 : * under the terms of the GNU General Public License as published by the
8 : * Free Software Foundation; either version 2, or (at your option) any
9 : * later version.
10 : *
11 : * GNU Zebra is distributed in the hope that it will be useful, but
12 : * WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 : * General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License along
17 : * with this program; see the file COPYING; if not, write to the Free Software
18 : * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 : */
20 :
21 : /* #define DEBUG */
22 :
23 : #include <zebra.h>
24 : #include <sys/resource.h>
25 :
26 : #include "thread.h"
27 : #include "memory.h"
28 : #include "frrcu.h"
29 : #include "log.h"
30 : #include "hash.h"
31 : #include "command.h"
32 : #include "sigevent.h"
33 : #include "network.h"
34 : #include "jhash.h"
35 : #include "frratomic.h"
36 : #include "frr_pthread.h"
37 : #include "lib_errors.h"
38 : #include "libfrr_trace.h"
39 : #include "libfrr.h"
40 :
41 36 : DEFINE_MTYPE_STATIC(LIB, THREAD, "Thread");
42 36 : DEFINE_MTYPE_STATIC(LIB, THREAD_MASTER, "Thread master");
43 36 : DEFINE_MTYPE_STATIC(LIB, THREAD_POLL, "Thread Poll Info");
44 36 : DEFINE_MTYPE_STATIC(LIB, THREAD_STATS, "Thread stats");
45 :
46 45218 : DECLARE_LIST(thread_list, struct thread, threaditem);
47 :
48 : struct cancel_req {
49 : int flags;
50 : struct thread *thread;
51 : void *eventobj;
52 : struct thread **threadref;
53 : };
54 :
55 : /* Flags for task cancellation */
56 : #define THREAD_CANCEL_FLAG_READY 0x01
57 :
58 910 : static int thread_timer_cmp(const struct thread *a, const struct thread *b)
59 : {
60 910 : if (a->u.sands.tv_sec < b->u.sands.tv_sec)
61 : return -1;
62 615 : if (a->u.sands.tv_sec > b->u.sands.tv_sec)
63 : return 1;
64 161 : if (a->u.sands.tv_usec < b->u.sands.tv_usec)
65 : return -1;
66 25 : if (a->u.sands.tv_usec > b->u.sands.tv_usec)
67 : return 1;
68 : return 0;
69 : }
70 :
71 1509 : DECLARE_HEAP(thread_timer_list, struct thread, timeritem, thread_timer_cmp);
72 :
73 : #if defined(__APPLE__)
74 : #include <mach/mach.h>
75 : #include <mach/mach_time.h>
76 : #endif
77 :
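/* Wake up the owning pthread's event loop: writing a single byte into the
 * master's self-pipe (io_pipe[1]) makes a blocked poll()/ppoll() in fd_poll()
 * return, so timers and the ready queue are re-evaluated promptly. */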
78 : #define AWAKEN(m) \
79 : do { \
80 : const unsigned char wakebyte = 0x01; \
81 : write(m->io_pipe[1], &wakebyte, 1); \
82 : } while (0)
83 :
84 : /* control variable for initializer */
85 : static pthread_once_t init_once = PTHREAD_ONCE_INIT;
86 : pthread_key_t thread_current;
87 :
88 : static pthread_mutex_t masters_mtx = PTHREAD_MUTEX_INITIALIZER;
89 : static struct list *masters;
90 :
91 : static void thread_free(struct thread_master *master, struct thread *thread);
92 :
93 : #ifndef EXCLUDE_CPU_TIME
94 : #define EXCLUDE_CPU_TIME 0
95 : #endif
96 : #ifndef CONSUMED_TIME_CHECK
97 : #define CONSUMED_TIME_CHECK 0
98 : #endif
99 :
100 : bool cputime_enabled = !EXCLUDE_CPU_TIME;
101 : unsigned long cputime_threshold = CONSUMED_TIME_CHECK;
102 : unsigned long walltime_threshold = CONSUMED_TIME_CHECK;
103 :
104 : /* CLI start ---------------------------------------------------------------- */
105 : #include "lib/thread_clippy.c"
106 :
107 1471 : static unsigned int cpu_record_hash_key(const struct cpu_thread_history *a)
108 : {
109 1471 : int size = sizeof(a->func);
110 :
111 1471 : return jhash(&a->func, size, 0);
112 : }
113 :
114 1211 : static bool cpu_record_hash_cmp(const struct cpu_thread_history *a,
115 : const struct cpu_thread_history *b)
116 : {
117 1211 : return a->func == b->func;
118 : }
119 :
120 260 : static void *cpu_record_hash_alloc(struct cpu_thread_history *a)
121 : {
122 260 : struct cpu_thread_history *new;
123 260 : new = XCALLOC(MTYPE_THREAD_STATS, sizeof(struct cpu_thread_history));
124 260 : new->func = a->func;
125 260 : new->funcname = a->funcname;
126 260 : return new;
127 : }
128 :
129 260 : static void cpu_record_hash_free(void *a)
130 : {
131 260 : struct cpu_thread_history *hist = a;
132 :
133 260 : XFREE(MTYPE_THREAD_STATS, hist);
134 260 : }
135 :
136 0 : static void vty_out_cpu_thread_history(struct vty *vty,
137 : struct cpu_thread_history *a)
138 : {
139 0 : vty_out(vty,
140 : "%5zu %10zu.%03zu %9zu %8zu %9zu %8zu %9zu %9zu %9zu %10zu",
141 0 : a->total_active, a->cpu.total / 1000, a->cpu.total % 1000,
142 0 : a->total_calls, (a->cpu.total / a->total_calls), a->cpu.max,
143 0 : (a->real.total / a->total_calls), a->real.max,
144 0 : a->total_cpu_warn, a->total_wall_warn, a->total_starv_warn);
145 0 : vty_out(vty, " %c%c%c%c%c %s\n",
146 0 : a->types & (1 << THREAD_READ) ? 'R' : ' ',
147 0 : a->types & (1 << THREAD_WRITE) ? 'W' : ' ',
148 0 : a->types & (1 << THREAD_TIMER) ? 'T' : ' ',
149 0 : a->types & (1 << THREAD_EVENT) ? 'E' : ' ',
150 0 : a->types & (1 << THREAD_EXECUTE) ? 'X' : ' ', a->funcname);
151 0 : }
152 :
153 0 : static void cpu_record_hash_print(struct hash_bucket *bucket, void *args[])
154 : {
155 0 : struct cpu_thread_history *totals = args[0];
156 0 : struct cpu_thread_history copy;
157 0 : struct vty *vty = args[1];
158 0 : uint8_t *filter = args[2];
159 :
160 0 : struct cpu_thread_history *a = bucket->data;
161 :
162 0 : copy.total_active =
163 0 : atomic_load_explicit(&a->total_active, memory_order_seq_cst);
164 0 : copy.total_calls =
165 0 : atomic_load_explicit(&a->total_calls, memory_order_seq_cst);
166 0 : copy.total_cpu_warn =
167 0 : atomic_load_explicit(&a->total_cpu_warn, memory_order_seq_cst);
168 0 : copy.total_wall_warn =
169 0 : atomic_load_explicit(&a->total_wall_warn, memory_order_seq_cst);
170 0 : copy.total_starv_warn = atomic_load_explicit(&a->total_starv_warn,
171 : memory_order_seq_cst);
172 0 : copy.cpu.total =
173 0 : atomic_load_explicit(&a->cpu.total, memory_order_seq_cst);
174 0 : copy.cpu.max = atomic_load_explicit(&a->cpu.max, memory_order_seq_cst);
175 0 : copy.real.total =
176 0 : atomic_load_explicit(&a->real.total, memory_order_seq_cst);
177 0 : copy.real.max =
178 0 : atomic_load_explicit(&a->real.max, memory_order_seq_cst);
179 0 : copy.types = atomic_load_explicit(&a->types, memory_order_seq_cst);
180 0 : copy.funcname = a->funcname;
181 :
182 0 : if (!(copy.types & *filter))
183 0 : return;
184 :
185 0 : vty_out_cpu_thread_history(vty, &copy);
186 0 : totals->total_active += copy.total_active;
187 0 : totals->total_calls += copy.total_calls;
188 0 : totals->total_cpu_warn += copy.total_cpu_warn;
189 0 : totals->total_wall_warn += copy.total_wall_warn;
190 0 : totals->total_starv_warn += copy.total_starv_warn;
191 0 : totals->real.total += copy.real.total;
192 0 : if (totals->real.max < copy.real.max)
193 0 : totals->real.max = copy.real.max;
194 0 : totals->cpu.total += copy.cpu.total;
195 0 : if (totals->cpu.max < copy.cpu.max)
196 0 : totals->cpu.max = copy.cpu.max;
197 : }
198 :
199 0 : static void cpu_record_print(struct vty *vty, uint8_t filter)
200 : {
201 0 : struct cpu_thread_history tmp;
202 0 : void *args[3] = {&tmp, vty, &filter};
203 0 : struct thread_master *m;
204 0 : struct listnode *ln;
205 :
206 0 : if (!cputime_enabled)
207 0 : vty_out(vty,
208 : "\n"
209 : "Collecting CPU time statistics is currently disabled. Following statistics\n"
210 : "will be zero or may display data from when collection was enabled. Use the\n"
211 : " \"service cputime-stats\" command to start collecting data.\n"
212 : "\nCounters and wallclock times are always maintained and should be accurate.\n");
213 :
214 0 : memset(&tmp, 0, sizeof(tmp));
215 0 : tmp.funcname = "TOTAL";
216 0 : tmp.types = filter;
217 :
218 0 : frr_with_mutex (&masters_mtx) {
219 0 : for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
220 0 : const char *name = m->name ? m->name : "main";
221 :
222 0 : char underline[strlen(name) + 1];
223 0 : memset(underline, '-', sizeof(underline));
224 0 : underline[sizeof(underline) - 1] = '\0';
225 :
226 0 : vty_out(vty, "\n");
227 0 : vty_out(vty, "Showing statistics for pthread %s\n",
228 : name);
229 0 : vty_out(vty, "-------------------------------%s\n",
230 : underline);
231 0 : vty_out(vty, "%30s %18s %18s\n", "",
232 : "CPU (user+system):", "Real (wall-clock):");
233 0 : vty_out(vty,
234 : "Active Runtime(ms) Invoked Avg uSec Max uSecs");
235 0 : vty_out(vty, " Avg uSec Max uSecs");
236 0 : vty_out(vty,
237 : " CPU_Warn Wall_Warn Starv_Warn Type Thread\n");
238 :
239 0 : if (m->cpu_record->count)
240 0 : hash_iterate(
241 : m->cpu_record,
242 : (void (*)(struct hash_bucket *,
243 : void *))cpu_record_hash_print,
244 : args);
245 : else
246 0 : vty_out(vty, "No data to display yet.\n");
247 :
248 0 : vty_out(vty, "\n");
249 : }
250 : }
251 :
252 0 : vty_out(vty, "\n");
253 0 : vty_out(vty, "Total thread statistics\n");
254 0 : vty_out(vty, "-------------------------\n");
255 0 : vty_out(vty, "%30s %18s %18s\n", "",
256 : "CPU (user+system):", "Real (wall-clock):");
257 0 : vty_out(vty, "Active Runtime(ms) Invoked Avg uSec Max uSecs");
258 0 : vty_out(vty, " Avg uSec Max uSecs CPU_Warn Wall_Warn");
259 0 : vty_out(vty, " Type Thread\n");
260 :
261 0 : if (tmp.total_calls > 0)
262 0 : vty_out_cpu_thread_history(vty, &tmp);
263 0 : }
264 :
265 0 : static void cpu_record_hash_clear(struct hash_bucket *bucket, void *args[])
266 : {
267 0 : uint8_t *filter = args[0];
268 0 : struct hash *cpu_record = args[1];
269 :
270 0 : struct cpu_thread_history *a = bucket->data;
271 :
272 0 : if (!(a->types & *filter))
273 : return;
274 :
275 0 : hash_release(cpu_record, bucket->data);
276 : }
277 :
278 0 : static void cpu_record_clear(uint8_t filter)
279 : {
280 0 : uint8_t *tmp = &filter;
281 0 : struct thread_master *m;
282 0 : struct listnode *ln;
283 :
284 0 : frr_with_mutex (&masters_mtx) {
285 0 : for (ALL_LIST_ELEMENTS_RO(masters, ln, m)) {
286 0 : frr_with_mutex (&m->mtx) {
287 0 : void *args[2] = {tmp, m->cpu_record};
288 0 : hash_iterate(
289 : m->cpu_record,
290 : (void (*)(struct hash_bucket *,
291 : void *))cpu_record_hash_clear,
292 : args);
293 : }
294 : }
295 : }
296 0 : }
297 :
298 0 : static uint8_t parse_filter(const char *filterstr)
299 : {
300 0 : int i = 0;
301 0 : int filter = 0;
302 :
303 0 : while (filterstr[i] != '\0') {
304 0 : switch (filterstr[i]) {
305 0 : case 'r':
306 : case 'R':
307 0 : filter |= (1 << THREAD_READ);
308 0 : break;
309 0 : case 'w':
310 : case 'W':
311 0 : filter |= (1 << THREAD_WRITE);
312 0 : break;
313 0 : case 't':
314 : case 'T':
315 0 : filter |= (1 << THREAD_TIMER);
316 0 : break;
317 0 : case 'e':
318 : case 'E':
319 0 : filter |= (1 << THREAD_EVENT);
320 0 : break;
321 0 : case 'x':
322 : case 'X':
323 0 : filter |= (1 << THREAD_EXECUTE);
324 0 : break;
325 : default:
326 : break;
327 : }
328 0 : ++i;
329 : }
330 0 : return filter;
331 : }
332 :
333 0 : DEFUN_NOSH (show_thread_cpu,
334 : show_thread_cpu_cmd,
335 : "show thread cpu [FILTER]",
336 : SHOW_STR
337 : "Thread information\n"
338 : "Thread CPU usage\n"
339 : "Display filter (rwtex)\n")
340 : {
341 0 : uint8_t filter = (uint8_t)-1U;
342 0 : int idx = 0;
343 :
344 0 : if (argv_find(argv, argc, "FILTER", &idx)) {
345 0 : filter = parse_filter(argv[idx]->arg);
346 0 : if (!filter) {
347 0 : vty_out(vty,
348 : "Invalid filter \"%s\" specified; must contain at leastone of 'RWTEXB'\n",
349 : argv[idx]->arg);
350 0 : return CMD_WARNING;
351 : }
352 : }
353 :
354 0 : cpu_record_print(vty, filter);
355 0 : return CMD_SUCCESS;
356 : }
357 :
358 0 : DEFPY (service_cputime_stats,
359 : service_cputime_stats_cmd,
360 : "[no] service cputime-stats",
361 : NO_STR
362 : "Set up miscellaneous service\n"
363 : "Collect CPU usage statistics\n")
364 : {
365 0 : cputime_enabled = !no;
366 0 : return CMD_SUCCESS;
367 : }
368 :
369 0 : DEFPY (service_cputime_warning,
370 : service_cputime_warning_cmd,
371 : "[no] service cputime-warning (1-4294967295)",
372 : NO_STR
373 : "Set up miscellaneous service\n"
374 : "Warn for tasks exceeding CPU usage threshold\n"
375 : "Warning threshold in milliseconds\n")
376 : {
377 0 : if (no)
378 0 : cputime_threshold = 0;
379 : else
380 0 : cputime_threshold = cputime_warning * 1000;
381 : return CMD_SUCCESS;
382 : }
383 :
384 : ALIAS (service_cputime_warning,
385 : no_service_cputime_warning_cmd,
386 : "no service cputime-warning",
387 : NO_STR
388 : "Set up miscellaneous service\n"
389 : "Warn for tasks exceeding CPU usage threshold\n")
390 :
391 0 : DEFPY (service_walltime_warning,
392 : service_walltime_warning_cmd,
393 : "[no] service walltime-warning (1-4294967295)",
394 : NO_STR
395 : "Set up miscellaneous service\n"
396 : "Warn for tasks exceeding total wallclock threshold\n"
397 : "Warning threshold in milliseconds\n")
398 : {
399 0 : if (no)
400 0 : walltime_threshold = 0;
401 : else
402 0 : walltime_threshold = walltime_warning * 1000;
403 : return CMD_SUCCESS;
404 : }
405 :
406 : ALIAS (service_walltime_warning,
407 : no_service_walltime_warning_cmd,
408 : "no service walltime-warning",
409 : NO_STR
410 : "Set up miscellaneous service\n"
411 : "Warn for tasks exceeding total wallclock threshold\n")
412 :
413 0 : static void show_thread_poll_helper(struct vty *vty, struct thread_master *m)
414 0 : {
415 0 : const char *name = m->name ? m->name : "main";
416 0 : char underline[strlen(name) + 1];
417 0 : struct thread *thread;
418 0 : uint32_t i;
419 :
420 0 : memset(underline, '-', sizeof(underline));
421 0 : underline[sizeof(underline) - 1] = '\0';
422 :
423 0 : vty_out(vty, "\nShowing poll FD's for %s\n", name);
424 0 : vty_out(vty, "----------------------%s\n", underline);
425 0 : vty_out(vty, "Count: %u/%d\n", (uint32_t)m->handler.pfdcount,
426 : m->fd_limit);
427 0 : for (i = 0; i < m->handler.pfdcount; i++) {
428 0 : vty_out(vty, "\t%6d fd:%6d events:%2d revents:%2d\t\t", i,
429 0 : m->handler.pfds[i].fd, m->handler.pfds[i].events,
430 0 : m->handler.pfds[i].revents);
431 :
432 0 : if (m->handler.pfds[i].events & POLLIN) {
433 0 : thread = m->read[m->handler.pfds[i].fd];
434 :
435 0 : if (!thread)
436 0 : vty_out(vty, "ERROR ");
437 : else
438 0 : vty_out(vty, "%s ", thread->xref->funcname);
439 : } else
440 0 : vty_out(vty, " ");
441 :
442 0 : if (m->handler.pfds[i].events & POLLOUT) {
443 0 : thread = m->write[m->handler.pfds[i].fd];
444 :
445 0 : if (!thread)
446 0 : vty_out(vty, "ERROR\n");
447 : else
448 0 : vty_out(vty, "%s\n", thread->xref->funcname);
449 : } else
450 0 : vty_out(vty, "\n");
451 : }
452 0 : }
453 :
454 0 : DEFUN_NOSH (show_thread_poll,
455 : show_thread_poll_cmd,
456 : "show thread poll",
457 : SHOW_STR
458 : "Thread information\n"
459 : "Show poll FD's and information\n")
460 : {
461 0 : struct listnode *node;
462 0 : struct thread_master *m;
463 :
464 0 : frr_with_mutex (&masters_mtx) {
465 0 : for (ALL_LIST_ELEMENTS_RO(masters, node, m)) {
466 0 : show_thread_poll_helper(vty, m);
467 : }
468 : }
469 :
470 0 : return CMD_SUCCESS;
471 : }
472 :
473 :
474 0 : DEFUN (clear_thread_cpu,
475 : clear_thread_cpu_cmd,
476 : "clear thread cpu [FILTER]",
477 : "Clear stored data in all pthreads\n"
478 : "Thread information\n"
479 : "Thread CPU usage\n"
480 : "Display filter (rwtexb)\n")
481 : {
482 0 : uint8_t filter = (uint8_t)-1U;
483 0 : int idx = 0;
484 :
485 0 : if (argv_find(argv, argc, "FILTER", &idx)) {
486 0 : filter = parse_filter(argv[idx]->arg);
487 0 : if (!filter) {
488 0 : vty_out(vty,
489 : "Invalid filter \"%s\" specified; must contain at leastone of 'RWTEXB'\n",
490 : argv[idx]->arg);
491 0 : return CMD_WARNING;
492 : }
493 : }
494 :
495 0 : cpu_record_clear(filter);
496 0 : return CMD_SUCCESS;
497 : }
498 :
499 0 : static void show_thread_timers_helper(struct vty *vty, struct thread_master *m)
500 0 : {
501 0 : const char *name = m->name ? m->name : "main";
502 0 : char underline[strlen(name) + 1];
503 0 : struct thread *thread;
504 :
505 0 : memset(underline, '-', sizeof(underline));
506 0 : underline[sizeof(underline) - 1] = '\0';
507 :
508 0 : vty_out(vty, "\nShowing timers for %s\n", name);
509 0 : vty_out(vty, "-------------------%s\n", underline);
510 :
511 0 : frr_each (thread_timer_list, &m->timer, thread) {
512 0 : vty_out(vty, " %-50s%pTH\n", thread->hist->funcname, thread);
513 : }
514 0 : }
515 :
516 0 : DEFPY_NOSH (show_thread_timers,
517 : show_thread_timers_cmd,
518 : "show thread timers",
519 : SHOW_STR
520 : "Thread information\n"
521 : "Show all timers and how long they have in the system\n")
522 : {
523 0 : struct listnode *node;
524 0 : struct thread_master *m;
525 :
526 0 : frr_with_mutex (&masters_mtx) {
527 0 : for (ALL_LIST_ELEMENTS_RO(masters, node, m))
528 0 : show_thread_timers_helper(vty, m);
529 : }
530 :
531 0 : return CMD_SUCCESS;
532 : }
533 :
534 12 : void thread_cmd_init(void)
535 : {
536 12 : install_element(VIEW_NODE, &show_thread_cpu_cmd);
537 12 : install_element(VIEW_NODE, &show_thread_poll_cmd);
538 12 : install_element(ENABLE_NODE, &clear_thread_cpu_cmd);
539 :
540 12 : install_element(CONFIG_NODE, &service_cputime_stats_cmd);
541 12 : install_element(CONFIG_NODE, &service_cputime_warning_cmd);
542 12 : install_element(CONFIG_NODE, &no_service_cputime_warning_cmd);
543 12 : install_element(CONFIG_NODE, &service_walltime_warning_cmd);
544 12 : install_element(CONFIG_NODE, &no_service_walltime_warning_cmd);
545 :
546 12 : install_element(VIEW_NODE, &show_thread_timers_cmd);
547 12 : }
548 : /* CLI end ------------------------------------------------------------------ */
549 :
550 :
551 260 : static void cancelreq_del(void *cr)
552 : {
553 260 : XFREE(MTYPE_TMP, cr);
554 260 : }
555 :
556 : /* initializer, only ever called once */
557 12 : static void initializer(void)
558 : {
559 12 : pthread_key_create(&thread_current, NULL);
560 12 : }
561 :
562 32 : struct thread_master *thread_master_create(const char *name)
563 32 : {
564 32 : struct thread_master *rv;
565 32 : struct rlimit limit;
566 :
567 32 : pthread_once(&init_once, &initializer);
568 :
569 32 : rv = XCALLOC(MTYPE_THREAD_MASTER, sizeof(struct thread_master));
570 :
571 : /* Initialize master mutex */
572 32 : pthread_mutex_init(&rv->mtx, NULL);
573 32 : pthread_cond_init(&rv->cancel_cond, NULL);
574 :
575 : /* Set name */
576 32 : name = name ? name : "default";
577 32 : rv->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
578 :
579 : /* Initialize I/O task data structures */
580 :
581 : /* Use configured limit if present, ulimit otherwise. */
582 32 : rv->fd_limit = frr_get_fd_limit();
583 32 : if (rv->fd_limit == 0) {
584 32 : getrlimit(RLIMIT_NOFILE, &limit);
585 32 : rv->fd_limit = (int)limit.rlim_cur;
586 : }
587 :
588 32 : rv->read = XCALLOC(MTYPE_THREAD_POLL,
589 : sizeof(struct thread *) * rv->fd_limit);
590 :
591 32 : rv->write = XCALLOC(MTYPE_THREAD_POLL,
592 : sizeof(struct thread *) * rv->fd_limit);
593 :
594 32 : char tmhashname[strlen(name) + 32];
595 32 : snprintf(tmhashname, sizeof(tmhashname), "%s - threadmaster event hash",
596 : name);
597 32 : rv->cpu_record = hash_create_size(
598 : 8, (unsigned int (*)(const void *))cpu_record_hash_key,
599 : (bool (*)(const void *, const void *))cpu_record_hash_cmp,
600 : tmhashname);
601 :
602 32 : thread_list_init(&rv->event);
603 32 : thread_list_init(&rv->ready);
604 32 : thread_list_init(&rv->unuse);
605 32 : thread_timer_list_init(&rv->timer);
606 :
607 : /* Initialize thread_fetch() settings */
608 32 : rv->spin = true;
609 32 : rv->handle_signals = true;
610 :
611 : /* Set pthread owner, should be updated by actual owner */
612 32 : rv->owner = pthread_self();
613 32 : rv->cancel_req = list_new();
614 32 : rv->cancel_req->del = cancelreq_del;
615 32 : rv->canceled = true;
616 :
617 : /* Initialize pipe poker */
618 32 : pipe(rv->io_pipe);
619 32 : set_nonblocking(rv->io_pipe[0]);
620 32 : set_nonblocking(rv->io_pipe[1]);
621 :
622 : /* Initialize data structures for poll() */
623 32 : rv->handler.pfdsize = rv->fd_limit;
624 32 : rv->handler.pfdcount = 0;
625 32 : rv->handler.pfds = XCALLOC(MTYPE_THREAD_MASTER,
626 : sizeof(struct pollfd) * rv->handler.pfdsize);
627 32 : rv->handler.copy = XCALLOC(MTYPE_THREAD_MASTER,
628 : sizeof(struct pollfd) * rv->handler.pfdsize);
629 :
630 : /* add to list of threadmasters */
631 64 : frr_with_mutex (&masters_mtx) {
632 32 : if (!masters)
633 12 : masters = list_new();
634 :
635 32 : listnode_add(masters, rv);
636 : }
637 :
638 32 : return rv;
639 : }
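/* Usage sketch (assumes thread_call(), the dispatch helper defined later in
 * this file): a typical single-pthread consumer creates a master, schedules
 * its initial tasks, and spins the fetch loop until nothing is left to run.
 *
 *     struct thread_master *master = thread_master_create("example");
 *     struct thread fetch;
 *
 *     // schedule initial work here, e.g. thread_add_event()/thread_add_read()
 *     while (thread_fetch(master, &fetch))
 *             thread_call(&fetch);
 *
 *     thread_master_free(master);
 */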
640 :
641 0 : void thread_master_set_name(struct thread_master *master, const char *name)
642 : {
643 0 : frr_with_mutex (&master->mtx) {
644 0 : XFREE(MTYPE_THREAD_MASTER, master->name);
645 0 : master->name = XSTRDUP(MTYPE_THREAD_MASTER, name);
646 : }
647 0 : }
648 :
649 : #define THREAD_UNUSED_DEPTH 10
650 :
651 : /* Move thread to unuse list. */
652 1428 : static void thread_add_unuse(struct thread_master *m, struct thread *thread)
653 : {
654 1428 : pthread_mutex_t mtxc = thread->mtx;
655 :
656 1428 : assert(m != NULL && thread != NULL);
657 :
658 1428 : thread->hist->total_active--;
659 1428 : memset(thread, 0, sizeof(struct thread));
660 1428 : thread->type = THREAD_UNUSED;
661 :
662 : /* Restore the thread mutex context. */
663 1428 : thread->mtx = mtxc;
664 :
665 1428 : if (thread_list_count(&m->unuse) < THREAD_UNUSED_DEPTH) {
666 1378 : thread_list_add_tail(&m->unuse, thread);
667 1378 : return;
668 : }
669 :
670 50 : thread_free(m, thread);
671 : }
672 :
673 : /* Free all unused threads. */
674 96 : static void thread_list_free(struct thread_master *m,
675 : struct thread_list_head *list)
676 : {
677 96 : struct thread *t;
678 :
679 248 : while ((t = thread_list_pop(list)))
680 152 : thread_free(m, t);
681 96 : }
682 :
683 64 : static void thread_array_free(struct thread_master *m,
684 : struct thread **thread_array)
685 : {
686 64 : struct thread *t;
687 64 : int index;
688 :
689 65600 : for (index = 0; index < m->fd_limit; ++index) {
690 65536 : t = thread_array[index];
691 65536 : if (t) {
692 32 : thread_array[index] = NULL;
693 32 : thread_free(m, t);
694 : }
695 : }
696 64 : XFREE(MTYPE_THREAD_POLL, thread_array);
697 64 : }
698 :
699 : /*
700 : * thread_master_free_unused
701 : *
702 : * As threads are finished with, they are put on the
703 : * unuse list for later reuse.
704 : * If we are shutting down, free up the unused threads
705 : * so we can see if we forgot to shut anything off.
706 : */
707 0 : void thread_master_free_unused(struct thread_master *m)
708 : {
709 0 : frr_with_mutex (&m->mtx) {
710 : struct thread *t;
711 0 : while ((t = thread_list_pop(&m->unuse)))
712 0 : thread_free(m, t);
713 : }
714 0 : }
715 :
716 : /* Stop thread scheduler. */
717 32 : void thread_master_free(struct thread_master *m)
718 : {
719 32 : struct thread *t;
720 :
721 64 : frr_with_mutex (&masters_mtx) {
722 32 : listnode_delete(masters, m);
723 32 : if (masters->count == 0) {
724 12 : list_delete(&masters);
725 : }
726 : }
727 :
728 32 : thread_array_free(m, m->read);
729 32 : thread_array_free(m, m->write);
730 39 : while ((t = thread_timer_list_pop(&m->timer)))
731 7 : thread_free(m, t);
732 32 : thread_list_free(m, &m->event);
733 32 : thread_list_free(m, &m->ready);
734 32 : thread_list_free(m, &m->unuse);
735 32 : pthread_mutex_destroy(&m->mtx);
736 32 : pthread_cond_destroy(&m->cancel_cond);
737 32 : close(m->io_pipe[0]);
738 32 : close(m->io_pipe[1]);
739 32 : list_delete(&m->cancel_req);
740 32 : m->cancel_req = NULL;
741 :
742 32 : hash_clean(m->cpu_record, cpu_record_hash_free);
743 32 : hash_free(m->cpu_record);
744 32 : m->cpu_record = NULL;
745 :
746 32 : XFREE(MTYPE_THREAD_MASTER, m->name);
747 32 : XFREE(MTYPE_THREAD_MASTER, m->handler.pfds);
748 32 : XFREE(MTYPE_THREAD_MASTER, m->handler.copy);
749 32 : XFREE(MTYPE_THREAD_MASTER, m);
750 32 : }
751 :
752 : /* Return remaining time in milliseconds. */
753 20 : unsigned long thread_timer_remain_msec(struct thread *thread)
754 : {
755 20 : int64_t remain;
756 :
757 20 : if (!thread_is_scheduled(thread))
758 : return 0;
759 :
760 20 : frr_with_mutex (&thread->mtx) {
761 20 : remain = monotime_until(&thread->u.sands, NULL) / 1000LL;
762 : }
763 :
764 20 : return remain < 0 ? 0 : remain;
765 : }
766 :
767 : /* Return remaining time in seconds. */
768 13 : unsigned long thread_timer_remain_second(struct thread *thread)
769 : {
770 13 : return thread_timer_remain_msec(thread) / 1000LL;
771 : }
772 :
773 0 : struct timeval thread_timer_remain(struct thread *thread)
774 : {
775 0 : struct timeval remain;
776 0 : frr_with_mutex (&thread->mtx) {
777 0 : monotime_until(&thread->u.sands, &remain);
778 : }
779 0 : return remain;
780 : }
781 :
782 0 : static int time_hhmmss(char *buf, int buf_size, long sec)
783 : {
784 0 : long hh;
785 0 : long mm;
786 0 : int wr;
787 :
788 0 : assert(buf_size >= 8);
789 :
790 0 : hh = sec / 3600;
791 0 : sec %= 3600;
792 0 : mm = sec / 60;
793 0 : sec %= 60;
794 :
795 0 : wr = snprintf(buf, buf_size, "%02ld:%02ld:%02ld", hh, mm, sec);
796 :
797 0 : return wr != 8;
798 : }
799 :
800 0 : char *thread_timer_to_hhmmss(char *buf, int buf_size,
801 : struct thread *t_timer)
802 : {
803 0 : if (t_timer) {
804 0 : time_hhmmss(buf, buf_size,
805 0 : thread_timer_remain_second(t_timer));
806 : } else {
807 0 : snprintf(buf, buf_size, "--:--:--");
808 : }
809 0 : return buf;
810 : }
811 :
812 : /* Get new thread. */
813 1471 : static struct thread *thread_get(struct thread_master *m, uint8_t type,
814 : void (*func)(struct thread *), void *arg,
815 : const struct xref_threadsched *xref)
816 : {
817 1471 : struct thread *thread = thread_list_pop(&m->unuse);
818 1471 : struct cpu_thread_history tmp;
819 :
820 1471 : if (!thread) {
821 241 : thread = XCALLOC(MTYPE_THREAD, sizeof(struct thread));
822 : /* mutex only needs to be initialized at struct creation. */
823 241 : pthread_mutex_init(&thread->mtx, NULL);
824 241 : m->alloc++;
825 : }
826 :
827 1471 : thread->type = type;
828 1471 : thread->add_type = type;
829 1471 : thread->master = m;
830 1471 : thread->arg = arg;
831 1471 : thread->yield = THREAD_YIELD_TIME_SLOT; /* default */
832 1471 : thread->ref = NULL;
833 1471 : thread->ignore_timer_late = false;
834 :
835 : /*
836 : * So if the passed in funcname is not what we have
837 : * stored that means the thread->hist needs to be
838 : * updated. We keep the last one around in unused
839 : * under the assumption that we are probably
840 : * going to immediately allocate the same
841 : * type of thread.
842 : * This hopefully saves us some serious
843 : * hash_get lookups.
844 : */
845 1471 : if ((thread->xref && thread->xref->funcname != xref->funcname)
846 1471 : || thread->func != func) {
847 1471 : tmp.func = func;
848 1471 : tmp.funcname = xref->funcname;
849 1471 : thread->hist =
850 1471 : hash_get(m->cpu_record, &tmp,
851 : (void *(*)(void *))cpu_record_hash_alloc);
852 : }
853 1471 : thread->hist->total_active++;
854 1471 : thread->func = func;
855 1471 : thread->xref = xref;
856 :
857 1471 : return thread;
858 : }
859 :
860 241 : static void thread_free(struct thread_master *master, struct thread *thread)
861 : {
862 : /* Update statistics. */
863 241 : assert(master->alloc > 0);
864 241 : master->alloc--;
865 :
866 : /* Free allocated resources. */
867 241 : pthread_mutex_destroy(&thread->mtx);
868 241 : XFREE(MTYPE_THREAD, thread);
869 241 : }
870 :
871 21008 : static int fd_poll(struct thread_master *m, const struct timeval *timer_wait,
872 : bool *eintr_p)
873 : {
874 21008 : sigset_t origsigs;
875 21008 : unsigned char trash[64];
876 21008 : nfds_t count = m->handler.copycount;
877 :
878 : /*
879 : * If timer_wait is null here, that means poll() should block
880 : * indefinitely, unless the thread_master has overridden it by setting
881 : * ->selectpoll_timeout.
882 : *
883 : * If ->selectpoll_timeout is positive, it specifies the maximum number
884 : * of milliseconds to wait. If it is negative, we never wait and always
885 : * return immediately even if no event is detected. If it is zero, the
886 : * default behavior described above applies.
887 : */
888 21008 : int timeout = -1;
889 :
890 : /* number of file descriptors with events */
891 21008 : int num;
892 :
893 21008 : if (timer_wait != NULL
894 20242 : && m->selectpoll_timeout == 0) // use the default value
895 20242 : timeout = (timer_wait->tv_sec * 1000)
896 20242 : + (timer_wait->tv_usec / 1000);
897 766 : else if (m->selectpoll_timeout > 0) // use the user's timeout
898 0 : timeout = m->selectpoll_timeout;
899 766 : else if (m->selectpoll_timeout
900 : < 0) // effect a poll (return immediately)
901 0 : timeout = 0;
902 :
903 21008 : zlog_tls_buffer_flush();
904 21008 : rcu_read_unlock();
905 21007 : rcu_assert_read_unlocked();
906 :
907 : /* add poll pipe poker */
908 21007 : assert(count + 1 < m->handler.pfdsize);
909 21007 : m->handler.copy[count].fd = m->io_pipe[0];
910 21007 : m->handler.copy[count].events = POLLIN;
911 21007 : m->handler.copy[count].revents = 0x00;
912 :
913 : /* We need to deal with a signal-handling race here: we
914 : * don't want to miss a crucial signal, such as SIGTERM or SIGINT,
915 : * that may arrive just before we enter poll(). We will block the
916 : * key signals, then check whether any have arrived - if so, we return
917 : * before calling poll(). If not, we'll re-enable the signals
918 : * in the ppoll() call.
919 : */
920 :
921 21007 : sigemptyset(&origsigs);
922 21007 : if (m->handle_signals) {
923 : /* Main pthread that handles the app signals */
924 20456 : if (frr_sigevent_check(&origsigs)) {
925 : /* Signal to process - restore signal mask and return */
926 0 : pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
927 0 : num = -1;
928 0 : *eintr_p = true;
929 0 : goto done;
930 : }
931 : } else {
932 : /* Don't make any changes for the non-main pthreads */
933 551 : pthread_sigmask(SIG_SETMASK, NULL, &origsigs);
934 : }
935 :
936 : #if defined(HAVE_PPOLL)
937 21007 : struct timespec ts, *tsp;
938 :
939 21007 : if (timeout >= 0) {
940 20242 : ts.tv_sec = timeout / 1000;
941 20242 : ts.tv_nsec = (timeout % 1000) * 1000000;
942 20242 : tsp = &ts;
943 : } else
944 : tsp = NULL;
945 :
946 21007 : num = ppoll(m->handler.copy, count + 1, tsp, &origsigs);
947 21008 : pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
948 : #else
949 : /* Not ideal - there is a race after we restore the signal mask */
950 : pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
951 : num = poll(m->handler.copy, count + 1, timeout);
952 : #endif
953 :
954 21008 : done:
955 :
956 21008 : if (num < 0 && errno == EINTR)
957 12 : *eintr_p = true;
958 :
959 21008 : if (num > 0 && m->handler.copy[count].revents != 0 && num--)
960 2082 : while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
961 : ;
962 :
963 21008 : rcu_read_lock();
964 :
965 21007 : return num;
966 : }
967 :
968 : /* Add new read or write thread. */
969 1311 : void _thread_add_read_write(const struct xref_threadsched *xref,
970 : struct thread_master *m,
971 : void (*func)(struct thread *), void *arg, int fd,
972 : struct thread **t_ptr)
973 : {
974 1311 : int dir = xref->thread_type;
975 1311 : struct thread *thread = NULL;
976 1311 : struct thread **thread_array;
977 :
978 1311 : if (dir == THREAD_READ)
979 932 : frrtrace(9, frr_libfrr, schedule_read, m,
980 : xref->funcname, xref->xref.file, xref->xref.line,
981 : t_ptr, fd, 0, arg, 0);
982 : else
983 379 : frrtrace(9, frr_libfrr, schedule_write, m,
984 : xref->funcname, xref->xref.file, xref->xref.line,
985 : t_ptr, fd, 0, arg, 0);
986 :
987 1311 : assert(fd >= 0);
988 1311 : if (fd >= m->fd_limit)
989 0 : assert(!"Number of FD's open is greater than FRR currently configured to handle, aborting");
990 :
991 1311 : frr_with_mutex (&m->mtx) {
992 1311 : if (t_ptr && *t_ptr)
993 : // thread is already scheduled; don't reschedule
994 : break;
995 :
996 : /* default to a new pollfd */
997 1062 : nfds_t queuepos = m->handler.pfdcount;
998 :
999 1062 : if (dir == THREAD_READ)
1000 928 : thread_array = m->read;
1001 : else
1002 134 : thread_array = m->write;
1003 :
1004 : /* if we already have a pollfd for our file descriptor, find and
1005 : * use it */
1006 5885 : for (nfds_t i = 0; i < m->handler.pfdcount; i++)
1007 5671 : if (m->handler.pfds[i].fd == fd) {
1008 : queuepos = i;
1009 :
1010 : #ifdef DEV_BUILD
1011 : /*
1012 : * What happens if we have a thread already
1013 : * created for this event?
1014 : */
1015 : if (thread_array[fd])
1016 : assert(!"Thread already scheduled for file descriptor");
1017 : #endif
1018 : break;
1019 : }
1020 :
1021 : /* make sure we have room for this fd + pipe poker fd */
1022 1062 : assert(queuepos + 1 < m->handler.pfdsize);
1023 :
1024 1062 : thread = thread_get(m, dir, func, arg, xref);
1025 :
1026 1062 : m->handler.pfds[queuepos].fd = fd;
1027 1062 : m->handler.pfds[queuepos].events |=
1028 : (dir == THREAD_READ ? POLLIN : POLLOUT);
1029 :
1030 1062 : if (queuepos == m->handler.pfdcount)
1031 214 : m->handler.pfdcount++;
1032 :
1033 1062 : if (thread) {
1034 1062 : frr_with_mutex (&thread->mtx) {
1035 1062 : thread->u.fd = fd;
1036 1062 : thread_array[thread->u.fd] = thread;
1037 : }
1038 :
1039 1062 : if (t_ptr) {
1040 1026 : *t_ptr = thread;
1041 1026 : thread->ref = t_ptr;
1042 : }
1043 : }
1044 :
1045 1062 : AWAKEN(m);
1046 : }
1047 1311 : }
1048 :
1049 187 : static void _thread_add_timer_timeval(const struct xref_threadsched *xref,
1050 : struct thread_master *m,
1051 : void (*func)(struct thread *), void *arg,
1052 : struct timeval *time_relative,
1053 : struct thread **t_ptr)
1054 : {
1055 187 : struct thread *thread;
1056 187 : struct timeval t;
1057 :
1058 187 : assert(m != NULL);
1059 :
1060 187 : assert(time_relative);
1061 :
1062 187 : frrtrace(9, frr_libfrr, schedule_timer, m,
1063 : xref->funcname, xref->xref.file, xref->xref.line,
1064 : t_ptr, 0, 0, arg, (long)time_relative->tv_sec);
1065 :
1066 : /* Compute expiration/deadline time. */
1067 187 : monotime(&t);
1068 187 : timeradd(&t, time_relative, &t);
1069 :
1070 198 : frr_with_mutex (&m->mtx) {
1071 187 : if (t_ptr && *t_ptr)
1072 : /* thread is already scheduled; don't reschedule */
1073 11 : return;
1074 :
1075 176 : thread = thread_get(m, THREAD_TIMER, func, arg, xref);
1076 :
1077 352 : frr_with_mutex (&thread->mtx) {
1078 176 : thread->u.sands = t;
1079 176 : thread_timer_list_add(&m->timer, thread);
1080 176 : if (t_ptr) {
1081 176 : *t_ptr = thread;
1082 176 : thread->ref = t_ptr;
1083 : }
1084 : }
1085 :
1086 : /* The timer list is sorted - if this new timer
1087 : * might change the time we'll wait for, give the pthread
1088 : * a chance to re-compute.
1089 : */
1090 352 : if (thread_timer_list_first(&m->timer) == thread)
1091 260 : AWAKEN(m);
1092 : }
1093 : #define ONEYEAR2SEC (60 * 60 * 24 * 365)
1094 176 : if (time_relative->tv_sec > ONEYEAR2SEC)
1095 176 : flog_err(
1096 : EC_LIB_TIMER_TOO_LONG,
1097 : "Timer: %pTHD is created with an expiration that is greater than 1 year",
1098 : thread);
1099 : }
1100 :
1101 :
1102 : /* Add timer event thread. */
1103 95 : void _thread_add_timer(const struct xref_threadsched *xref,
1104 : struct thread_master *m, void (*func)(struct thread *),
1105 : void *arg, long timer, struct thread **t_ptr)
1106 : {
1107 95 : struct timeval trel;
1108 :
1109 95 : assert(m != NULL);
1110 :
1111 95 : trel.tv_sec = timer;
1112 95 : trel.tv_usec = 0;
1113 :
1114 95 : _thread_add_timer_timeval(xref, m, func, arg, &trel, t_ptr);
1115 95 : }
1116 :
1117 : /* Add timer event thread with "millisecond" resolution */
1118 54 : void _thread_add_timer_msec(const struct xref_threadsched *xref,
1119 : struct thread_master *m,
1120 : void (*func)(struct thread *), void *arg,
1121 : long timer, struct thread **t_ptr)
1122 : {
1123 54 : struct timeval trel;
1124 :
1125 54 : assert(m != NULL);
1126 :
1127 54 : trel.tv_sec = timer / 1000;
1128 54 : trel.tv_usec = 1000 * (timer % 1000);
1129 :
1130 54 : _thread_add_timer_timeval(xref, m, func, arg, &trel, t_ptr);
1131 54 : }
1132 :
1133 : /* Add timer event thread with "timeval" resolution */
1134 38 : void _thread_add_timer_tv(const struct xref_threadsched *xref,
1135 : struct thread_master *m,
1136 : void (*func)(struct thread *), void *arg,
1137 : struct timeval *tv, struct thread **t_ptr)
1138 : {
1139 38 : _thread_add_timer_timeval(xref, m, func, arg, tv, t_ptr);
1140 38 : }
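/* Usage sketch (assumes the thread_add_timer()/thread_add_timer_msec()
 * wrapper macros and THREAD_ARG() from thread.h): while a timer is still
 * pending, scheduling again through the same back-reference is a no-op, so a
 * periodic task simply re-arms itself from its handler.
 *
 *     static struct thread *t_hello;
 *
 *     static void send_hello(struct thread *thread)
 *     {
 *             // ... transmit, then re-arm for the next interval
 *             thread_add_timer(thread->master, send_hello,
 *                              THREAD_ARG(thread), 5, &t_hello);
 *     }
 *
 *     thread_add_timer(master, send_hello, ctx, 5, &t_hello);
 */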
1141 :
1142 : /* Add simple event thread. */
1143 415 : void _thread_add_event(const struct xref_threadsched *xref,
1144 : struct thread_master *m, void (*func)(struct thread *),
1145 : void *arg, int val, struct thread **t_ptr)
1146 : {
1147 415 : struct thread *thread = NULL;
1148 :
1149 415 : frrtrace(9, frr_libfrr, schedule_event, m,
1150 : xref->funcname, xref->xref.file, xref->xref.line,
1151 : t_ptr, 0, val, arg, 0);
1152 :
1153 415 : assert(m != NULL);
1154 :
1155 415 : frr_with_mutex (&m->mtx) {
1156 415 : if (t_ptr && *t_ptr)
1157 : /* thread is already scheduled; don't reschedule */
1158 : break;
1159 :
1160 221 : thread = thread_get(m, THREAD_EVENT, func, arg, xref);
1161 221 : frr_with_mutex (&thread->mtx) {
1162 221 : thread->u.val = val;
1163 221 : thread_list_add_tail(&m->event, thread);
1164 : }
1165 :
1166 221 : if (t_ptr) {
1167 197 : *t_ptr = thread;
1168 197 : thread->ref = t_ptr;
1169 : }
1170 :
1171 221 : AWAKEN(m);
1172 : }
1173 415 : }
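/* Usage sketch (assumes the thread_add_event() wrapper macro from thread.h):
 * an event runs on the next pass of the owning pthread's loop, and the 'val'
 * argument is available to the handler as thread->u.val.
 *
 *     thread_add_event(master, process_queue, ctx, 0, &t_process);
 */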
1174 :
1175 : /* Thread cancellation ------------------------------------------------------ */
1176 :
1177 : /**
1178 : * NOT's out the .events field of pollfd corresponding to the given file
1179 : * descriptor. The event to be NOT'd is passed in the 'state' parameter.
1180 : *
1181 : * This needs to happen for both copies of pollfd's. See 'thread_fetch'
1182 : * implementation for details.
1183 : *
1184 : * @param master
1185 : * @param fd
1186 : * @param state the event to cancel. One or more (OR'd together) of the
1187 : * following:
1188 : * - POLLIN
1189 : * - POLLOUT
1190 : */
1191 134 : static void thread_cancel_rw(struct thread_master *master, int fd, short state,
1192 : int idx_hint)
1193 : {
1194 134 : bool found = false;
1195 :
1196 : /* find the index of corresponding pollfd */
1197 134 : nfds_t i;
1198 :
1199 : /* Cancel POLLHUP too just in case some bozo set it */
1200 134 : state |= POLLHUP;
1201 :
1202 : /* Some callers know the index of the pfd already */
1203 134 : if (idx_hint >= 0) {
1204 0 : i = idx_hint;
1205 0 : found = true;
1206 : } else {
1207 : /* Have to look for the fd in the pfd array */
1208 532 : for (i = 0; i < master->handler.pfdcount; i++)
1209 532 : if (master->handler.pfds[i].fd == fd) {
1210 : found = true;
1211 : break;
1212 : }
1213 : }
1214 :
1215 134 : if (!found) {
1216 0 : zlog_debug(
1217 : "[!] Received cancellation request for nonexistent rw job");
1218 0 : zlog_debug("[!] threadmaster: %s | fd: %d",
1219 : master->name ? master->name : "", fd);
1220 0 : return;
1221 : }
1222 :
1223 : /* NOT out event. */
1224 134 : master->handler.pfds[i].events &= ~(state);
1225 :
1226 : /* If all events are canceled, delete / resize the pollfd array. */
1227 134 : if (master->handler.pfds[i].events == 0) {
1228 134 : memmove(master->handler.pfds + i, master->handler.pfds + i + 1,
1229 134 : (master->handler.pfdcount - i - 1)
1230 : * sizeof(struct pollfd));
1231 134 : master->handler.pfdcount--;
1232 134 : master->handler.pfds[master->handler.pfdcount].fd = 0;
1233 134 : master->handler.pfds[master->handler.pfdcount].events = 0;
1234 : }
1235 :
1236 : /* If we have the same pollfd in the copy, perform the same operations,
1237 : * otherwise return. */
1238 134 : if (i >= master->handler.copycount)
1239 : return;
1240 :
1241 134 : master->handler.copy[i].events &= ~(state);
1242 :
1243 134 : if (master->handler.copy[i].events == 0) {
1244 134 : memmove(master->handler.copy + i, master->handler.copy + i + 1,
1245 134 : (master->handler.copycount - i - 1)
1246 : * sizeof(struct pollfd));
1247 134 : master->handler.copycount--;
1248 134 : master->handler.copy[master->handler.copycount].fd = 0;
1249 134 : master->handler.copy[master->handler.copycount].events = 0;
1250 : }
1251 : }
1252 :
1253 : /*
1254 : * Process task cancellation given a task argument: iterate through the
1255 : * various lists of tasks, looking for any that match the argument.
1256 : */
1257 12 : static void cancel_arg_helper(struct thread_master *master,
1258 : const struct cancel_req *cr)
1259 : {
1260 12 : struct thread *t;
1261 12 : nfds_t i;
1262 12 : int fd;
1263 12 : struct pollfd *pfd;
1264 :
1265 : /* We're only processing arg-based cancellations here. */
1266 12 : if (cr->eventobj == NULL)
1267 : return;
1268 :
1269 : /* First process the ready lists. */
1270 12 : frr_each_safe(thread_list, &master->event, t) {
1271 0 : if (t->arg != cr->eventobj)
1272 0 : continue;
1273 0 : thread_list_del(&master->event, t);
1274 0 : if (t->ref)
1275 0 : *t->ref = NULL;
1276 0 : thread_add_unuse(master, t);
1277 : }
1278 :
1279 12 : frr_each_safe(thread_list, &master->ready, t) {
1280 0 : if (t->arg != cr->eventobj)
1281 0 : continue;
1282 0 : thread_list_del(&master->ready, t);
1283 0 : if (t->ref)
1284 0 : *t->ref = NULL;
1285 0 : thread_add_unuse(master, t);
1286 : }
1287 :
1288 : /* If requested, stop here and ignore io and timers */
1289 12 : if (CHECK_FLAG(cr->flags, THREAD_CANCEL_FLAG_READY))
1290 : return;
1291 :
1292 : /* Check the io tasks */
1293 72 : for (i = 0; i < master->handler.pfdcount;) {
1294 60 : pfd = master->handler.pfds + i;
1295 :
1296 60 : if (pfd->events & POLLIN)
1297 60 : t = master->read[pfd->fd];
1298 : else
1299 0 : t = master->write[pfd->fd];
1300 :
1301 60 : if (t && t->arg == cr->eventobj) {
1302 0 : fd = pfd->fd;
1303 :
1304 : /* Found a match to cancel: clean up fd arrays */
1305 0 : thread_cancel_rw(master, pfd->fd, pfd->events, i);
1306 :
1307 : /* Clean up thread arrays */
1308 0 : master->read[fd] = NULL;
1309 0 : master->write[fd] = NULL;
1310 :
1311 : /* Clear caller's ref */
1312 0 : if (t->ref)
1313 0 : *t->ref = NULL;
1314 :
1315 0 : thread_add_unuse(master, t);
1316 :
1317 : /* Don't increment 'i' since the cancellation will have
1318 : * removed the entry from the pfd array
1319 : */
1320 : } else
1321 60 : i++;
1322 : }
1323 :
1324 : /* Check the timer tasks */
1325 12 : t = thread_timer_list_first(&master->timer);
1326 54 : while (t) {
1327 42 : struct thread *t_next;
1328 :
1329 42 : t_next = thread_timer_list_next(&master->timer, t);
1330 :
1331 42 : if (t->arg == cr->eventobj) {
1332 0 : thread_timer_list_del(&master->timer, t);
1333 0 : if (t->ref)
1334 0 : *t->ref = NULL;
1335 0 : thread_add_unuse(master, t);
1336 : }
1337 :
1338 : t = t_next;
1339 : }
1340 : }
1341 :
1342 : /**
1343 : * Process cancellation requests.
1344 : *
1345 : * This may only be run from the pthread which owns the thread_master.
1346 : *
1347 : * @param master the thread master to process
1348 : * @REQUIRE master->mtx
1349 : */
1350 22432 : static void do_thread_cancel(struct thread_master *master)
1351 : {
1352 22432 : struct thread_list_head *list = NULL;
1353 22432 : struct thread **thread_array = NULL;
1354 22432 : struct thread *thread;
1355 22432 : struct cancel_req *cr;
1356 22432 : struct listnode *ln;
1357 :
1358 45123 : for (ALL_LIST_ELEMENTS_RO(master->cancel_req, ln, cr)) {
1359 : /*
1360 : * If this is an event object cancellation, search
1361 : * through task lists deleting any tasks which have the
1362 : * specified argument - use this handy helper function.
1363 : */
1364 260 : if (cr->eventobj) {
1365 12 : cancel_arg_helper(master, cr);
1366 12 : continue;
1367 : }
1368 :
1369 : /*
1370 : * The pointer varies depending on whether the cancellation
1371 : * request was made asynchronously or not. If it was, we
1372 : * need to check whether the thread even exists anymore
1373 : * before cancelling it.
1374 : */
1375 248 : thread = (cr->thread) ? cr->thread : *cr->threadref;
1376 :
1377 248 : if (!thread)
1378 4 : continue;
1379 :
1380 244 : list = NULL;
1381 244 : thread_array = NULL;
1382 :
1383 : /* Determine the appropriate queue to cancel the thread from */
1384 244 : switch (thread->type) {
1385 134 : case THREAD_READ:
1386 134 : thread_cancel_rw(master, thread->u.fd, POLLIN, -1);
1387 134 : thread_array = master->read;
1388 134 : break;
1389 0 : case THREAD_WRITE:
1390 0 : thread_cancel_rw(master, thread->u.fd, POLLOUT, -1);
1391 0 : thread_array = master->write;
1392 0 : break;
1393 110 : case THREAD_TIMER:
1394 110 : thread_timer_list_del(&master->timer, thread);
1395 110 : break;
1396 0 : case THREAD_EVENT:
1397 0 : list = &master->event;
1398 0 : break;
1399 0 : case THREAD_READY:
1400 0 : list = &master->ready;
1401 0 : break;
1402 0 : default:
1403 0 : continue;
1404 : break;
1405 : }
1406 :
1407 244 : if (list) {
1408 0 : thread_list_del(list, thread);
1409 244 : } else if (thread_array) {
1410 134 : thread_array[thread->u.fd] = NULL;
1411 : }
1412 :
1413 244 : if (thread->ref)
1414 244 : *thread->ref = NULL;
1415 :
1416 244 : thread_add_unuse(thread->master, thread);
1417 : }
1418 :
1419 : /* Delete and free all cancellation requests */
1420 22431 : if (master->cancel_req)
1421 22431 : list_delete_all_node(master->cancel_req);
1422 :
1423 : /* Wake up any threads which may be blocked in thread_cancel_async() */
1424 22431 : master->canceled = true;
1425 22431 : pthread_cond_broadcast(&master->cancel_cond);
1426 22431 : }
1427 :
1428 : /*
1429 : * Helper function used for multiple flavors of arg-based cancellation.
1430 : */
1431 12 : static void cancel_event_helper(struct thread_master *m, void *arg, int flags)
1432 : {
1433 12 : struct cancel_req *cr;
1434 :
1435 12 : assert(m->owner == pthread_self());
1436 :
1437 : /* Only worth anything if caller supplies an arg. */
1438 12 : if (arg == NULL)
1439 : return;
1440 :
1441 12 : cr = XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1442 :
1443 12 : cr->flags = flags;
1444 :
1445 24 : frr_with_mutex (&m->mtx) {
1446 12 : cr->eventobj = arg;
1447 12 : listnode_add(m->cancel_req, cr);
1448 12 : do_thread_cancel(m);
1449 : }
1450 : }
1451 :
1452 : /**
1453 : * Cancel any events which have the specified argument.
1454 : *
1455 : * MT-Unsafe
1456 : *
1457 : * @param m the thread_master to cancel from
1458 : * @param arg the argument passed when creating the event
1459 : */
1460 12 : void thread_cancel_event(struct thread_master *master, void *arg)
1461 : {
1462 12 : cancel_event_helper(master, arg, 0);
1463 12 : }
1464 :
1465 : /*
1466 : * Cancel ready tasks with an arg matching 'arg'
1467 : *
1468 : * MT-Unsafe
1469 : *
1470 : * @param m the thread_master to cancel from
1471 : * @param arg the argument passed when creating the event
1472 : */
1473 0 : void thread_cancel_event_ready(struct thread_master *m, void *arg)
1474 : {
1475 :
1476 : /* Only cancel ready/event tasks */
1477 0 : cancel_event_helper(m, arg, THREAD_CANCEL_FLAG_READY);
1478 0 : }
1479 :
1480 : /**
1481 : * Cancel a specific task.
1482 : *
1483 : * MT-Unsafe
1484 : *
1485 : * @param thread task to cancel
1486 : */
1487 240 : void thread_cancel(struct thread **thread)
1488 : {
1489 240 : struct thread_master *master;
1490 :
1491 240 : if (thread == NULL || *thread == NULL)
1492 : return;
1493 :
1494 240 : master = (*thread)->master;
1495 :
1496 240 : frrtrace(9, frr_libfrr, thread_cancel, master,
1497 : (*thread)->xref->funcname, (*thread)->xref->xref.file,
1498 : (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
1499 : (*thread)->u.val, (*thread)->arg, (*thread)->u.sands.tv_sec);
1500 :
1501 240 : assert(master->owner == pthread_self());
1502 :
1503 480 : frr_with_mutex (&master->mtx) {
1504 240 : struct cancel_req *cr =
1505 240 : XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1506 240 : cr->thread = *thread;
1507 240 : listnode_add(master->cancel_req, cr);
1508 240 : do_thread_cancel(master);
1509 : }
1510 :
1511 240 : *thread = NULL;
1512 : }
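/* Usage sketch: pass the address of the same back-reference that was handed
 * to thread_add_*(); thread_cancel() NULLs it and is a no-op if the task has
 * already run.  Must be called on the pthread that owns the master.
 *
 *     thread_cancel(&t_read);
 */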
1513 :
1514 : /**
1515 : * Asynchronous cancellation.
1516 : *
1517 : * Called with either a struct thread ** or void * to an event argument,
1518 : * this function posts the correct cancellation request and blocks until it is
1519 : * serviced.
1520 : *
1521 : * If the thread is currently running, execution blocks until it completes.
1522 : *
1523 : * The last two parameters are mutually exclusive, i.e. if you pass one the
1524 : * other must be NULL.
1525 : *
1526 : * When the cancellation procedure executes on the target thread_master, the
1527 : * thread * provided is checked for nullity. If it is null, the thread is
1528 : * assumed to no longer exist and the cancellation request is a no-op. Thus
1529 : * users of this API must pass a back-reference when scheduling the original
1530 : * task.
1531 : *
1532 : * MT-Safe
1533 : *
1534 : * @param master the thread master with the relevant event / task
1535 : * @param thread pointer to thread to cancel
1536 : * @param eventobj the event
1537 : */
1538 8 : void thread_cancel_async(struct thread_master *master, struct thread **thread,
1539 : void *eventobj)
1540 : {
1541 8 : assert(!(thread && eventobj) && (thread || eventobj));
1542 :
1543 8 : if (thread && *thread)
1544 4 : frrtrace(9, frr_libfrr, thread_cancel_async, master,
1545 : (*thread)->xref->funcname, (*thread)->xref->xref.file,
1546 : (*thread)->xref->xref.line, NULL, (*thread)->u.fd,
1547 : (*thread)->u.val, (*thread)->arg,
1548 : (*thread)->u.sands.tv_sec);
1549 : else
1550 4 : frrtrace(9, frr_libfrr, thread_cancel_async, master, NULL, NULL,
1551 : 0, NULL, 0, 0, eventobj, 0);
1552 :
1553 8 : assert(master->owner != pthread_self());
1554 :
1555 16 : frr_with_mutex (&master->mtx) {
1556 8 : master->canceled = false;
1557 :
1558 8 : if (thread) {
1559 8 : struct cancel_req *cr =
1560 8 : XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1561 8 : cr->threadref = thread;
1562 8 : listnode_add(master->cancel_req, cr);
1563 0 : } else if (eventobj) {
1564 0 : struct cancel_req *cr =
1565 0 : XCALLOC(MTYPE_TMP, sizeof(struct cancel_req));
1566 0 : cr->eventobj = eventobj;
1567 0 : listnode_add(master->cancel_req, cr);
1568 : }
1569 8 : AWAKEN(master);
1570 :
1571 16 : while (!master->canceled)
1572 8 : pthread_cond_wait(&master->cancel_cond, &master->mtx);
1573 : }
1574 :
1575 8 : if (thread)
1576 8 : *thread = NULL;
1577 8 : }
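/* Usage sketch: from a pthread that does not own 'master', post the request
 * and block until the owner has serviced it.  The back-reference (or the
 * event argument, but never both) identifies what to cancel.
 *
 *     thread_cancel_async(master, &t_read, NULL);   // by task reference
 *     thread_cancel_async(master, NULL, ctx);       // by event argument
 */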
1578 : /* ------------------------------------------------------------------------- */
1579 :
1580 20812 : static struct timeval *thread_timer_wait(struct thread_timer_list_head *timers,
1581 : struct timeval *timer_val)
1582 : {
1583 20812 : if (!thread_timer_list_count(timers))
1584 : return NULL;
1585 :
1586 20046 : struct thread *next_timer = thread_timer_list_first(timers);
1587 20046 : monotime_until(&next_timer->u.sands, timer_val);
1588 20046 : return timer_val;
1589 : }
1590 :
1591 1172 : static struct thread *thread_run(struct thread_master *m, struct thread *thread,
1592 : struct thread *fetch)
1593 : {
1594 1172 : *fetch = *thread;
1595 1172 : thread_add_unuse(m, thread);
1596 1171 : return fetch;
1597 : }
1598 :
1599 896 : static int thread_process_io_helper(struct thread_master *m,
1600 : struct thread *thread, short state,
1601 : short actual_state, int pos)
1602 : {
1603 896 : struct thread **thread_array;
1604 :
1605 : /*
1606 : * poll() clears the .events field, but the pollfd array we
1607 : * pass to poll() is a copy of the one used to schedule threads.
1608 : * We need to synchronize state between the two here by applying
1609 : * the same changes poll() made on the copy of the "real" pollfd
1610 : * array.
1611 : *
1612 : * This cleans up a possible infinite loop where we refuse
1613 : * to respond to a poll event but poll is insistent that
1614 : * we should.
1615 : */
1616 896 : m->handler.pfds[pos].events &= ~(state);
1617 :
1618 896 : if (!thread) {
1619 0 : if ((actual_state & (POLLHUP|POLLIN)) != POLLHUP)
1620 0 : flog_err(EC_LIB_NO_THREAD,
1621 : "Attempting to process an I/O event but for fd: %d(%d) no thread to handle this!",
1622 : m->handler.pfds[pos].fd, actual_state);
1623 0 : return 0;
1624 : }
1625 :
1626 896 : if (thread->type == THREAD_READ)
1627 762 : thread_array = m->read;
1628 : else
1629 134 : thread_array = m->write;
1630 :
1631 896 : thread_array[thread->u.fd] = NULL;
1632 896 : thread_list_add_tail(&m->ready, thread);
1633 896 : thread->type = THREAD_READY;
1634 :
1635 896 : return 1;
1636 : }
1637 :
1638 : /**
1639 : * Process I/O events.
1640 : *
1641 : * Walks through file descriptor array looking for those pollfds whose .revents
1642 : * field has something interesting. Deletes any invalid file descriptors.
1643 : *
1644 : * @param m the thread master
1645 : * @param num the number of active file descriptors (return value of poll())
1646 : */
1647 938 : static void thread_process_io(struct thread_master *m, unsigned int num)
1648 : {
1649 938 : unsigned int ready = 0;
1650 938 : struct pollfd *pfds = m->handler.copy;
1651 :
1652 6343 : for (nfds_t i = 0; i < m->handler.copycount && ready < num; ++i) {
1653 : /* no event for current fd? immediately continue */
1654 5405 : if (pfds[i].revents == 0)
1655 4461 : continue;
1656 :
1657 944 : ready++;
1658 :
1659 : /*
1660 : * Unless someone has called thread_cancel from another
1661 : * pthread, the only thing that could have changed in
1662 : * m->handler.pfds while we were asleep is the .events
1663 : * field in a given pollfd. Barring thread_cancel() that
1664 : * value should be a superset of the values we have in our
1665 : * copy, so there's no need to update it. Similarly,
1666 : * barring deletion, the fd should still be a valid index
1667 : * into the master's pfds.
1668 : *
1669 : * We are including POLLERR here to do a READ event
1670 : * this is because the read should fail and the
1671 : * read function should handle it appropriately
1672 : */
1673 944 : if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
1674 762 : thread_process_io_helper(m, m->read[pfds[i].fd], POLLIN,
1675 : pfds[i].revents, i);
1676 : }
1677 944 : if (pfds[i].revents & POLLOUT)
1678 134 : thread_process_io_helper(m, m->write[pfds[i].fd],
1679 : POLLOUT, pfds[i].revents, i);
1680 :
1681 : /* if one of our file descriptors is garbage, remove the same
1682 : * from
1683 : * both pfds + update sizes and index */
1684 944 : if (pfds[i].revents & POLLNVAL) {
1685 48 : memmove(m->handler.pfds + i, m->handler.pfds + i + 1,
1686 48 : (m->handler.pfdcount - i - 1)
1687 : * sizeof(struct pollfd));
1688 48 : m->handler.pfdcount--;
1689 48 : m->handler.pfds[m->handler.pfdcount].fd = 0;
1690 48 : m->handler.pfds[m->handler.pfdcount].events = 0;
1691 :
1692 48 : memmove(pfds + i, pfds + i + 1,
1693 48 : (m->handler.copycount - i - 1)
1694 : * sizeof(struct pollfd));
1695 48 : m->handler.copycount--;
1696 48 : m->handler.copy[m->handler.copycount].fd = 0;
1697 48 : m->handler.copy[m->handler.copycount].events = 0;
1698 :
1699 48 : i--;
1700 : }
1701 : }
1702 938 : }
1703 :
1704 : /* Add all timers that have popped to the ready list. */
1705 20996 : static unsigned int thread_process_timers(struct thread_master *m,
1706 : struct timeval *timenow)
1707 : {
1708 20996 : struct timeval prev = *timenow;
1709 20996 : bool displayed = false;
1710 20996 : struct thread *thread;
1711 20996 : unsigned int ready = 0;
1712 :
1713 21055 : while ((thread = thread_timer_list_first(&m->timer))) {
1714 20232 : if (timercmp(timenow, &thread->u.sands, <))
1715 : break;
1716 59 : prev = thread->u.sands;
1717 59 : prev.tv_sec += 4;
1718 : /*
1719 : * If the timer would have popped 4 seconds in the
1720 : * past then we are in a situation where we are
1721 : * really getting behind on handling of events.
1722 : * Let's log it and do the right thing with it.
1723 : */
1724 59 : if (timercmp(timenow, &prev, >)) {
1725 0 : atomic_fetch_add_explicit(
1726 : &thread->hist->total_starv_warn, 1,
1727 : memory_order_seq_cst);
1728 0 : if (!displayed && !thread->ignore_timer_late) {
1729 0 : flog_warn(
1730 : EC_LIB_STARVE_THREAD,
1731 : "Thread Starvation: %pTHD was scheduled to pop greater than 4s ago",
1732 : thread);
1733 0 : displayed = true;
1734 : }
1735 : }
1736 :
1737 59 : thread_timer_list_pop(&m->timer);
1738 59 : thread->type = THREAD_READY;
1739 59 : thread_list_add_tail(&m->ready, thread);
1740 59 : ready++;
1741 : }
1742 :
1743 20996 : return ready;
1744 : }
1745 :
1746 : /* process a list en masse, e.g. for event thread lists */
1747 21008 : static unsigned int thread_process(struct thread_list_head *list)
1748 : {
1749 21008 : struct thread *thread;
1750 21008 : unsigned int ready = 0;
1751 :
1752 21225 : while ((thread = thread_list_pop(list))) {
1753 217 : thread->type = THREAD_READY;
1754 217 : thread_list_add_tail(&thread->master->ready, thread);
1755 217 : ready++;
1756 : }
1757 21007 : return ready;
1758 : }
1759 :
1760 :
1761 : /* Fetch next ready thread. */
1762 1180 : struct thread *thread_fetch(struct thread_master *m, struct thread *fetch)
1763 : {
1764 1180 : struct thread *thread = NULL;
1765 1180 : struct timeval now;
1766 1180 : struct timeval zerotime = {0, 0};
1767 1180 : struct timeval tv;
1768 1180 : struct timeval *tw = NULL;
1769 1180 : bool eintr_p = false;
1770 1180 : int num = 0;
1771 :
1772 22188 : do {
1773 : /* Handle signals if any */
1774 22188 : if (m->handle_signals)
1775 21333 : frr_sigevent_process();
1776 :
1777 22180 : pthread_mutex_lock(&m->mtx);
1778 :
1779 : /* Process any pending cancellation requests */
1780 22180 : do_thread_cancel(m);
1781 :
1782 : /*
1783 : * Attempt to flush ready queue before going into poll().
1784 : * This is performance-critical. Think twice before modifying.
1785 : */
1786 22179 : if ((thread = thread_list_pop(&m->ready))) {
1787 1172 : fetch = thread_run(m, thread, fetch);
1788 1171 : if (fetch->ref)
1789 1139 : *fetch->ref = NULL;
1790 1171 : pthread_mutex_unlock(&m->mtx);
1791 1172 : if (!m->ready_run_loop)
1792 1135 : GETRUSAGE(&m->last_getrusage);
1793 1172 : m->ready_run_loop = true;
1794 1172 : break;
1795 : }
1796 :
1797 21008 : m->ready_run_loop = false;
1798 : /* otherwise, tick through scheduling sequence */
1799 :
1800 : /*
1801 : * Post events to ready queue. This must come before the
1802 : * following block since events should occur immediately
1803 : */
1804 21008 : thread_process(&m->event);
1805 :
1806 : /*
1807 : * If there are no tasks on the ready queue, we will poll()
1808 : * until a timer expires or we receive I/O, whichever comes
1809 : * first. The strategy for doing this is:
1810 : *
1811 : * - If there are events pending, set the poll() timeout to zero
1812 : * - If there are no events pending, but there are timers
1813 : * pending, set the timeout to the smallest remaining time on
1814 : * any timer.
1815 : * - If there are neither timers nor events pending, but there
1816 : * are file descriptors pending, block indefinitely in poll()
1817 : * - If nothing is pending, it's time for the application to die
1818 : *
1819 : * In every case except the last, we need to hit poll() at least
1820 : * once per loop to avoid starvation by events
1821 : */
1822 21008 : if (!thread_list_count(&m->ready))
1823 20812 : tw = thread_timer_wait(&m->timer, &tv);
1824 :
1825 21007 : if (thread_list_count(&m->ready) ||
1826 20046 : (tw && !timercmp(tw, &zerotime, >)))
1827 : tw = &zerotime;
1828 :
1829 765 : if (!tw && m->handler.pfdcount == 0) { /* die */
1830 0 : pthread_mutex_unlock(&m->mtx);
1831 0 : fetch = NULL;
1832 0 : break;
1833 : }
1834 :
1835 : /*
1836 : * Copy the pollfd array and the number of active pollfds in it.
1837 : * The allocated array size is fixed, so it does not need copying.
1838 : */
1839 21007 : m->handler.copycount = m->handler.pfdcount;
1840 21007 : memcpy(m->handler.copy, m->handler.pfds,
1841 : m->handler.copycount * sizeof(struct pollfd));
1842 :
1843 21007 : pthread_mutex_unlock(&m->mtx);
1844 : {
1845 21008 : eintr_p = false;
1846 21008 : num = fd_poll(m, tw, &eintr_p);
1847 : }
1848 21008 : pthread_mutex_lock(&m->mtx);
1849 :
1850 : /* Handle any errors received in poll() */
1851 21008 : if (num < 0) {
1852 12 : if (eintr_p) {
1853 12 : pthread_mutex_unlock(&m->mtx);
1854 : /* loop around to signal handler */
1855 12 : continue;
1856 : }
1857 :
1858 : /* else die */
1859 0 : flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
1860 : safe_strerror(errno));
1861 0 : pthread_mutex_unlock(&m->mtx);
1862 0 : fetch = NULL;
1863 0 : break;
1864 : }
1865 :
1866 : /* Post timers to ready queue. */
1867 20996 : monotime(&now);
1868 20996 : thread_process_timers(m, &now);
1869 :
1870 : /* Post I/O to ready queue. */
1871 20996 : if (num > 0)
1872 938 : thread_process_io(m, num);
1873 :
1874 20996 : pthread_mutex_unlock(&m->mtx);
1875 :
1876 21008 : } while (!thread && m->spin);
1877 :
1878 1172 : return fetch;
1879 : }
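 :
 : /* Usage sketch (illustrative; this mirrors the typical libfrr main
 :  * loop rather than code in this file):
 :  *
 :  *   struct thread thread;
 :  *
 :  *   while (thread_fetch(master, &thread))
 :  *       thread_call(&thread);
 :  *
 :  * thread_fetch() blocks in poll() until something is runnable and
 :  * returns NULL only when no timers, events or file descriptors
 :  * remain, at which point the loop (and normally the daemon) ends.
 :  */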
1880 :
1881 1290 : static unsigned long timeval_elapsed(struct timeval a, struct timeval b)
1882 : {
1883 1290 : return (((a.tv_sec - b.tv_sec) * TIMER_SECOND_MICRO)
1884 1290 : + (a.tv_usec - b.tv_usec));
1885 : }
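 :
 : /* Example (illustrative): with a = {5, 200000} and b = {3, 900000},
 :  *   (5 - 3) * TIMER_SECOND_MICRO + (200000 - 900000)
 :  *   = 2000000 - 700000 = 1300000 microseconds (1.3s).
 :  * The negative usec difference is absorbed by the seconds term.
 :  */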
1886 :
1887 1290 : unsigned long thread_consumed_time(RUSAGE_T *now, RUSAGE_T *start,
1888 : unsigned long *cputime)
1889 : {
1890 : #ifdef HAVE_CLOCK_THREAD_CPUTIME_ID
1891 :
1892 : #ifdef __FreeBSD__
1893 : /*
1894 : * FreeBSD appears to have an issue when clock_gettime() with
1895 : * CLOCK_THREAD_CPUTIME_ID is called twice in quick succession:
1896 : * occasionally the "now" time comes back earlier than the
1897 : * "start" time. That is bad, because the subtraction then
1898 : * wraps to a very large number and FRR ends up reporting
1899 : * spurious CPU HOGs.
1900 : *
1901 : * Cheat a little bit here: notice the inversion and correct
1902 : * it so that it cannot happen.
1903 : */
1904 : if (start->cpu.tv_sec == now->cpu.tv_sec &&
1905 : start->cpu.tv_nsec > now->cpu.tv_nsec)
1906 : now->cpu.tv_nsec = start->cpu.tv_nsec + 1;
1907 : else if (start->cpu.tv_sec > now->cpu.tv_sec) {
1908 : now->cpu.tv_sec = start->cpu.tv_sec;
1909 : now->cpu.tv_nsec = start->cpu.tv_nsec + 1;
1910 : }
1911 : #endif
1912 1290 : *cputime = (now->cpu.tv_sec - start->cpu.tv_sec) * TIMER_SECOND_MICRO
1913 1290 : + (now->cpu.tv_nsec - start->cpu.tv_nsec) / 1000;
1914 : #else
1915 : /* This is 'user + sys' time. */
1916 : *cputime = timeval_elapsed(now->cpu.ru_utime, start->cpu.ru_utime)
1917 : + timeval_elapsed(now->cpu.ru_stime, start->cpu.ru_stime);
1918 : #endif
1919 1290 : return timeval_elapsed(now->real, start->real);
1920 : }
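 :
 : /* Example (illustrative), for the HAVE_CLOCK_THREAD_CPUTIME_ID case:
 :  * with start->cpu = {2, 100000000} and now->cpu = {2, 104500000},
 :  *   *cputime = 0 * 1000000 + (104500000 - 100000000) / 1000 = 4500us
 :  * of CPU time, while the return value is the wall-clock delta taken
 :  * from the 'real' members, also in microseconds.
 :  */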
1921 :
1922 : /* We should aim to yield after the task's configured yield time,
1923 : which defaults to THREAD_YIELD_TIME_SLOT.
1924 : Note: we are using real (wall clock) time for this calculation.
1925 : It could be argued that CPU time may make more sense in certain
1926 : contexts. The things to consider are whether the thread may have
1927 : blocked (in which case wall time increases, but CPU time does not),
1928 : or whether the system is heavily loaded with other processes competing
1929 : for CPU time. On balance, wall clock time seems to make sense.
1930 : Plus it has the added benefit that reading the monotonic clock
1931 : should be faster than calling getrusage. */
1932 102 : int thread_should_yield(struct thread *thread)
1933 : {
1934 102 : int result;
1935 102 : frr_with_mutex (&thread->mtx) {
1936 102 : result = monotime_since(&thread->real, NULL)
1937 102 : > (int64_t)thread->yield;
1938 : }
1939 102 : return result;
1940 : }
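 :
 : /* Usage sketch (illustrative; assumes the thread_add_event() macro
 :  * from thread.h): a long-running task checks its time slice and
 :  * reschedules itself instead of hogging the event loop.
 :  * work_remains(), do_one_unit() and struct work_ctx are hypothetical.
 :  *
 :  *   static void heavy_work(struct thread *t)
 :  *   {
 :  *       struct work_ctx *ctx = THREAD_ARG(t);
 :  *
 :  *       while (work_remains(ctx)) {
 :  *           do_one_unit(ctx);
 :  *           if (thread_should_yield(t)) {
 :  *               thread_add_event(t->master, heavy_work, ctx, 0, NULL);
 :  *               return;
 :  *           }
 :  *       }
 :  *   }
 :  */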
1941 :
1942 0 : void thread_set_yield_time(struct thread *thread, unsigned long yield_time)
1943 : {
1944 0 : frr_with_mutex (&thread->mtx) {
1945 0 : thread->yield = yield_time;
1946 : }
1947 0 : }
1948 :
1949 2538 : void thread_getrusage(RUSAGE_T *r)
1950 : {
1951 2538 : monotime(&r->real);
1952 2538 : if (!cputime_enabled) {
1953 0 : memset(&r->cpu, 0, sizeof(r->cpu));
1954 0 : return;
1955 : }
1956 :
1957 : #ifdef HAVE_CLOCK_THREAD_CPUTIME_ID
1958 : /* not currently implemented in Linux's vDSO, but maybe at some point
1959 : * in the future?
1960 : */
1961 2538 : clock_gettime(CLOCK_THREAD_CPUTIME_ID, &r->cpu);
1962 : #else /* !HAVE_CLOCK_THREAD_CPUTIME_ID */
1963 : #if defined RUSAGE_THREAD
1964 : #define FRR_RUSAGE RUSAGE_THREAD
1965 : #else
1966 : #define FRR_RUSAGE RUSAGE_SELF
1967 : #endif
1968 : getrusage(FRR_RUSAGE, &(r->cpu));
1969 : #endif
1970 : }
1971 :
1972 : /*
1973 : * Call a thread.
1974 : *
1975 : * This function will atomically update the thread's usage history. At present
1976 : * this is the only spot where usage history is written. Nevertheless the code
1977 : * has been written such that the introduction of writers in the future should
1978 : * has been written such that introducing additional writers in the future
1979 : * should not require changes here, provided those writers atomically perform only the
1980 : * particular, the maximum real and cpu times must be monotonically increasing
1981 : * or this code is not correct.
1982 : */
1983 1184 : void thread_call(struct thread *thread)
1984 : {
1985 1184 : RUSAGE_T before, after;
1986 :
1987 : /* if the thread being called is the CLI, it may change cputime_enabled
1988 : * ("service cputime-stats" command), which can result in nonsensical
1989 : * and very confusing warnings
1990 : */
1991 1184 : bool cputime_enabled_here = cputime_enabled;
1992 :
1993 1184 : if (thread->master->ready_run_loop)
1994 1181 : before = thread->master->last_getrusage;
1995 : else
1996 3 : GETRUSAGE(&before);
1997 :
1998 1184 : thread->real = before.real;
1999 :
2000 1184 : frrtrace(9, frr_libfrr, thread_call, thread->master,
2001 : thread->xref->funcname, thread->xref->xref.file,
2002 : thread->xref->xref.line, NULL, thread->u.fd,
2003 : thread->u.val, thread->arg, thread->u.sands.tv_sec);
2004 :
2005 1183 : pthread_setspecific(thread_current, thread);
2006 1183 : (*thread->func)(thread);
2007 1180 : pthread_setspecific(thread_current, NULL);
2008 :
2009 1180 : GETRUSAGE(&after);
2010 1180 : thread->master->last_getrusage = after;
2011 :
2012 1180 : unsigned long walltime, cputime;
2013 1180 : unsigned long exp;
2014 :
2015 1180 : walltime = thread_consumed_time(&after, &before, &cputime);
2016 :
2017 : /* update walltime */
2018 1180 : atomic_fetch_add_explicit(&thread->hist->real.total, walltime,
2019 : memory_order_seq_cst);
2020 1180 : exp = atomic_load_explicit(&thread->hist->real.max,
2021 : memory_order_seq_cst);
2022 1180 : while (exp < walltime
2023 1180 : && !atomic_compare_exchange_weak_explicit(
2024 : &thread->hist->real.max, &exp, walltime,
2025 : memory_order_seq_cst, memory_order_seq_cst))
2026 : ;
2027 :
2028 1180 : if (cputime_enabled_here && cputime_enabled) {
2029 : /* update cputime */
2030 1180 : atomic_fetch_add_explicit(&thread->hist->cpu.total, cputime,
2031 : memory_order_seq_cst);
2032 1180 : exp = atomic_load_explicit(&thread->hist->cpu.max,
2033 : memory_order_seq_cst);
2034 1180 : while (exp < cputime
2035 1180 : && !atomic_compare_exchange_weak_explicit(
2036 : &thread->hist->cpu.max, &exp, cputime,
2037 : memory_order_seq_cst, memory_order_seq_cst))
2038 : ;
2039 : }
2040 :
2041 1180 : atomic_fetch_add_explicit(&thread->hist->total_calls, 1,
2042 : memory_order_seq_cst);
2043 1180 : atomic_fetch_or_explicit(&thread->hist->types, 1 << thread->add_type,
2044 : memory_order_seq_cst);
2045 :
2046 1180 : if (cputime_enabled_here && cputime_enabled && cputime_threshold
2047 1180 : && cputime > cputime_threshold) {
2048 : /*
2049 : * We have a CPU hog on our hands: the time FRR spent doing
2050 : * actual work (not sleeping) exceeded the configured cputime
2051 : * threshold. Whinge about it now, so we're aware this is yet
2052 : * another task to fix.
2053 : */
2054 0 : atomic_fetch_add_explicit(&thread->hist->total_cpu_warn,
2055 : 1, memory_order_seq_cst);
2056 0 : flog_warn(
2057 : EC_LIB_SLOW_THREAD_CPU,
2058 : "CPU HOG: task %s (%lx) ran for %lums (cpu time %lums)",
2059 : thread->xref->funcname, (unsigned long)thread->func,
2060 : walltime / 1000, cputime / 1000);
2061 :
2062 1180 : } else if (walltime_threshold && walltime > walltime_threshold) {
2063 : /*
2064 : * The task's wall-clock runtime exceeded the threshold, but its
2065 : * cpu time did not. Let's whine about this because it could
2066 : * imply some sort of scheduling issue.
2067 : */
2068 0 : atomic_fetch_add_explicit(&thread->hist->total_wall_warn,
2069 : 1, memory_order_seq_cst);
2070 0 : flog_warn(
2071 : EC_LIB_SLOW_THREAD_WALL,
2072 : "STARVATION: task %s (%lx) ran for %lums (cpu time %lums)",
2073 : thread->xref->funcname, (unsigned long)thread->func,
2074 : walltime / 1000, cputime / 1000);
2075 : }
2076 1180 : }
2077 :
2078 : /* Execute thread */
2079 12 : void _thread_execute(const struct xref_threadsched *xref,
2080 : struct thread_master *m, void (*func)(struct thread *),
2081 : void *arg, int val)
2082 : {
2083 12 : struct thread *thread;
2084 :
2085 : /* Get or allocate new thread to execute. */
2086 12 : frr_with_mutex (&m->mtx) {
2087 12 : thread = thread_get(m, THREAD_EVENT, func, arg, xref);
2088 :
2089 : /* Set its event value. */
2090 24 : frr_with_mutex (&thread->mtx) {
2091 12 : thread->add_type = THREAD_EXECUTE;
2092 12 : thread->u.val = val;
2093 12 : thread->ref = &thread;
2094 : }
2095 : }
2096 :
2097 : /* Execute thread doing all accounting. */
2098 12 : thread_call(thread);
2099 :
2100 : /* Give back or free thread. */
2101 12 : thread_add_unuse(m, thread);
2102 12 : }
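 :
 : /* Usage sketch (illustrative): callers normally go through the
 :  * thread_execute() wrapper macro from thread.h, which supplies the
 :  * xref_threadsched argument automatically:
 :  *
 :  *   thread_execute(master, my_handler, my_arg, 0);
 :  *
 :  * The handler runs synchronously, with the usual CPU/wall-time
 :  * accounting, before the call returns. my_handler and my_arg are
 :  * hypothetical names.
 :  */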
2103 :
2104 : /* Debug signal mask - if 'sigs' is NULL, use current effective mask. */
2105 0 : void debug_signals(const sigset_t *sigs)
2106 : {
2107 0 : int i, found;
2108 0 : sigset_t tmpsigs;
2109 0 : char buf[300];
2110 :
2111 : /*
2112 : * We're only looking at the non-realtime signals here, so we need
2113 : * some limit value. Platform differences mean at some point we just
2114 : * need to pick a reasonable value.
2115 : */
2116 : #if defined SIGRTMIN
2117 : # define LAST_SIGNAL SIGRTMIN
2118 : #else
2119 : # define LAST_SIGNAL 32
2120 : #endif
2121 :
2122 :
2123 0 : if (sigs == NULL) {
2124 0 : sigemptyset(&tmpsigs);
2125 0 : pthread_sigmask(SIG_BLOCK, NULL, &tmpsigs);
2126 0 : sigs = &tmpsigs;
2127 : }
2128 :
2129 0 : found = 0;
2130 0 : buf[0] = '\0';
2131 :
2132 0 : for (i = 0; i < LAST_SIGNAL; i++) {
2133 0 : char tmp[20];
2134 :
2135 0 : if (sigismember(sigs, i) > 0) {
2136 0 : if (found > 0)
2137 0 : strlcat(buf, ",", sizeof(buf));
2138 0 : snprintf(tmp, sizeof(tmp), "%d", i);
2139 0 : strlcat(buf, tmp, sizeof(buf));
2140 0 : found++;
2141 : }
2142 : }
2143 :
2144 0 : if (found == 0)
2145 0 : snprintf(buf, sizeof(buf), "<none>");
2146 :
2147 0 : zlog_debug("%s: %s", __func__, buf);
2148 0 : }
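 :
 : /* Example (illustrative): debug_signals(NULL) logs the calling
 :  * thread's currently blocked signals as a comma-separated list,
 :  * e.g. "debug_signals: 1,2,15", or "<none>" if nothing is blocked;
 :  * passing an explicit sigset_t logs that set instead.
 :  */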
2149 :
2150 0 : static ssize_t printfrr_thread_dbg(struct fbuf *buf, struct printfrr_eargs *ea,
2151 : const struct thread *thread)
2152 : {
2153 0 : static const char * const types[] = {
2154 : [THREAD_READ] = "read",
2155 : [THREAD_WRITE] = "write",
2156 : [THREAD_TIMER] = "timer",
2157 : [THREAD_EVENT] = "event",
2158 : [THREAD_READY] = "ready",
2159 : [THREAD_UNUSED] = "unused",
2160 : [THREAD_EXECUTE] = "exec",
2161 : };
2162 0 : ssize_t rv = 0;
2163 0 : char info[16] = "";
2164 :
2165 0 : if (!thread)
2166 0 : return bputs(buf, "{(thread *)NULL}");
2167 :
2168 0 : rv += bprintfrr(buf, "{(thread *)%p arg=%p", thread, thread->arg);
2169 :
2170 0 : if (thread->type < array_size(types) && types[thread->type])
2171 0 : rv += bprintfrr(buf, " %-6s", types[thread->type]);
2172 : else
2173 0 : rv += bprintfrr(buf, " INVALID(%u)", thread->type);
2174 :
2175 0 : switch (thread->type) {
2176 0 : case THREAD_READ:
2177 : case THREAD_WRITE:
2178 0 : snprintfrr(info, sizeof(info), "fd=%d", thread->u.fd);
2179 0 : break;
2180 :
2181 0 : case THREAD_TIMER:
2182 0 : snprintfrr(info, sizeof(info), "r=%pTVMud", &thread->u.sands);
2183 0 : break;
2184 : }
2185 :
2186 0 : rv += bprintfrr(buf, " %-12s %s() %s from %s:%d}", info,
2187 0 : thread->xref->funcname, thread->xref->dest,
2188 0 : thread->xref->xref.file, thread->xref->xref.line);
2189 0 : return rv;
2190 : }
2191 :
2192 12 : printfrr_ext_autoreg_p("TH", printfrr_thread);
2193 0 : static ssize_t printfrr_thread(struct fbuf *buf, struct printfrr_eargs *ea,
2194 : const void *ptr)
2195 : {
2196 0 : const struct thread *thread = ptr;
2197 0 : struct timespec remain = {};
2198 :
2199 0 : if (ea->fmt[0] == 'D') {
2200 0 : ea->fmt++;
2201 0 : return printfrr_thread_dbg(buf, ea, thread);
2202 : }
2203 :
2204 0 : if (!thread) {
2205 : /* need to jump over time formatting flag characters in the
2206 : * input format string, i.e. adjust ea->fmt!
2207 : */
2208 0 : printfrr_time(buf, ea, &remain,
2209 : TIMEFMT_TIMER_DEADLINE | TIMEFMT_SKIP);
2210 0 : return bputch(buf, '-');
2211 : }
2212 :
2213 0 : TIMEVAL_TO_TIMESPEC(&thread->u.sands, &remain);
2214 0 : return printfrr_time(buf, ea, &remain, TIMEFMT_TIMER_DEADLINE);
2215 : }
|