Line data Source code
1 : // SPDX-License-Identifier: ISC
2 : /*
3 : * Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc.
4 : */
5 :
6 : /* implementation notes: this is an epoch-based RCU implementation. rcu_seq
7 : * (global variable) counts the current epoch. Threads hold a specific epoch
8 : * in rcu_read_lock(). This is the oldest epoch a thread might be accessing
9 : * data from.
10 : *
11 : * The rcu_seq global is only pushed forward on rcu_read_lock() and
12 : * rcu_read_unlock() calls. This makes things a tad more efficient since
13 : * those are the only places it matters:
14 : * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
15 : * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
16 : * when heading into a long idle period where no thread holds RCU
17 : *
18 : * rcu_thread structures themselves are RCU-free'd.
19 : *
20 : * rcu_head structures are the most iffy; normally for an ATOMLIST we would
21 : * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
22 : * to prevent ABA or use-after-free problems. However, our ATOMLIST code
23 : * guarantees that, as long as the list never becomes empty, an "add_tail()"
24 : * only needs the "last" pointer and thus can't run into ABA/UAF issues -
25 : * but we do need to keep at least 1 item on the list at all times.
26 : *
27 : * (Search the atomlist code for all uses of "last")
28 : */
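/* Editor's sketch (not part of the original file): the reader/updater
 * pattern the notes above describe, using a made-up "struct item",
 * MTYPE_ITEM and a global "current" pointer purely for illustration.
 * Readers bracket accesses with rcu_read_lock()/rcu_read_unlock(); an
 * updater swaps the pointer out and defers the free instead of calling
 * free() directly, so readers still holding an older epoch keep seeing
 * valid memory:
 *
 *     struct item {
 *             int value;
 *             struct rcu_head rcu;
 *     };
 *
 *     static struct item *_Atomic current;
 *
 *     int reader(void)
 *     {
 *             struct item *it;
 *             int val = 0;
 *
 *             rcu_read_lock();
 *             it = atomic_load_explicit(&current, memory_order_acquire);
 *             if (it)
 *                     val = it->value;  // can't be freed while we hold RCU
 *             rcu_read_unlock();
 *             return val;
 *     }
 *
 *     void replace(struct item *new)
 *     {
 *             struct item *old;
 *
 *             rcu_read_lock();  // rcu_free()/rcu_enqueue() need RCU held
 *             old = atomic_exchange_explicit(&current, new,
 *                                            memory_order_acq_rel);
 *             if (old)
 *                     rcu_free(MTYPE_ITEM, old, rcu);
 *             rcu_read_unlock();
 *     }
 *
 * rcu_free() is the caller-facing wrapper from frrcu.h; it queues the same
 * kind of RCUA_FREE action that rcu_free_internal() below builds.
 */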
29 :
30 : #ifdef HAVE_CONFIG_H
31 : #include "config.h"
32 : #endif
33 :
34 : #include <pthread.h>
35 : #ifdef HAVE_PTHREAD_NP_H
36 : #include <pthread_np.h>
37 : #endif
38 : #include <string.h>
39 : #include <unistd.h>
40 : #include <signal.h>
41 :
42 : #include "frrcu.h"
43 : #include "seqlock.h"
44 : #include "atomlist.h"
45 :
46 12 : DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread");
47 12 : DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier");
48 :
49 222 : DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head);
50 :
51 : PREDECL_ATOMLIST(rcu_threads);
52 : struct rcu_thread {
53 : struct rcu_threads_item head;
54 :
55 : struct rcu_head rcu_head;
56 :
57 : struct seqlock rcu;
58 :
59 : /* only accessed by thread itself, not atomic */
60 : unsigned depth;
61 : };
62 119 : DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head);
63 :
64 : static const struct rcu_action rcua_next = { .type = RCUA_NEXT };
65 : static const struct rcu_action rcua_end = { .type = RCUA_END };
66 : static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };
67 :
68 : struct rcu_next {
69 : struct rcu_head head_free;
70 : struct rcu_head head_next;
71 : };
72 :
73 : #define rcu_free_internal(mtype, ptr, field) \
74 : do { \
75 : typeof(ptr) _ptr = (ptr); \
76 : struct rcu_head *_rcu_head = &_ptr->field; \
77 : static const struct rcu_action _rcu_action = { \
78 : .type = RCUA_FREE, \
79 : .u.free = { \
80 : .mt = mtype, \
81 : .offset = offsetof(typeof(*_ptr), field), \
82 : }, \
83 : }; \
84 : _rcu_head->action = &_rcu_action; \
85 : rcu_heads_add_tail(&rcu_heads, _rcu_head); \
86 : } while (0)
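/* Editor's note (illustration, not original): the .offset stored in the
 * action above is what lets the sweeper get from the embedded rcu_head back
 * to the start of the containing object without knowing its type.  For a
 * hypothetical struct foo (MTYPE_FOO and "f" likewise made up)
 *
 *     struct foo {
 *             char name[32];           // offset 0
 *             struct rcu_head rcu;     // offset 32 on a typical LP64 ABI
 *     };
 *
 * rcu_free_internal(MTYPE_FOO, f, rcu) records offset 32, and rcu_do()
 * later computes (char *)&f->rcu - 32 == (char *)f before handing the
 * pointer to qfree() - the same idea as container_of(), with the offset
 * captured at enqueue time instead of at the point of use.
 */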
87 :
88 : /* primary global RCU position */
89 : static struct seqlock rcu_seq;
90 : /* this is set to rcu_seq whenever something is added on the RCU queue.
91 : * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
92 : */
93 : static _Atomic seqlock_val_t rcu_dirty;
94 :
95 : static struct rcu_threads_head rcu_threads;
96 : static struct rcu_heads_head rcu_heads;
97 :
98 : /* main thread & RCU sweeper have pre-setup rcu_thread structures. The
99 : * reasons are different:
100 : *
101 : * - rcu_thread_main is there because the main thread isn't started like
102 : * other threads; it is created implicitly when the program starts, so
103 : * its rcu_thread has to be set up implicitly as well (see rcu_preinit).
104 : *
105 : * - rcu_thread_rcu isn't actually put on the rcu_threads list (that would
106 : * make no sense); it only exists so we can call RCU-using functions from
107 : * the RCU thread without special handling in rcu_read_lock/unlock.
108 : */
109 : static struct rcu_thread rcu_thread_main;
110 : static struct rcu_thread rcu_thread_rcu;
111 :
112 : static pthread_t rcu_pthread;
113 : static pthread_key_t rcu_thread_key;
114 : static bool rcu_active;
115 :
116 : static void rcu_start(void);
117 : static void rcu_bump(void);
118 :
119 : /*
120 : * preinitialization for main thread
121 : */
122 : static void rcu_thread_end(void *rcu_thread);
123 :
124 : static void rcu_preinit(void) __attribute__((constructor));
125 4 : static void rcu_preinit(void)
126 : {
127 4 : struct rcu_thread *rt;
128 :
129 4 : rt = &rcu_thread_main;
130 4 : rt->depth = 1;
131 4 : seqlock_init(&rt->rcu);
132 4 : seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
133 :
134 4 : pthread_key_create(&rcu_thread_key, rcu_thread_end);
135 4 : pthread_setspecific(rcu_thread_key, rt);
136 :
137 4 : rcu_threads_add_tail(&rcu_threads, rt);
138 :
139 : /* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
140 4 : rt = &rcu_thread_rcu;
141 4 : rt->depth = 1;
142 :
143 4 : seqlock_init(&rcu_seq);
144 4 : seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
145 4 : }
146 :
147 22354 : static struct rcu_thread *rcu_self(void)
148 : {
149 22354 : return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
150 : }
151 :
152 : /*
153 : * thread management (for the non-main thread)
154 : */
155 14 : struct rcu_thread *rcu_thread_prepare(void)
156 : {
157 14 : struct rcu_thread *rt, *cur;
158 :
159 14 : rcu_assert_read_locked();
160 :
161 14 : if (!rcu_active)
162 4 : rcu_start();
163 :
164 14 : cur = rcu_self();
165 14 : assert(cur->depth);
166 :
167 : /* new thread always starts with rcu_read_lock held at depth 1, and
168 : * holding the same epoch as the parent (this makes it possible to
169 : * use RCU for things passed into the thread through its arg)
170 : */
171 14 : rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
172 14 : rt->depth = 1;
173 :
174 14 : seqlock_init(&rt->rcu);
175 14 : seqlock_acquire(&rt->rcu, &cur->rcu);
176 :
177 14 : rcu_threads_add_tail(&rcu_threads, rt);
178 :
179 14 : return rt;
180 : }
181 :
182 14 : void rcu_thread_start(struct rcu_thread *rt)
183 : {
184 14 : pthread_setspecific(rcu_thread_key, rt);
185 14 : }
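/* Editor's sketch (not part of the original file): how rcu_thread_prepare()
 * and rcu_thread_start() are meant to be paired around pthread_create(),
 * per the comment above.  start_routine() and spawn_worker() are made up.
 *
 *     static void *start_routine(void *arg)
 *     {
 *             struct rcu_thread *rt = arg;
 *
 *             rcu_thread_start(rt);
 *             // we now hold RCU at depth 1, on the parent's epoch, so any
 *             // RCU-protected data handed in via "arg" is safe to use here
 *             // ... consume startup data ...
 *             rcu_read_unlock();
 *
 *             // ... main loop, taking rcu_read_lock()/unlock() as needed ...
 *             return NULL;
 *     }
 *
 *     int spawn_worker(void)   // caller holds rcu_read_lock()
 *     {
 *             struct rcu_thread *rt = rcu_thread_prepare();
 *             pthread_t tid;
 *
 *             if (pthread_create(&tid, NULL, start_routine, rt)) {
 *                     // the thread never started - drop its rcu_thread
 *                     rcu_thread_unprepare(rt);
 *                     return -1;
 *             }
 *             return 0;
 *     }
 *
 * When the thread later exits, the pthread key destructor registered in
 * rcu_preinit() (rcu_thread_end -> rcu_thread_unprepare) cleans up rt.
 */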
186 :
187 18 : void rcu_thread_unprepare(struct rcu_thread *rt)
188 : {
189 18 : if (rt == &rcu_thread_rcu)
190 : return;
191 :
192 14 : rt->depth = 1;
193 14 : seqlock_acquire(&rt->rcu, &rcu_seq);
194 :
195 14 : rcu_bump();
196 14 : if (rt != &rcu_thread_main)
197 : /* this free() happens after seqlock_release() below */
198 14 : rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);
199 :
200 14 : rcu_threads_del(&rcu_threads, rt);
201 14 : seqlock_release(&rt->rcu);
202 : }
203 :
204 18 : static void rcu_thread_end(void *rtvoid)
205 : {
206 18 : struct rcu_thread *rt = rtvoid;
207 18 : rcu_thread_unprepare(rt);
208 18 : }
209 :
210 : /*
211 : * main RCU control aspects
212 : */
213 :
214 18 : static void rcu_bump(void)
215 : {
216 18 : struct rcu_next *rn;
217 :
218 18 : rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));
219 :
220 : /* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
221 : * This means we don't need to communicate which seqno is which
222 : * RCUA_NEXT, since we really don't care.
223 : */
224 :
225 : /*
226 : * Important race condition: while rcu_heads_add_tail is executing,
227 : * there is an intermediate point where the rcu_heads "last" pointer
228 : * already points to rn->head_next, but rn->head_next isn't added to
229 : * the list yet. That means any other "add_tail" calls append to this
230 : * item, which isn't fully on the list yet. Freeze this thread at
231 : * that point and look at another thread doing a rcu_bump. It adds
232 : * these two items and then does a seqlock_bump. But the rcu_heads
233 : * list is still "interrupted" and there's no RCUA_NEXT on the list
234 : * yet (from either the frozen thread or the second thread). So
235 : * rcu_main() might actually hit the end of the list at the
236 : * "interrupt".
237 : *
238 : * This situation is prevented by requiring that rcu_read_lock is held
239 : * for any calls to rcu_bump, since if we're holding the current RCU
240 : * epoch, that means rcu_main can't be chewing on rcu_heads and hit
241 : * that interruption point. Only once the thread has continued to
242 : * rcu_read_unlock() - and therefore completed the add_tail - does the
243 : * RCU sweeper gobble up the epoch, at which point it can be sure to
244 : * find at least the RCUA_NEXT and RCUA_FREE items on rcu_heads.
245 : */
246 18 : rn->head_next.action = &rcua_next;
247 18 : rcu_heads_add_tail(&rcu_heads, &rn->head_next);
248 :
249 : /* free rn that we allocated above.
250 : *
251 : * This is INTENTIONALLY not built into the RCUA_NEXT action. This
252 : * ensures that after the action above is popped off the queue, there
253 : * is still at least 1 item on the RCU queue. This means we never
254 : * delete the last item, which is extremely important since it keeps
255 : * the atomlist ->last pointer alive and well.
256 : *
257 : * If we were to "run dry" on the RCU queue, add_tail may run into the
258 : * "last item is being deleted - start over" case, and then we may end
259 : * up accessing old RCU queue items that are already free'd.
260 : */
261 18 : rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);
262 :
263 : /* Only allow the RCU sweeper to run after these 2 items are queued.
264 : *
265 : * If another thread enqueues some RCU action in the intermediate
266 : * window here, nothing bad happens - the queued action is associated
267 : * with a larger seq# than strictly necessary. Thus, it might get
268 : * executed a bit later, but that's not a problem.
269 : *
270 : * If another thread acquires the read lock in this window, it holds
271 : * the previous epoch, but its RCU queue actions will be in the next
272 : * epoch. This isn't a problem either, just a tad inefficient.
273 : */
274 18 : seqlock_bump(&rcu_seq);
275 18 : }
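/* Editor's note (illustration of the race described above, not original):
 *
 *   thread A (rcu_bump)          thread B (rcu_bump)       RCU sweeper
 *   -------------------          -------------------       -----------
 *   add_tail(head_next):
 *     "last" now -> A's item
 *     <preempted before the
 *      item is linked in>
 *                                add_tail() appends after
 *                                A's item (unreachable
 *                                from the list head!)
 *                                seqlock_bump(rcu_seq)
 *                                                          would walk rcu_heads,
 *                                                          stop at the gap, and
 *                                                          miss both RCUA_NEXTs
 *
 * Requiring rcu_read_lock() around rcu_bump() blocks the third column: the
 * sweeper must wait for A's epoch, and A only releases it in
 * rcu_read_unlock(), i.e. after add_tail() has completed and closed the gap.
 */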
276 :
277 13718 : static void rcu_bump_maybe(void)
278 : {
279 13718 : seqlock_val_t dirty;
280 :
281 13718 : dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
282 : /* no problem if we race here and multiple threads bump rcu_seq;
283 : * bumping too much causes no issues while not bumping enough will
284 : * result in delayed cleanup
285 : */
286 13718 : if (dirty == seqlock_cur(&rcu_seq))
287 4 : rcu_bump();
288 13718 : }
289 :
290 7719 : void rcu_read_lock(void)
291 : {
292 7719 : struct rcu_thread *rt = rcu_self();
293 :
294 7718 : assert(rt);
295 7718 : if (rt->depth++ > 0)
296 : return;
297 :
298 6858 : seqlock_acquire(&rt->rcu, &rcu_seq);
299 : /* need to hold RCU for bump ... */
300 6858 : rcu_bump_maybe();
301 : /* ... but no point in holding the old epoch if we just bumped */
302 6858 : seqlock_acquire(&rt->rcu, &rcu_seq);
303 : }
304 :
305 7721 : void rcu_read_unlock(void)
306 : {
307 7721 : struct rcu_thread *rt = rcu_self();
308 :
309 7721 : assert(rt && rt->depth);
310 7721 : if (--rt->depth > 0)
311 : return;
312 6860 : rcu_bump_maybe();
313 6860 : seqlock_release(&rt->rcu);
314 : }
315 :
316 38 : void rcu_assert_read_locked(void)
317 : {
318 38 : struct rcu_thread *rt = rcu_self();
319 38 : assert(rt && rt->depth && seqlock_held(&rt->rcu));
320 38 : }
321 :
322 6858 : void rcu_assert_read_unlocked(void)
323 : {
324 6858 : struct rcu_thread *rt = rcu_self();
325 6858 : assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
326 6858 : }
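/* Editor's sketch (illustration only): the asserts above encode a locking
 * contract.  A made-up lookup function that returns a pointer only valid
 * while the caller holds RCU might look like this:
 *
 *     struct node *node_lookup(struct node *head, int key)
 *     {
 *             rcu_assert_read_locked();
 *
 *             for (struct node *n = head; n; n = n->next)
 *                     if (n->key == key)
 *                             return n;  // valid only while RCU is held
 *             return NULL;
 *     }
 */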
327 :
328 : /*
329 : * RCU resource-release thread
330 : */
331 :
332 : static void *rcu_main(void *arg);
333 :
334 4 : static void rcu_start(void)
335 : {
336 : /* ensure we never handle signals on the RCU thread by blocking
337 : * everything here (new thread inherits signal mask)
338 : */
339 4 : sigset_t oldsigs, blocksigs;
340 :
341 4 : sigfillset(&blocksigs);
342 4 : pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);
343 :
344 4 : rcu_active = true;
345 :
346 4 : assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));
347 :
348 4 : pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
349 :
350 : #ifdef HAVE_PTHREAD_SETNAME_NP
351 : # ifdef GNU_LINUX
352 4 : pthread_setname_np(rcu_pthread, "RCU sweeper");
353 : # elif defined(__NetBSD__)
354 : pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
355 : # endif
356 : #elif defined(HAVE_PTHREAD_SET_NAME_NP)
357 : pthread_set_name_np(rcu_pthread, "RCU sweeper");
358 : #endif
359 4 : }
360 :
361 48 : static void rcu_do(struct rcu_head *rh)
362 : {
363 48 : struct rcu_head_close *rhc;
364 48 : void *p;
365 :
366 48 : switch (rh->action->type) {
367 40 : case RCUA_FREE:
368 40 : p = (char *)rh - rh->action->u.free.offset;
369 40 : if (rh->action->u.free.mt)
370 40 : qfree(rh->action->u.free.mt, p);
371 : else
372 0 : free(p);
373 : break;
374 8 : case RCUA_CLOSE:
375 8 : rhc = container_of(rh, struct rcu_head_close,
376 : rcu_head);
377 8 : close(rhc->fd);
378 8 : break;
379 0 : case RCUA_CALL:
380 0 : p = (char *)rh - rh->action->u.call.offset;
381 0 : rh->action->u.call.fptr(p);
382 0 : break;
383 :
384 : case RCUA_INVALID:
385 : case RCUA_NEXT:
386 : case RCUA_END:
387 : default:
388 0 : assert(0);
389 : }
390 48 : }
391 :
392 : static void rcu_watchdog(struct rcu_thread *rt)
393 : {
394 : #if 0
395 : /* future work: print a backtrace for the thread that's holding up
396 : * RCU. The only (good) way of doing that is to send a signal to the
397 : * other thread, save away the backtrace in the signal handler, and
398 : * block here until the signal is done processing.
399 : *
400 : * Just haven't implemented that yet.
401 : */
402 : fprintf(stderr, "RCU watchdog %p\n", rt);
403 : #endif
404 : }
405 :
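/* Editor's note (summary of rcu_main() below): for each epoch value
 * "rcuval" the sweeper
 *   1. waits until the global rcu_seq has reached rcuval, i.e. somebody
 *      called rcu_bump(),
 *   2. waits until no thread on rcu_threads still holds an epoch older
 *      than rcuval (nagging via rcu_watchdog() if that takes more than
 *      ~100ms),
 *   3. pops rcu_heads entries and runs them through rcu_do() until it hits
 *      the RCUA_NEXT marker queued by that epoch's rcu_bump() (an RCUA_END
 *      entry ends the outer loop), and
 *   4. advances rcuval by SEQLOCK_INCR and starts over.
 */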
406 4 : static void *rcu_main(void *arg)
407 : {
408 4 : struct rcu_thread *rt;
409 4 : struct rcu_head *rh = NULL;
410 4 : bool end = false;
411 4 : struct timespec maxwait;
412 :
413 4 : seqlock_val_t rcuval = SEQLOCK_STARTVAL;
414 :
415 4 : pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);
416 :
417 26 : while (!end) {
418 22 : seqlock_wait(&rcu_seq, rcuval);
419 :
420 : /* RCU watchdog timeout, TODO: configurable value */
421 22 : clock_gettime(CLOCK_MONOTONIC, &maxwait);
422 22 : maxwait.tv_nsec += 100 * 1000 * 1000;
423 22 : if (maxwait.tv_nsec >= 1000000000) {
424 2 : maxwait.tv_sec++;
425 2 : maxwait.tv_nsec -= 1000000000;
426 : }
427 :
428 69 : frr_each (rcu_threads, &rcu_threads, rt)
429 47 : if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
430 1 : rcu_watchdog(rt);
431 1 : seqlock_wait(&rt->rcu, rcuval);
432 : }
433 :
434 74 : while ((rh = rcu_heads_pop(&rcu_heads))) {
435 70 : if (rh->action->type == RCUA_NEXT)
436 : break;
437 52 : else if (rh->action->type == RCUA_END)
438 : end = true;
439 : else
440 48 : rcu_do(rh);
441 : }
442 :
443 22 : rcuval += SEQLOCK_INCR;
444 : }
445 :
446 : /* rcu_shutdown can only be called singlethreaded, and it does a
447 : * pthread_join, so it should be impossible that anything ended up
448 : * on the queue after RCUA_END
449 : */
450 : #if 1
451 8 : assert(!rcu_heads_first(&rcu_heads));
452 : #else
453 : while ((rh = rcu_heads_pop(&rcu_heads)))
454 : if (rh->action->type >= RCUA_FREE)
455 : rcu_do(rh);
456 : #endif
457 4 : return NULL;
458 : }
459 :
460 4 : void rcu_shutdown(void)
461 : {
462 4 : static struct rcu_head rcu_head_end;
463 4 : struct rcu_thread *rt = rcu_self();
464 4 : void *retval;
465 :
466 4 : if (!rcu_active)
467 0 : return;
468 :
469 4 : rcu_assert_read_locked();
470 4 : assert(rcu_threads_count(&rcu_threads) == 1);
471 :
472 4 : rcu_enqueue(&rcu_head_end, &rcua_end);
473 :
474 4 : rt->depth = 0;
475 4 : seqlock_release(&rt->rcu);
476 4 : seqlock_release(&rcu_seq);
477 4 : rcu_active = false;
478 :
479 : /* rcu_active is cleared before the pthread_join in case we hang in
480 : * pthread_join & get a SIGTERM or something - in that case, just
481 : * ignore the maybe-still-running RCU thread
482 : */
483 4 : if (pthread_join(rcu_pthread, &retval) == 0) {
484 4 : seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
485 4 : seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
486 4 : rt->depth = 1;
487 : }
488 : }
489 :
490 : /*
491 : * RCU'd free functions
492 : */
493 :
494 20 : void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
495 : {
496 : /* refer to rcu_bump() for why we need to hold RCU when adding items
497 : * to rcu_heads
498 : */
499 20 : rcu_assert_read_locked();
500 :
501 20 : rh->action = action;
502 :
503 20 : if (!rcu_active) {
504 0 : rcu_do(rh);
505 0 : return;
506 : }
507 20 : rcu_heads_add_tail(&rcu_heads, rh);
508 20 : atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
509 : memory_order_relaxed);
510 : }
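/* Editor's sketch (illustration only): rcu_enqueue() with a hand-built
 * RCUA_CALL action to run a destructor once all current readers are done.
 * "struct conn", conn_destroy() and conn_delete() are made up; for the
 * common cases frrcu.h provides ready-made wrappers (rcu_free(),
 * rcu_close(), ...).
 *
 *     struct conn {
 *             int fd_count;
 *             struct rcu_head rcu;
 *     };
 *
 *     static void conn_destroy(void *arg)
 *     {
 *             struct conn *c = arg;
 *             // no RCU reader can still reach "c" at this point
 *             free(c);
 *     }
 *
 *     static const struct rcu_action conn_destroy_rcu = {
 *             .type = RCUA_CALL,
 *             .u.call = {
 *                     .fptr = conn_destroy,
 *                     .offset = offsetof(struct conn, rcu),
 *             },
 *     };
 *
 *     void conn_delete(struct conn *c)   // rcu_read_lock() held
 *     {
 *             rcu_enqueue(&c->rcu, &conn_destroy_rcu);
 *     }
 */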
511 :
512 8 : void rcu_close(struct rcu_head_close *rhc, int fd)
513 : {
514 8 : rhc->fd = fd;
515 8 : rcu_enqueue(&rhc->rcu_head, &rcua_close);
516 8 : }
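/* Editor's sketch (illustration only): deferring a close() with rcu_close()
 * for a made-up "struct sock" (MTYPE_SOCK likewise made up) whose fd might
 * still be in use by concurrent RCU readers.  The embedded rcu_head_close
 * carries both the queue entry and the fd; a second rcu_head is needed for
 * the deferred free of the struct itself:
 *
 *     struct sock {
 *             int fd;
 *             struct rcu_head_close head_close;
 *             struct rcu_head head_free;
 *     };
 *
 *     void sock_destroy(struct sock *sk)   // rcu_read_lock() held
 *     {
 *             rcu_close(&sk->head_close, sk->fd);   // fd closed later
 *             rcu_free(MTYPE_SOCK, sk, head_free);  // then sk is freed
 *     }
 *
 * Both entries land in the same epoch and are executed in queue order, so
 * the fd is closed before the surrounding struct is freed.
 */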