Line data Source code
1 : /*
2 : * Copyright (c) 2017-19 David Lamparter, for NetDEF, Inc.
3 : *
4 : * Permission to use, copy, modify, and distribute this software for any
5 : * purpose with or without fee is hereby granted, provided that the above
6 : * copyright notice and this permission notice appear in all copies.
7 : *
8 : * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 : * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 : * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 : * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 : * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 : * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 : * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 : */
16 :
17 : /* implementation notes: this is an epoch-based RCU implementation. rcu_seq
18 : * (global variable) counts the current epoch. Threads hold a specific epoch
19 : * in rcu_read_lock(). This is the oldest epoch a thread might be accessing
20 : * data from.
21 : *
22 : * The rcu_seq global is only pushed forward on rcu_read_lock() and
23 : * rcu_read_unlock() calls. This makes things a tad more efficient since
24 : * those are the only places it matters:
25 : * - on rcu_read_lock, we don't want to hold an old epoch pointlessly
26 : * - on rcu_read_unlock, we want to make sure we're not stuck on an old epoch
27 : * when heading into a long idle period where no thread holds RCU
28 : *
29 : * rcu_thread structures themselves are RCU-free'd.
30 : *
31 : * rcu_head structures are the most iffy; normally for an ATOMLIST we would
32 : * need to make sure we use rcu_free or pthread_rwlock to deallocate old items
33 : * to prevent ABA or use-after-free problems. However, our ATOMLIST code
34 : * guarantees that, as long as the list never becomes empty, an "add_tail()"
35 : * only needs the "last" pointer, i.e. we can't run into ABA/UAF
36 : * issues - but we do need to keep at least 1 item on the list.
37 : *
38 : * (Search the atomlist code for all uses of "last")
39 : */
40 :
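/* Illustrative usage sketch - not part of this file.  "struct foo",
 * "MTYPE_FOO" and "gfoo" are hypothetical names; the pattern below shows a
 * typical reader and updater for an RCU-protected pointer using the
 * frrcu.h API implemented here.
 */
struct foo {
	int value;
	struct rcu_head rcu;		/* embedded head used by rcu_free() */
};
static struct foo *_Atomic gfoo;

static int example_reader(void)
{
	struct foo *f;
	int v;

	rcu_read_lock();		/* pin the current epoch */
	f = atomic_load_explicit(&gfoo, memory_order_acquire);
	v = f ? f->value : 0;		/* f cannot be freed while the epoch is held */
	rcu_read_unlock();		/* let the sweeper move past this epoch */
	return v;
}

static void example_replace(struct foo *newval)
{
	struct foo *old;

	rcu_read_lock();		/* rcu_free() must be called read-locked */
	old = atomic_exchange_explicit(&gfoo, newval, memory_order_acq_rel);
	if (old)
		rcu_free(MTYPE_FOO, old, rcu);	/* freed once all readers have moved on */
	rcu_read_unlock();
}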
41 : #ifdef HAVE_CONFIG_H
42 : #include "config.h"
43 : #endif
44 :
45 : #include <pthread.h>
46 : #ifdef HAVE_PTHREAD_NP_H
47 : #include <pthread_np.h>
48 : #endif
49 : #include <string.h>
50 : #include <unistd.h>
51 : #include <signal.h>
52 :
53 : #include "frrcu.h"
54 : #include "seqlock.h"
55 : #include "atomlist.h"
56 :
57 24 : DEFINE_MTYPE_STATIC(LIB, RCU_THREAD, "RCU thread");
58 24 : DEFINE_MTYPE_STATIC(LIB, RCU_NEXT, "RCU sequence barrier");
59 :
60 360 : DECLARE_ATOMLIST(rcu_heads, struct rcu_head, head);
61 :
62 : PREDECL_ATOMLIST(rcu_threads);
63 : struct rcu_thread {
64 : struct rcu_threads_item head;
65 :
66 : struct rcu_head rcu_head;
67 :
68 : struct seqlock rcu;
69 :
70 : /* only accessed by thread itself, not atomic */
71 : unsigned depth;
72 : };
73 204 : DECLARE_ATOMLIST(rcu_threads, struct rcu_thread, head);
74 :
75 : static const struct rcu_action rcua_next = { .type = RCUA_NEXT };
76 : static const struct rcu_action rcua_end = { .type = RCUA_END };
77 : static const struct rcu_action rcua_close = { .type = RCUA_CLOSE };
78 :
79 : struct rcu_next {
80 : struct rcu_head head_free;
81 : struct rcu_head head_next;
82 : };
83 :
84 : #define rcu_free_internal(mtype, ptr, field) \
85 : do { \
86 : typeof(ptr) _ptr = (ptr); \
87 : struct rcu_head *_rcu_head = &_ptr->field; \
88 : static const struct rcu_action _rcu_action = { \
89 : .type = RCUA_FREE, \
90 : .u.free = { \
91 : .mt = mtype, \
92 : .offset = offsetof(typeof(*_ptr), field), \
93 : }, \
94 : }; \
95 : _rcu_head->action = &_rcu_action; \
96 : rcu_heads_add_tail(&rcu_heads, _rcu_head); \
97 : } while (0)
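/* What rcu_free_internal() boils down to: the static rcu_action records the
 * MTYPE and the offset of the embedded rcu_head, and once the epoch has
 * passed, rcu_do() below walks back from the queued head to the start of
 * the allocation, roughly:
 *
 *	p = (char *)rh - rh->action->u.free.offset;	// == original ptr
 *	qfree(rh->action->u.free.mt, p);
 */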
98 :
99 : /* primary global RCU position */
100 : static struct seqlock rcu_seq;
101 : /* this is set to rcu_seq whenever something is added to the RCU queue.
102 : * rcu_read_lock() and rcu_read_unlock() will then bump rcu_seq up one step.
103 : */
104 : static _Atomic seqlock_val_t rcu_dirty;
105 :
106 : static struct rcu_threads_head rcu_threads;
107 : static struct rcu_heads_head rcu_heads;
108 :
109 : /* main thread & RCU sweeper have pre-setup rcu_thread structures. The
110 : * reasons are different:
111 : *
112 : * - rcu_thread_main is there because the main thread isn't started like
113 : * other threads; it's implicitly created when the program starts, so
114 : * rcu_thread_main matches up implicitly.
115 : *
116 : * - rcu_thread_rcu isn't actually put on the rcu_threads list (makes no
117 : * sense really), it only exists so we can call RCU-using functions from
118 : * the RCU thread without special handling in rcu_read_lock/unlock.
119 : */
120 : static struct rcu_thread rcu_thread_main;
121 : static struct rcu_thread rcu_thread_rcu;
122 :
123 : static pthread_t rcu_pthread;
124 : static pthread_key_t rcu_thread_key;
125 : static bool rcu_active;
126 :
127 : static void rcu_start(void);
128 : static void rcu_bump(void);
129 :
130 : /*
131 : * preinitialization for main thread
132 : */
133 : static void rcu_thread_end(void *rcu_thread);
134 :
135 : static void rcu_preinit(void) __attribute__((constructor));
136 8 : static void rcu_preinit(void)
137 : {
138 8 : struct rcu_thread *rt;
139 :
140 8 : rt = &rcu_thread_main;
141 8 : rt->depth = 1;
142 8 : seqlock_init(&rt->rcu);
143 8 : seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
144 :
145 8 : pthread_key_create(&rcu_thread_key, rcu_thread_end);
146 8 : pthread_setspecific(rcu_thread_key, rt);
147 :
148 8 : rcu_threads_add_tail(&rcu_threads, rt);
149 :
150 : /* RCU sweeper's rcu_thread is a dummy, NOT added to rcu_threads */
151 8 : rt = &rcu_thread_rcu;
152 8 : rt->depth = 1;
153 :
154 8 : seqlock_init(&rcu_seq);
155 8 : seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
156 8 : }
157 :
158 54107 : static struct rcu_thread *rcu_self(void)
159 : {
160 54107 : return (struct rcu_thread *)pthread_getspecific(rcu_thread_key);
161 : }
162 :
163 : /*
164 : * thread management (for the non-main thread)
165 : */
166 24 : struct rcu_thread *rcu_thread_prepare(void)
167 : {
168 24 : struct rcu_thread *rt, *cur;
169 :
170 24 : rcu_assert_read_locked();
171 :
172 24 : if (!rcu_active)
173 8 : rcu_start();
174 :
175 24 : cur = rcu_self();
176 24 : assert(cur->depth);
177 :
178 : /* new thread always starts with rcu_read_lock held at depth 1, and
179 : * holding the same epoch as the parent (this makes it possible to
180 : * use RCU for things passed into the thread through its arg)
181 : */
182 24 : rt = XCALLOC(MTYPE_RCU_THREAD, sizeof(*rt));
183 24 : rt->depth = 1;
184 :
185 24 : seqlock_init(&rt->rcu);
186 24 : seqlock_acquire(&rt->rcu, &cur->rcu);
187 :
188 24 : rcu_threads_add_tail(&rcu_threads, rt);
189 :
190 24 : return rt;
191 : }
192 :
193 24 : void rcu_thread_start(struct rcu_thread *rt)
194 : {
195 24 : pthread_setspecific(rcu_thread_key, rt);
196 24 : }
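/* Illustrative spawn sequence - not part of this file; "worker_arg",
 * "worker" and "example_spawn" are hypothetical.  The parent prepares the
 * rcu_thread while read-locked; the child inherits that epoch at depth 1,
 * so data handed over via the argument stays valid until the child calls
 * rcu_read_unlock().
 */
struct worker_arg {
	struct rcu_thread *rcu_thread;
	void *data;
};

static void *worker(void *arg)
{
	struct worker_arg *wa = arg;

	rcu_thread_start(wa->rcu_thread);	/* bind prepared state to this pthread */
	/* ... consume wa->data under the inherited read lock ... */
	rcu_read_unlock();			/* drop the depth-1 lock from prepare */
	return NULL;
}

static void example_spawn(struct worker_arg *wa)
{
	pthread_t tid;

	rcu_read_lock();
	wa->rcu_thread = rcu_thread_prepare();
	if (pthread_create(&tid, NULL, worker, wa))
		rcu_thread_unprepare(wa->rcu_thread);	/* roll back on failure */
	rcu_read_unlock();
}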
197 :
198 32 : void rcu_thread_unprepare(struct rcu_thread *rt)
199 : {
200 32 : if (rt == &rcu_thread_rcu)
201 : return;
202 :
203 24 : rt->depth = 1;
204 24 : seqlock_acquire(&rt->rcu, &rcu_seq);
205 :
206 24 : rcu_bump();
207 24 : if (rt != &rcu_thread_main)
208 : /* this free() happens after seqlock_release() below */
209 24 : rcu_free_internal(MTYPE_RCU_THREAD, rt, rcu_head);
210 :
211 24 : rcu_threads_del(&rcu_threads, rt);
212 24 : seqlock_release(&rt->rcu);
213 : }
214 :
215 32 : static void rcu_thread_end(void *rtvoid)
216 : {
217 32 : struct rcu_thread *rt = rtvoid;
218 32 : rcu_thread_unprepare(rt);
219 32 : }
220 :
221 : /*
222 : * main RCU control aspects
223 : */
224 :
225 32 : static void rcu_bump(void)
226 : {
227 32 : struct rcu_next *rn;
228 :
229 32 : rn = XMALLOC(MTYPE_RCU_NEXT, sizeof(*rn));
230 :
231 : /* note: each RCUA_NEXT item corresponds to exactly one seqno bump.
232 : * This means we don't need to communicate which seqno is which
233 : * RCUA_NEXT, since we really don't care.
234 : */
235 :
236 : /*
237 : * Important race condition: while rcu_heads_add_tail is executing,
238 : * there is an intermediate point where the rcu_heads "last" pointer
239 : * already points to rn->head_next, but rn->head_next isn't added to
240 : * the list yet. That means any other "add_tail" calls append to this
241 : * item, which isn't fully on the list yet. Freeze this thread at
242 : * that point and look at another thread doing a rcu_bump. It adds
243 : * these two items and then does a seqlock_bump. But the rcu_heads
244 : * list is still "interrupted" and there's no RCUA_NEXT on the list
245 : * yet (from either the frozen thread or the second thread). So
246 : * rcu_main() might actually hit the end of the list at the
247 : * "interrupt".
248 : *
249 : * This situation is prevented by requiring that rcu_read_lock is held
250 : * for any calls to rcu_bump, since if we're holding the current RCU
251 : * epoch, that means rcu_main can't be chewing on rcu_heads and hit
252 : * that interruption point. Only once the thread has continued on
253 : * to rcu_read_unlock() - and therefore completed the add_tail - can
254 : * the RCU sweeper gobble up the epoch, and it is then sure to find at
255 : * least the RCUA_NEXT and RCUA_FREE items on rcu_heads.
256 : */
257 32 : rn->head_next.action = &rcua_next;
258 32 : rcu_heads_add_tail(&rcu_heads, &rn->head_next);
259 :
260 : /* free rn that we allocated above.
261 : *
262 : * This is INTENTIONALLY not built into the RCUA_NEXT action. This
263 : * ensures that after the action above is popped off the queue, there
264 : * is still at least 1 item on the RCU queue. This means we never
265 : * delete the last item, which is extremely important since it keeps
266 : * the atomlist ->last pointer alive and well.
267 : *
268 : * If we were to "run dry" on the RCU queue, add_tail may run into the
269 : * "last item is being deleted - start over" case, and then we may end
270 : * up accessing old RCU queue items that are already free'd.
271 : */
272 32 : rcu_free_internal(MTYPE_RCU_NEXT, rn, head_free);
273 :
274 : /* Only allow the RCU sweeper to run after these 2 items are queued.
275 : *
276 : * If another thread enqueues some RCU action in the intermediate
277 : * window here, nothing bad happens - the queued action is associated
278 : * with a larger seq# than strictly necessary. Thus, it might get
279 : * executed a bit later, but that's not a problem.
280 : *
281 : * If another thread acquires the read lock in this window, it holds
282 : * the previous epoch, but its RCU queue actions will be in the next
283 : * epoch. This isn't a problem either, just a tad inefficient.
284 : */
285 32 : seqlock_bump(&rcu_seq);
286 32 : }
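/* For illustration, the tail of rcu_heads right after one rcu_bump() call
 * looks like this (oldest to newest):
 *
 *	... -> [user RCUA_FREE/CLOSE/CALL items] -> [rn->head_next: RCUA_NEXT]
 *	                                         -> [rn->head_free: RCUA_FREE rn]
 *
 * rcu_main() stops popping at the RCUA_NEXT marker, so rn's own RCUA_FREE
 * stays queued behind it - the list therefore never runs empty and the
 * atomlist "last" pointer always refers to a live item.
 */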
287 :
288 35554 : static void rcu_bump_maybe(void)
289 : {
290 35554 : seqlock_val_t dirty;
291 :
292 35554 : dirty = atomic_load_explicit(&rcu_dirty, memory_order_relaxed);
293 : /* no problem if we race here and multiple threads bump rcu_seq;
294 : * bumping too much causes no issues, while not bumping enough only
295 : * results in delayed cleanup
296 : */
297 35554 : if (dirty == seqlock_cur(&rcu_seq))
298 8 : rcu_bump();
299 35553 : }
300 :
301 18120 : void rcu_read_lock(void)
302 : {
303 18120 : struct rcu_thread *rt = rcu_self();
304 :
305 18120 : assert(rt);
306 18120 : if (rt->depth++ > 0)
307 : return;
308 :
309 17775 : seqlock_acquire(&rt->rcu, &rcu_seq);
310 : /* need to hold RCU for bump ... */
311 17775 : rcu_bump_maybe();
312 : /* ... but no point in holding the old epoch if we just bumped */
313 17774 : seqlock_acquire(&rt->rcu, &rcu_seq);
314 : }
315 :
316 18124 : void rcu_read_unlock(void)
317 : {
318 18124 : struct rcu_thread *rt = rcu_self();
319 :
320 18124 : assert(rt && rt->depth);
321 18124 : if (--rt->depth > 0)
322 : return;
323 17779 : rcu_bump_maybe();
324 17779 : seqlock_release(&rt->rcu);
325 : }
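/* Example of the rcu_dirty handshake between these functions (illustrative):
 *
 *	thread A: rcu_read_lock(); rcu_free(...)  => rcu_dirty = current rcu_seq
 *	thread A: rcu_read_unlock()               => rcu_bump_maybe() sees
 *	          rcu_dirty == rcu_seq, queues RCUA_NEXT and bumps rcu_seq
 *	sweeper:  wakes on the new rcu_seq value, waits until every registered
 *	          thread has left the old epoch, then performs the queued free
 */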
326 :
327 56 : void rcu_assert_read_locked(void)
328 : {
329 56 : struct rcu_thread *rt = rcu_self();
330 56 : assert(rt && rt->depth && seqlock_held(&rt->rcu));
331 56 : }
332 :
333 17775 : void rcu_assert_read_unlocked(void)
334 : {
335 17775 : struct rcu_thread *rt = rcu_self();
336 17775 : assert(rt && !rt->depth && !seqlock_held(&rt->rcu));
337 17775 : }
338 :
339 : /*
340 : * RCU resource-release thread
341 : */
342 :
343 : static void *rcu_main(void *arg);
344 :
345 8 : static void rcu_start(void)
346 : {
347 : /* ensure we never handle signals on the RCU thread by blocking
348 : * everything here (new thread inherits signal mask)
349 : */
350 8 : sigset_t oldsigs, blocksigs;
351 :
352 8 : sigfillset(&blocksigs);
353 8 : pthread_sigmask(SIG_BLOCK, &blocksigs, &oldsigs);
354 :
355 8 : rcu_active = true;
356 :
357 8 : assert(!pthread_create(&rcu_pthread, NULL, rcu_main, NULL));
358 :
359 8 : pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
360 :
361 : #ifdef HAVE_PTHREAD_SETNAME_NP
362 : # ifdef GNU_LINUX
363 8 : pthread_setname_np(rcu_pthread, "RCU sweeper");
364 : # elif defined(__NetBSD__)
365 : pthread_setname_np(rcu_pthread, "RCU sweeper", NULL);
366 : # endif
367 : #elif defined(HAVE_PTHREAD_SET_NAME_NP)
368 : pthread_set_name_np(rcu_pthread, "RCU sweeper");
369 : #endif
370 8 : }
371 :
372 72 : static void rcu_do(struct rcu_head *rh)
373 : {
374 72 : struct rcu_head_close *rhc;
375 72 : void *p;
376 :
377 72 : switch (rh->action->type) {
378 64 : case RCUA_FREE:
379 64 : p = (char *)rh - rh->action->u.free.offset;
380 64 : if (rh->action->u.free.mt)
381 64 : qfree(rh->action->u.free.mt, p);
382 : else
383 0 : free(p);
384 : break;
385 8 : case RCUA_CLOSE:
386 8 : rhc = container_of(rh, struct rcu_head_close,
387 : rcu_head);
388 8 : close(rhc->fd);
389 8 : break;
390 0 : case RCUA_CALL:
391 0 : p = (char *)rh - rh->action->u.call.offset;
392 0 : rh->action->u.call.fptr(p);
393 0 : break;
394 :
395 : case RCUA_INVALID:
396 : case RCUA_NEXT:
397 : case RCUA_END:
398 : default:
399 0 : assert(0);
400 : }
401 72 : }
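/* The RCUA_CALL case is what backs the rcu_call() convenience macro in
 * frrcu.h.  A hedged usage sketch ("struct conn" and conn_cleanup are
 * hypothetical); the callback runs on - and blocks - the RCU sweeper thread:
 */
struct conn {
	int fd;
	struct rcu_head rcu;
};

static void conn_cleanup(struct conn *c)
{
	/* executed by the sweeper once no reader can still hold "c" */
	close(c->fd);
	free(c);
}

/* at the call site, while rcu_read_lock() is held:
 *	rcu_call(conn_cleanup, c, rcu);
 */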
402 :
403 : static void rcu_watchdog(struct rcu_thread *rt)
404 : {
405 : #if 0
406 : /* future work: print a backtrace for the thread that's holding up
407 : * RCU. The only (good) way of doing that is to send a signal to the
408 : * other thread, save away the backtrace in the signal handler, and
409 : * block here until the signal is done processing.
410 : *
411 : * Just haven't implemented that yet.
412 : */
413 : fprintf(stderr, "RCU watchdog %p\n", rt);
414 : #endif
415 : }
416 :
417 8 : static void *rcu_main(void *arg)
418 : {
419 8 : struct rcu_thread *rt;
420 8 : struct rcu_head *rh = NULL;
421 8 : bool end = false;
422 8 : struct timespec maxwait;
423 :
424 8 : seqlock_val_t rcuval = SEQLOCK_STARTVAL;
425 :
426 8 : pthread_setspecific(rcu_thread_key, &rcu_thread_rcu);
427 :
428 48 : while (!end) {
429 40 : seqlock_wait(&rcu_seq, rcuval);
430 :
431 : /* RCU watchdog timeout, TODO: configurable value */
432 40 : clock_gettime(CLOCK_MONOTONIC, &maxwait);
433 40 : maxwait.tv_nsec += 100 * 1000 * 1000;
434 40 : if (maxwait.tv_nsec >= 1000000000) {
435 0 : maxwait.tv_sec++;
436 0 : maxwait.tv_nsec -= 1000000000;
437 : }
438 :
439 116 : frr_each (rcu_threads, &rcu_threads, rt)
440 76 : if (!seqlock_timedwait(&rt->rcu, rcuval, &maxwait)) {
441 0 : rcu_watchdog(rt);
442 0 : seqlock_wait(&rt->rcu, rcuval);
443 : }
444 :
445 120 : while ((rh = rcu_heads_pop(&rcu_heads))) {
446 112 : if (rh->action->type == RCUA_NEXT)
447 : break;
448 80 : else if (rh->action->type == RCUA_END)
449 : end = true;
450 : else
451 72 : rcu_do(rh);
452 : }
453 :
454 40 : rcuval += SEQLOCK_INCR;
455 : }
456 :
457 : /* rcu_shutdown can only be called singlethreaded, and it does a
458 : * pthread_join, so it should be impossible that anything ended up
459 : * on the queue after RCUA_END
460 : */
461 : #if 1
462 16 : assert(!rcu_heads_first(&rcu_heads));
463 : #else
464 : while ((rh = rcu_heads_pop(&rcu_heads)))
465 : if (rh->action->type >= RCUA_FREE)
466 : rcu_do(rh);
467 : #endif
468 8 : return NULL;
469 : }
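/* One sweeper iteration, spelled out (illustrative summary of the loop above):
 *   1. seqlock_wait() until rcu_seq has reached rcuval, i.e. rcu_bump() ran
 *   2. wait (with a 100ms watchdog) for every registered rcu_thread to leave
 *      the old epoch
 *   3. pop and execute rcu_heads entries up to that epoch's RCUA_NEXT marker
 *   4. advance rcuval by SEQLOCK_INCR and repeat, until RCUA_END is seen
 */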
470 :
471 8 : void rcu_shutdown(void)
472 : {
473 8 : static struct rcu_head rcu_head_end;
474 8 : struct rcu_thread *rt = rcu_self();
475 8 : void *retval;
476 :
477 8 : if (!rcu_active)
478 0 : return;
479 :
480 8 : rcu_assert_read_locked();
481 8 : assert(rcu_threads_count(&rcu_threads) == 1);
482 :
483 8 : rcu_enqueue(&rcu_head_end, &rcua_end);
484 :
485 8 : rt->depth = 0;
486 8 : seqlock_release(&rt->rcu);
487 8 : seqlock_release(&rcu_seq);
488 8 : rcu_active = false;
489 :
490 : /* clearing rcu_active is before pthread_join in case we hang in
491 : * pthread_join & get a SIGTERM or something - in that case, just
492 : * ignore the maybe-still-running RCU thread
493 : */
494 8 : if (pthread_join(rcu_pthread, &retval) == 0) {
495 8 : seqlock_acquire_val(&rcu_seq, SEQLOCK_STARTVAL);
496 8 : seqlock_acquire_val(&rt->rcu, SEQLOCK_STARTVAL);
497 8 : rt->depth = 1;
498 : }
499 : }
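/* Usage note: rcu_shutdown() must run with the read lock held and with the
 * main thread as the only remaining entry on rcu_threads.  Since it clears
 * rcu_active and re-arms the main thread's state afterwards, later
 * rcu_free()/rcu_enqueue() calls are executed immediately instead of being
 * queued (see rcu_enqueue() below).
 */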
500 :
501 : /*
502 : * RCU'd free functions
503 : */
504 :
505 24 : void rcu_enqueue(struct rcu_head *rh, const struct rcu_action *action)
506 : {
507 : /* refer to rcu_bump() for why we need to hold RCU when adding items
508 : * to rcu_heads
509 : */
510 24 : rcu_assert_read_locked();
511 :
512 24 : rh->action = action;
513 :
514 24 : if (!rcu_active) {
515 0 : rcu_do(rh);
516 0 : return;
517 : }
518 24 : rcu_heads_add_tail(&rcu_heads, rh);
519 24 : atomic_store_explicit(&rcu_dirty, seqlock_cur(&rcu_seq),
520 : memory_order_relaxed);
521 : }
522 :
523 8 : void rcu_close(struct rcu_head_close *rhc, int fd)
524 : {
525 8 : rhc->fd = fd;
526 8 : rcu_enqueue(&rhc->rcu_head, &rcua_close);
527 8 : }
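/* Hedged usage sketch for rcu_close() - "struct sock", "MTYPE_SOCK" and
 * sock_destroy are hypothetical.  Two separate heads are embedded because
 * each rcu_head can only sit on the queue once; the fd stays valid for
 * concurrent readers and is only close()d by the sweeper after the epoch
 * has passed, and the struct itself is freed the same way.
 */
struct sock {
	int fd;
	struct rcu_head_close head_close;
	struct rcu_head head_free;
};

static void sock_destroy(struct sock *sk)
{
	rcu_read_lock();			/* enqueueing requires the read lock */
	rcu_close(&sk->head_close, sk->fd);	/* deferred close(sk->fd) */
	rcu_free(MTYPE_SOCK, sk, head_free);	/* deferred qfree() of sk */
	rcu_read_unlock();
}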
|