Line data Source code
1 : /*
2 : * Packet interface
3 : * Copyright (C) 1999 Kunihiro Ishiguro
4 : *
5 : * This file is part of GNU Zebra.
6 : *
7 : * GNU Zebra is free software; you can redistribute it and/or modify it
8 : * under the terms of the GNU General Public License as published by the
9 : * Free Software Foundation; either version 2, or (at your option) any
10 : * later version.
11 : *
12 : * GNU Zebra is distributed in the hope that it will be useful, but
13 : * WITHOUT ANY WARRANTY; without even the implied warranty of
14 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 : * General Public License for more details.
16 : *
17 : * You should have received a copy of the GNU General Public License along
18 : * with this program; see the file COPYING; if not, write to the Free Software
19 : * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 : */
21 :
22 : #include <zebra.h>
23 : #include <stddef.h>
24 : #include <pthread.h>
25 :
26 : #include "stream.h"
27 : #include "memory.h"
28 : #include "network.h"
29 : #include "prefix.h"
30 : #include "log.h"
31 : #include "frr_pthread.h"
32 : #include "lib_errors.h"
33 :
/* Memory-type tags used by this file's allocations: MTYPE_STREAM for stream
 * buffers and MTYPE_STREAM_FIFO for stream FIFO objects. */
34 18 : DEFINE_MTYPE_STATIC(LIB, STREAM, "Stream");
35 18 : DEFINE_MTYPE_STATIC(LIB, STREAM_FIFO, "Stream FIFO");
36 :
/* Position predicates: each evaluates to non-zero when the offset is legal. */
/* A read position may be anywhere up to and including endp. */
#define GETP_VALID(S, G) ((S)->endp >= (G))
/* Positional puts are bounded exactly like read positions. */
#define PUT_AT_VALID(S, G) GETP_VALID(S, G)
/* An end position may be anywhere up to and including size. */
#define ENDP_VALID(S, E) ((S)->size >= (E))
41 :
42 : /* Asserting sanity checks.
43 : *
44 : * The following must always be true of a stream's offsets, both
45 : * before and after calls to stream functions:
46 : *
47 : * getp <= endp <= size
48 : *
49 : * Note that after a stream function is called the following may be true:
50 : * if (getp == endp) then the stream is no longer readable
51 : * if (endp == size) then the stream is no longer writeable
52 : *
53 : * The stream_put..._at() functions may overwrite data at an arbitrary
54 : * offset, but only within the region already written (PUT_AT_VALID
55 : * bounds the position by endp rather than by size).
56 : */
/* Emit a warning carrying the stream's address and its size/getp/endp
 * offsets, then log a backtrace at warning level. */
#define STREAM_WARN_OFFSETS(S) \
	do { \
		flog_warn( \
			EC_LIB_STREAM, \
			"&(struct stream): %p, size: %lu, getp: %lu, endp: %lu", \
			(void *)(S), \
			(unsigned long)(S)->size, \
			(unsigned long)(S)->getp, \
			(unsigned long)(S)->endp); \
		zlog_backtrace(LOG_WARNING); \
	} while (0)
65 :
/* Check the getp <= endp <= size invariant. When it is violated the offsets
 * are logged first so the subsequent assertion failure has context. */
#define STREAM_VERIFY_SANE(S) \
	do { \
		if (!GETP_VALID(S, (S)->getp) || !ENDP_VALID(S, (S)->endp)) { \
			STREAM_WARN_OFFSETS(S); \
		} \
		assert(GETP_VALID(S, (S)->getp)); \
		assert(ENDP_VALID(S, (S)->endp)); \
	} while (0)
74 :
/* Non-fatal out-of-bounds report: logs the calling function, the attempted
 * operation (WHAT) and the stream offsets, then lets the caller continue. */
#define STREAM_BOUND_WARN2(S, WHAT) \
	do { \
		flog_warn(EC_LIB_STREAM, "%s: Attempt to %s out of bounds", \
			  __func__, (WHAT)); \
		STREAM_WARN_OFFSETS(S); \
	} while (0)

/* Fatal out-of-bounds report: same logging as STREAM_BOUND_WARN2, followed
 * by an unconditional assertion failure. */
#define STREAM_BOUND_WARN(S, WHAT) \
	do { \
		STREAM_BOUND_WARN2(S, WHAT); \
		assert(0); \
	} while (0)
89 :
/* XXX: Deprecated macro: do not use.
 *
 * If writing Z more bytes at endp would run past the end of stream S, log a
 * warning and shrink Z to the space that is left. Z must be a modifiable
 * lvalue and is evaluated more than once.
 *
 * The expansion deliberately has no trailing semicolon, so that
 * "CHECK_SIZE(s, n);" is exactly one statement and can be used as the body
 * of an if/else without braces.
 */
#define CHECK_SIZE(S, Z) \
	do { \
		if (((S)->endp + (Z)) > (S)->size) { \
			flog_warn( \
				EC_LIB_STREAM, \
				"CHECK_SIZE: truncating requested size %lu", \
				(unsigned long)(Z)); \
			STREAM_WARN_OFFSETS(S); \
			(Z) = (S)->size - (S)->endp; \
		} \
	} while (0)
102 :
103 : /* Make stream buffer. */
104 507 : struct stream *stream_new(size_t size)
105 : {
106 507 : struct stream *s;
107 :
108 507 : assert(size > 0);
109 :
110 507 : s = XMALLOC(MTYPE_STREAM, sizeof(struct stream) + size);
111 :
112 507 : s->getp = s->endp = 0;
113 507 : s->next = NULL;
114 507 : s->size = size;
115 507 : return s;
116 : }
117 :
118 : /* Free it now. */
119 507 : void stream_free(struct stream *s)
120 : {
121 507 : if (!s)
122 : return;
123 :
124 507 : XFREE(MTYPE_STREAM, s);
125 : }
126 :
127 183 : struct stream *stream_copy(struct stream *dest, const struct stream *src)
128 : {
129 183 : STREAM_VERIFY_SANE(src);
130 :
131 183 : assert(dest != NULL);
132 183 : assert(STREAM_SIZE(dest) >= src->endp);
133 :
134 183 : dest->endp = src->endp;
135 183 : dest->getp = src->getp;
136 :
137 183 : memcpy(dest->data, src->data, src->endp);
138 :
139 183 : return dest;
140 : }
141 :
142 171 : struct stream *stream_dup(const struct stream *s)
143 : {
144 171 : struct stream *snew;
145 :
146 171 : STREAM_VERIFY_SANE(s);
147 :
148 171 : snew = stream_new(s->endp);
149 :
150 171 : return (stream_copy(snew, s));
151 : }
152 :
153 0 : struct stream *stream_dupcat(const struct stream *s1, const struct stream *s2,
154 : size_t offset)
155 : {
156 0 : struct stream *new;
157 :
158 0 : STREAM_VERIFY_SANE(s1);
159 0 : STREAM_VERIFY_SANE(s2);
160 :
161 0 : if ((new = stream_new(s1->endp + s2->endp)) == NULL)
162 : return NULL;
163 :
164 0 : memcpy(new->data, s1->data, offset);
165 0 : memcpy(new->data + offset, s2->data, s2->endp);
166 0 : memcpy(new->data + offset + s2->endp, s1->data + offset,
167 0 : (s1->endp - offset));
168 0 : new->endp = s1->endp + s2->endp;
169 0 : return new;
170 : }
171 :
172 0 : size_t stream_resize_inplace(struct stream **sptr, size_t newsize)
173 : {
174 0 : struct stream *orig = *sptr;
175 :
176 0 : STREAM_VERIFY_SANE(orig);
177 :
178 0 : orig = XREALLOC(MTYPE_STREAM, orig, sizeof(struct stream) + newsize);
179 :
180 0 : orig->size = newsize;
181 :
182 0 : if (orig->endp > orig->size)
183 0 : orig->endp = orig->size;
184 0 : if (orig->getp > orig->endp)
185 0 : orig->getp = orig->endp;
186 :
187 0 : STREAM_VERIFY_SANE(orig);
188 :
189 0 : *sptr = orig;
190 0 : return orig->size;
191 : }
192 :
193 609 : size_t stream_get_getp(const struct stream *s)
194 : {
195 609 : STREAM_VERIFY_SANE(s);
196 609 : return s->getp;
197 : }
198 :
199 1327 : size_t stream_get_endp(const struct stream *s)
200 : {
201 1327 : STREAM_VERIFY_SANE(s);
202 1327 : return s->endp;
203 : }
204 :
205 0 : size_t stream_get_size(const struct stream *s)
206 : {
207 0 : STREAM_VERIFY_SANE(s);
208 0 : return s->size;
209 : }
210 :
211 : /* Stream structure's pointer-related functions. */
212 607 : void stream_set_getp(struct stream *s, size_t pos)
213 : {
214 607 : STREAM_VERIFY_SANE(s);
215 :
216 607 : if (!GETP_VALID(s, pos)) {
217 0 : STREAM_BOUND_WARN(s, "set getp");
218 : pos = s->endp;
219 : }
220 :
221 607 : s->getp = pos;
222 607 : }
223 :
224 33 : void stream_set_endp(struct stream *s, size_t pos)
225 : {
226 33 : STREAM_VERIFY_SANE(s);
227 :
228 33 : if (!ENDP_VALID(s, pos)) {
229 0 : STREAM_BOUND_WARN(s, "set endp");
230 : return;
231 : }
232 :
233 : /*
234 : * Make sure the current read pointer is not beyond the new endp.
235 : */
236 33 : if (s->getp > pos) {
237 0 : STREAM_BOUND_WARN(s, "set endp");
238 : return;
239 : }
240 :
241 33 : s->endp = pos;
242 33 : STREAM_VERIFY_SANE(s);
243 : }
244 :
245 : /* Forward pointer. */
246 61 : void stream_forward_getp(struct stream *s, size_t size)
247 : {
248 61 : STREAM_VERIFY_SANE(s);
249 :
250 61 : if (!GETP_VALID(s, s->getp + size)) {
251 0 : STREAM_BOUND_WARN(s, "seek getp");
252 : return;
253 : }
254 :
255 61 : s->getp += size;
256 : }
257 :
258 0 : bool stream_forward_getp2(struct stream *s, size_t size)
259 : {
260 0 : STREAM_VERIFY_SANE(s);
261 :
262 0 : if (!GETP_VALID(s, s->getp + size))
263 : return false;
264 :
265 0 : s->getp += size;
266 :
267 0 : return true;
268 : }
269 :
270 0 : void stream_rewind_getp(struct stream *s, size_t size)
271 : {
272 0 : STREAM_VERIFY_SANE(s);
273 :
274 0 : if (size > s->getp || !GETP_VALID(s, s->getp - size)) {
275 0 : STREAM_BOUND_WARN(s, "rewind getp");
276 : return;
277 : }
278 :
279 0 : s->getp -= size;
280 : }
281 :
282 0 : bool stream_rewind_getp2(struct stream *s, size_t size)
283 : {
284 0 : STREAM_VERIFY_SANE(s);
285 :
286 0 : if (size > s->getp || !GETP_VALID(s, s->getp - size))
287 : return false;
288 :
289 0 : s->getp -= size;
290 :
291 0 : return true;
292 : }
293 :
294 0 : void stream_forward_endp(struct stream *s, size_t size)
295 : {
296 0 : STREAM_VERIFY_SANE(s);
297 :
298 0 : if (!ENDP_VALID(s, s->endp + size)) {
299 0 : STREAM_BOUND_WARN(s, "seek endp");
300 : return;
301 : }
302 :
303 0 : s->endp += size;
304 : }
305 :
306 0 : bool stream_forward_endp2(struct stream *s, size_t size)
307 : {
308 0 : STREAM_VERIFY_SANE(s);
309 :
310 0 : if (!ENDP_VALID(s, s->endp + size))
311 : return false;
312 :
313 0 : s->endp += size;
314 :
315 0 : return true;
316 : }
317 :
318 : /* Copy from stream to destination. */
319 262 : bool stream_get2(void *dst, struct stream *s, size_t size)
320 : {
321 262 : STREAM_VERIFY_SANE(s);
322 :
323 262 : if (STREAM_READABLE(s) < size) {
324 0 : STREAM_BOUND_WARN2(s, "get");
325 0 : return false;
326 : }
327 :
328 262 : memcpy(dst, s->data + s->getp, size);
329 262 : s->getp += size;
330 :
331 262 : return true;
332 : }
333 :
334 4 : void stream_get(void *dst, struct stream *s, size_t size)
335 : {
336 4 : STREAM_VERIFY_SANE(s);
337 :
338 4 : if (STREAM_READABLE(s) < size) {
339 0 : STREAM_BOUND_WARN(s, "get");
340 : return;
341 : }
342 :
343 4 : memcpy(dst, s->data + s->getp, size);
344 4 : s->getp += size;
345 : }
346 :
347 : /* Get next character from the stream. */
348 1171 : bool stream_getc2(struct stream *s, uint8_t *byte)
349 : {
350 1171 : STREAM_VERIFY_SANE(s);
351 :
352 1171 : if (STREAM_READABLE(s) < sizeof(uint8_t)) {
353 0 : STREAM_BOUND_WARN2(s, "get char");
354 0 : return false;
355 : }
356 1171 : *byte = s->data[s->getp++];
357 :
358 1171 : return true;
359 : }
360 :
361 949 : uint8_t stream_getc(struct stream *s)
362 : {
363 949 : uint8_t c;
364 :
365 949 : STREAM_VERIFY_SANE(s);
366 :
367 949 : if (STREAM_READABLE(s) < sizeof(uint8_t)) {
368 0 : STREAM_BOUND_WARN(s, "get char");
369 : return 0;
370 : }
371 949 : c = s->data[s->getp++];
372 :
373 949 : return c;
374 : }
375 :
376 : /* Get next character from the stream. */
377 8 : uint8_t stream_getc_from(struct stream *s, size_t from)
378 : {
379 8 : uint8_t c;
380 :
381 8 : STREAM_VERIFY_SANE(s);
382 :
383 8 : if (!GETP_VALID(s, from + sizeof(uint8_t))) {
384 0 : STREAM_BOUND_WARN(s, "get char");
385 : return 0;
386 : }
387 :
388 8 : c = s->data[from];
389 :
390 8 : return c;
391 : }
392 :
393 690 : bool stream_getw2(struct stream *s, uint16_t *word)
394 : {
395 690 : STREAM_VERIFY_SANE(s);
396 :
397 690 : if (STREAM_READABLE(s) < sizeof(uint16_t)) {
398 0 : STREAM_BOUND_WARN2(s, "get ");
399 0 : return false;
400 : }
401 :
402 690 : *word = s->data[s->getp++] << 8;
403 690 : *word |= s->data[s->getp++];
404 :
405 690 : return true;
406 : }
407 :
408 : /* Get next word from the stream. */
409 508 : uint16_t stream_getw(struct stream *s)
410 : {
411 508 : uint16_t w;
412 :
413 508 : STREAM_VERIFY_SANE(s);
414 :
415 508 : if (STREAM_READABLE(s) < sizeof(uint16_t)) {
416 0 : STREAM_BOUND_WARN(s, "get ");
417 : return 0;
418 : }
419 :
420 508 : w = s->data[s->getp++] << 8;
421 508 : w |= s->data[s->getp++];
422 :
423 508 : return w;
424 : }
425 :
426 : /* Get next word from the stream. */
427 60 : uint16_t stream_getw_from(struct stream *s, size_t from)
428 : {
429 60 : uint16_t w;
430 :
431 60 : STREAM_VERIFY_SANE(s);
432 :
433 60 : if (!GETP_VALID(s, from + sizeof(uint16_t))) {
434 0 : STREAM_BOUND_WARN(s, "get ");
435 : return 0;
436 : }
437 :
438 60 : w = s->data[from++] << 8;
439 60 : w |= s->data[from];
440 :
441 60 : return w;
442 : }
443 :
444 : /* Get next 3-byte from the stream. */
445 0 : uint32_t stream_get3_from(struct stream *s, size_t from)
446 : {
447 0 : uint32_t l;
448 :
449 0 : STREAM_VERIFY_SANE(s);
450 :
451 0 : if (!GETP_VALID(s, from + 3)) {
452 0 : STREAM_BOUND_WARN(s, "get 3byte");
453 : return 0;
454 : }
455 :
456 0 : l = s->data[from++] << 16;
457 0 : l |= s->data[from++] << 8;
458 0 : l |= s->data[from];
459 :
460 0 : return l;
461 : }
462 :
463 4 : uint32_t stream_get3(struct stream *s)
464 : {
465 4 : uint32_t l;
466 :
467 4 : STREAM_VERIFY_SANE(s);
468 :
469 4 : if (STREAM_READABLE(s) < 3) {
470 0 : STREAM_BOUND_WARN(s, "get 3byte");
471 : return 0;
472 : }
473 :
474 4 : l = s->data[s->getp++] << 16;
475 4 : l |= s->data[s->getp++] << 8;
476 4 : l |= s->data[s->getp++];
477 :
478 4 : return l;
479 : }
480 :
481 : /* Get next long word from the stream. */
482 0 : uint32_t stream_getl_from(struct stream *s, size_t from)
483 : {
484 0 : uint32_t l;
485 :
486 0 : STREAM_VERIFY_SANE(s);
487 :
488 0 : if (!GETP_VALID(s, from + sizeof(uint32_t))) {
489 0 : STREAM_BOUND_WARN(s, "get long");
490 : return 0;
491 : }
492 :
493 0 : l = (unsigned)(s->data[from++]) << 24;
494 0 : l |= s->data[from++] << 16;
495 0 : l |= s->data[from++] << 8;
496 0 : l |= s->data[from];
497 :
498 0 : return l;
499 : }
500 :
501 : /* Copy from stream at specific location to destination. */
502 11 : void stream_get_from(void *dst, struct stream *s, size_t from, size_t size)
503 : {
504 11 : STREAM_VERIFY_SANE(s);
505 :
506 11 : if (!GETP_VALID(s, from + size)) {
507 0 : STREAM_BOUND_WARN(s, "get from");
508 : return;
509 : }
510 :
511 11 : memcpy(dst, s->data + from, size);
512 : }
513 :
514 1281 : bool stream_getl2(struct stream *s, uint32_t *l)
515 : {
516 1281 : STREAM_VERIFY_SANE(s);
517 :
518 1281 : if (STREAM_READABLE(s) < sizeof(uint32_t)) {
519 0 : STREAM_BOUND_WARN2(s, "get long");
520 0 : return false;
521 : }
522 :
523 1281 : *l = (unsigned int)(s->data[s->getp++]) << 24;
524 1281 : *l |= s->data[s->getp++] << 16;
525 1281 : *l |= s->data[s->getp++] << 8;
526 1281 : *l |= s->data[s->getp++];
527 :
528 1281 : return true;
529 : }
530 :
531 236 : uint32_t stream_getl(struct stream *s)
532 : {
533 236 : uint32_t l;
534 :
535 236 : STREAM_VERIFY_SANE(s);
536 :
537 236 : if (STREAM_READABLE(s) < sizeof(uint32_t)) {
538 0 : STREAM_BOUND_WARN(s, "get long");
539 : return 0;
540 : }
541 :
542 236 : l = (unsigned)(s->data[s->getp++]) << 24;
543 236 : l |= s->data[s->getp++] << 16;
544 236 : l |= s->data[s->getp++] << 8;
545 236 : l |= s->data[s->getp++];
546 :
547 236 : return l;
548 : }
549 :
550 : /* Get next quad word from the stream. */
551 0 : uint64_t stream_getq_from(struct stream *s, size_t from)
552 : {
553 0 : uint64_t q;
554 :
555 0 : STREAM_VERIFY_SANE(s);
556 :
557 0 : if (!GETP_VALID(s, from + sizeof(uint64_t))) {
558 0 : STREAM_BOUND_WARN(s, "get quad");
559 : return 0;
560 : }
561 :
562 0 : q = ((uint64_t)s->data[from++]) << 56;
563 0 : q |= ((uint64_t)s->data[from++]) << 48;
564 0 : q |= ((uint64_t)s->data[from++]) << 40;
565 0 : q |= ((uint64_t)s->data[from++]) << 32;
566 0 : q |= ((uint64_t)s->data[from++]) << 24;
567 0 : q |= ((uint64_t)s->data[from++]) << 16;
568 0 : q |= ((uint64_t)s->data[from++]) << 8;
569 0 : q |= ((uint64_t)s->data[from++]);
570 :
571 0 : return q;
572 : }
573 :
574 0 : uint64_t stream_getq(struct stream *s)
575 : {
576 0 : uint64_t q;
577 :
578 0 : STREAM_VERIFY_SANE(s);
579 :
580 0 : if (STREAM_READABLE(s) < sizeof(uint64_t)) {
581 0 : STREAM_BOUND_WARN(s, "get quad");
582 : return 0;
583 : }
584 :
585 0 : q = ((uint64_t)s->data[s->getp++]) << 56;
586 0 : q |= ((uint64_t)s->data[s->getp++]) << 48;
587 0 : q |= ((uint64_t)s->data[s->getp++]) << 40;
588 0 : q |= ((uint64_t)s->data[s->getp++]) << 32;
589 0 : q |= ((uint64_t)s->data[s->getp++]) << 24;
590 0 : q |= ((uint64_t)s->data[s->getp++]) << 16;
591 0 : q |= ((uint64_t)s->data[s->getp++]) << 8;
592 0 : q |= ((uint64_t)s->data[s->getp++]);
593 :
594 0 : return q;
595 : }
596 :
597 44 : bool stream_getq2(struct stream *s, uint64_t *q)
598 : {
599 44 : STREAM_VERIFY_SANE(s);
600 :
601 44 : if (STREAM_READABLE(s) < sizeof(uint64_t)) {
602 0 : STREAM_BOUND_WARN2(s, "get uint64");
603 0 : return false;
604 : }
605 :
606 44 : *q = ((uint64_t)s->data[s->getp++]) << 56;
607 44 : *q |= ((uint64_t)s->data[s->getp++]) << 48;
608 44 : *q |= ((uint64_t)s->data[s->getp++]) << 40;
609 44 : *q |= ((uint64_t)s->data[s->getp++]) << 32;
610 44 : *q |= ((uint64_t)s->data[s->getp++]) << 24;
611 44 : *q |= ((uint64_t)s->data[s->getp++]) << 16;
612 44 : *q |= ((uint64_t)s->data[s->getp++]) << 8;
613 44 : *q |= ((uint64_t)s->data[s->getp++]);
614 :
615 44 : return true;
616 : }
617 :
618 : /* Get next long word from the stream. */
619 14 : uint32_t stream_get_ipv4(struct stream *s)
620 : {
621 14 : uint32_t l;
622 :
623 14 : STREAM_VERIFY_SANE(s);
624 :
625 14 : if (STREAM_READABLE(s) < sizeof(uint32_t)) {
626 0 : STREAM_BOUND_WARN(s, "get ipv4");
627 : return 0;
628 : }
629 :
630 14 : memcpy(&l, s->data + s->getp, sizeof(uint32_t));
631 14 : s->getp += sizeof(uint32_t);
632 :
633 14 : return l;
634 : }
635 :
636 0 : bool stream_get_ipaddr(struct stream *s, struct ipaddr *ip)
637 : {
638 0 : uint16_t ipa_len = 0;
639 :
640 0 : STREAM_VERIFY_SANE(s);
641 :
642 : /* Get address type. */
643 0 : if (STREAM_READABLE(s) < sizeof(uint16_t)) {
644 0 : STREAM_BOUND_WARN2(s, "get ipaddr");
645 0 : return false;
646 : }
647 0 : ip->ipa_type = stream_getw(s);
648 :
649 : /* Get address value. */
650 0 : switch (ip->ipa_type) {
651 0 : case IPADDR_V4:
652 0 : ipa_len = IPV4_MAX_BYTELEN;
653 0 : break;
654 0 : case IPADDR_V6:
655 0 : ipa_len = IPV6_MAX_BYTELEN;
656 0 : break;
657 0 : case IPADDR_NONE:
658 0 : flog_err(EC_LIB_DEVELOPMENT,
659 : "%s: unknown ip address-family: %u", __func__,
660 : ip->ipa_type);
661 0 : return false;
662 : }
663 0 : if (STREAM_READABLE(s) < ipa_len) {
664 0 : STREAM_BOUND_WARN2(s, "get ipaddr");
665 0 : return false;
666 : }
667 0 : memcpy(&ip->ip, s->data + s->getp, ipa_len);
668 0 : s->getp += ipa_len;
669 :
670 0 : return true;
671 : }
672 :
/* Read a 32-bit value with stream_getl() and reinterpret its bit pattern as
 * a float. */
float stream_getf(struct stream *s)
{
	union {
		float as_float;
		uint32_t as_bits;
	} pun;

	pun.as_bits = stream_getl(s);

	return pun.as_float;
}
682 :
/* Read a 64-bit value with stream_getq() and reinterpret its bit pattern as
 * a double. */
double stream_getd(struct stream *s)
{
	union {
		double as_double;
		uint64_t as_bits;
	} pun;

	pun.as_bits = stream_getq(s);

	return pun.as_double;
}
692 :
693 : /* Copy from source to stream.
694 : *
695 : * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
696 : * around. This should be fixed once the stream updates are working.
697 : *
698 : * stream_write() is saner
699 : */
700 305 : void stream_put(struct stream *s, const void *src, size_t size)
701 : {
702 :
703 : /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
704 305 : CHECK_SIZE(s, size);
705 :
706 305 : STREAM_VERIFY_SANE(s);
707 :
708 305 : if (STREAM_WRITEABLE(s) < size) {
709 0 : STREAM_BOUND_WARN(s, "put");
710 : return;
711 : }
712 :
713 305 : if (src)
714 204 : memcpy(s->data + s->endp, src, size);
715 : else
716 101 : memset(s->data + s->endp, 0, size);
717 :
718 305 : s->endp += size;
719 : }
720 :
721 : /* Put character to the stream. */
722 2456 : int stream_putc(struct stream *s, uint8_t c)
723 : {
724 2456 : STREAM_VERIFY_SANE(s);
725 :
726 2456 : if (STREAM_WRITEABLE(s) < sizeof(uint8_t)) {
727 0 : STREAM_BOUND_WARN(s, "put");
728 : return 0;
729 : }
730 :
731 2456 : s->data[s->endp++] = c;
732 2456 : return sizeof(uint8_t);
733 : }
734 :
735 : /* Put word to the stream. */
736 927 : int stream_putw(struct stream *s, uint16_t w)
737 : {
738 927 : STREAM_VERIFY_SANE(s);
739 :
740 927 : if (STREAM_WRITEABLE(s) < sizeof(uint16_t)) {
741 0 : STREAM_BOUND_WARN(s, "put");
742 : return 0;
743 : }
744 :
745 927 : s->data[s->endp++] = (uint8_t)(w >> 8);
746 927 : s->data[s->endp++] = (uint8_t)w;
747 :
748 927 : return 2;
749 : }
750 :
751 : /* Put long word to the stream. */
752 6 : int stream_put3(struct stream *s, uint32_t l)
753 : {
754 6 : STREAM_VERIFY_SANE(s);
755 :
756 6 : if (STREAM_WRITEABLE(s) < 3) {
757 0 : STREAM_BOUND_WARN(s, "put");
758 : return 0;
759 : }
760 :
761 6 : s->data[s->endp++] = (uint8_t)(l >> 16);
762 6 : s->data[s->endp++] = (uint8_t)(l >> 8);
763 6 : s->data[s->endp++] = (uint8_t)l;
764 :
765 6 : return 3;
766 : }
767 :
768 : /* Put long word to the stream. */
769 1458 : int stream_putl(struct stream *s, uint32_t l)
770 : {
771 1458 : STREAM_VERIFY_SANE(s);
772 :
773 1458 : if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
774 0 : STREAM_BOUND_WARN(s, "put");
775 : return 0;
776 : }
777 :
778 1458 : s->data[s->endp++] = (uint8_t)(l >> 24);
779 1458 : s->data[s->endp++] = (uint8_t)(l >> 16);
780 1458 : s->data[s->endp++] = (uint8_t)(l >> 8);
781 1458 : s->data[s->endp++] = (uint8_t)l;
782 :
783 1458 : return 4;
784 : }
785 :
786 : /* Put quad word to the stream. */
787 44 : int stream_putq(struct stream *s, uint64_t q)
788 : {
789 44 : STREAM_VERIFY_SANE(s);
790 :
791 44 : if (STREAM_WRITEABLE(s) < sizeof(uint64_t)) {
792 0 : STREAM_BOUND_WARN(s, "put quad");
793 : return 0;
794 : }
795 :
796 44 : s->data[s->endp++] = (uint8_t)(q >> 56);
797 44 : s->data[s->endp++] = (uint8_t)(q >> 48);
798 44 : s->data[s->endp++] = (uint8_t)(q >> 40);
799 44 : s->data[s->endp++] = (uint8_t)(q >> 32);
800 44 : s->data[s->endp++] = (uint8_t)(q >> 24);
801 44 : s->data[s->endp++] = (uint8_t)(q >> 16);
802 44 : s->data[s->endp++] = (uint8_t)(q >> 8);
803 44 : s->data[s->endp++] = (uint8_t)q;
804 :
805 44 : return 8;
806 : }
807 :
/* Append the bit pattern of a float as a 32-bit value via stream_putl();
 * returns what stream_putl() returns. */
int stream_putf(struct stream *s, float f)
{
	union {
		float as_float;
		uint32_t as_bits;
	} pun;

	pun.as_float = f;

	return stream_putl(s, pun.as_bits);
}
817 :
/* Append the bit pattern of a double as a 64-bit value via stream_putq();
 * returns what stream_putq() returns. */
int stream_putd(struct stream *s, double d)
{
	union {
		double as_double;
		uint64_t as_bits;
	} pun;

	pun.as_double = d;

	return stream_putq(s, pun.as_bits);
}
827 :
828 46 : int stream_putc_at(struct stream *s, size_t putp, uint8_t c)
829 : {
830 46 : STREAM_VERIFY_SANE(s);
831 :
832 46 : if (!PUT_AT_VALID(s, putp + sizeof(uint8_t))) {
833 0 : STREAM_BOUND_WARN(s, "put");
834 : return 0;
835 : }
836 :
837 46 : s->data[putp] = c;
838 :
839 46 : return 1;
840 : }
841 :
842 401 : int stream_putw_at(struct stream *s, size_t putp, uint16_t w)
843 : {
844 401 : STREAM_VERIFY_SANE(s);
845 :
846 401 : if (!PUT_AT_VALID(s, putp + sizeof(uint16_t))) {
847 0 : STREAM_BOUND_WARN(s, "put");
848 : return 0;
849 : }
850 :
851 401 : s->data[putp] = (uint8_t)(w >> 8);
852 401 : s->data[putp + 1] = (uint8_t)w;
853 :
854 401 : return 2;
855 : }
856 :
857 0 : int stream_put3_at(struct stream *s, size_t putp, uint32_t l)
858 : {
859 0 : STREAM_VERIFY_SANE(s);
860 :
861 0 : if (!PUT_AT_VALID(s, putp + 3)) {
862 0 : STREAM_BOUND_WARN(s, "put");
863 : return 0;
864 : }
865 0 : s->data[putp] = (uint8_t)(l >> 16);
866 0 : s->data[putp + 1] = (uint8_t)(l >> 8);
867 0 : s->data[putp + 2] = (uint8_t)l;
868 :
869 0 : return 3;
870 : }
871 :
872 0 : int stream_putl_at(struct stream *s, size_t putp, uint32_t l)
873 : {
874 0 : STREAM_VERIFY_SANE(s);
875 :
876 0 : if (!PUT_AT_VALID(s, putp + sizeof(uint32_t))) {
877 0 : STREAM_BOUND_WARN(s, "put");
878 : return 0;
879 : }
880 0 : s->data[putp] = (uint8_t)(l >> 24);
881 0 : s->data[putp + 1] = (uint8_t)(l >> 16);
882 0 : s->data[putp + 2] = (uint8_t)(l >> 8);
883 0 : s->data[putp + 3] = (uint8_t)l;
884 :
885 0 : return 4;
886 : }
887 :
888 0 : int stream_putq_at(struct stream *s, size_t putp, uint64_t q)
889 : {
890 0 : STREAM_VERIFY_SANE(s);
891 :
892 0 : if (!PUT_AT_VALID(s, putp + sizeof(uint64_t))) {
893 0 : STREAM_BOUND_WARN(s, "put");
894 : return 0;
895 : }
896 0 : s->data[putp] = (uint8_t)(q >> 56);
897 0 : s->data[putp + 1] = (uint8_t)(q >> 48);
898 0 : s->data[putp + 2] = (uint8_t)(q >> 40);
899 0 : s->data[putp + 3] = (uint8_t)(q >> 32);
900 0 : s->data[putp + 4] = (uint8_t)(q >> 24);
901 0 : s->data[putp + 5] = (uint8_t)(q >> 16);
902 0 : s->data[putp + 6] = (uint8_t)(q >> 8);
903 0 : s->data[putp + 7] = (uint8_t)q;
904 :
905 0 : return 8;
906 : }
907 :
908 : /* Put long word to the stream. */
909 7 : int stream_put_ipv4(struct stream *s, uint32_t l)
910 : {
911 7 : STREAM_VERIFY_SANE(s);
912 :
913 7 : if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
914 0 : STREAM_BOUND_WARN(s, "put");
915 : return 0;
916 : }
917 7 : memcpy(s->data + s->endp, &l, sizeof(uint32_t));
918 7 : s->endp += sizeof(uint32_t);
919 :
920 7 : return sizeof(uint32_t);
921 : }
922 :
923 : /* Put long word to the stream. */
924 30 : int stream_put_in_addr(struct stream *s, const struct in_addr *addr)
925 : {
926 30 : STREAM_VERIFY_SANE(s);
927 :
928 30 : if (STREAM_WRITEABLE(s) < sizeof(uint32_t)) {
929 0 : STREAM_BOUND_WARN(s, "put");
930 : return 0;
931 : }
932 :
933 30 : memcpy(s->data + s->endp, addr, sizeof(uint32_t));
934 30 : s->endp += sizeof(uint32_t);
935 :
936 30 : return sizeof(uint32_t);
937 : }
938 :
939 0 : bool stream_put_ipaddr(struct stream *s, struct ipaddr *ip)
940 : {
941 0 : stream_putw(s, ip->ipa_type);
942 :
943 0 : switch (ip->ipa_type) {
944 0 : case IPADDR_V4:
945 0 : stream_put_in_addr(s, &ip->ipaddr_v4);
946 0 : break;
947 0 : case IPADDR_V6:
948 0 : stream_write(s, (uint8_t *)&ip->ipaddr_v6, 16);
949 0 : break;
950 0 : case IPADDR_NONE:
951 0 : flog_err(EC_LIB_DEVELOPMENT,
952 : "%s: unknown ip address-family: %u", __func__,
953 : ip->ipa_type);
954 0 : return false;
955 : }
956 :
957 : return true;
958 : }
959 :
960 : /* Put in_addr at location in the stream. */
961 8 : int stream_put_in_addr_at(struct stream *s, size_t putp,
962 : const struct in_addr *addr)
963 : {
964 8 : STREAM_VERIFY_SANE(s);
965 :
966 8 : if (!PUT_AT_VALID(s, putp + 4)) {
967 0 : STREAM_BOUND_WARN(s, "put");
968 : return 0;
969 : }
970 :
971 8 : memcpy(&s->data[putp], addr, 4);
972 8 : return 4;
973 : }
974 :
975 : /* Put in6_addr at location in the stream. */
976 0 : int stream_put_in6_addr_at(struct stream *s, size_t putp,
977 : const struct in6_addr *addr)
978 : {
979 0 : STREAM_VERIFY_SANE(s);
980 :
981 0 : if (!PUT_AT_VALID(s, putp + 16)) {
982 0 : STREAM_BOUND_WARN(s, "put");
983 : return 0;
984 : }
985 :
986 0 : memcpy(&s->data[putp], addr, 16);
987 0 : return 16;
988 : }
989 :
990 : /* Put prefix by nlri type format. */
991 16 : int stream_put_prefix_addpath(struct stream *s, const struct prefix *p,
992 : bool addpath_capable, uint32_t addpath_tx_id)
993 : {
994 16 : size_t psize;
995 16 : size_t psize_with_addpath;
996 :
997 16 : STREAM_VERIFY_SANE(s);
998 :
999 16 : psize = PSIZE(p->prefixlen);
1000 :
1001 16 : if (addpath_capable)
1002 0 : psize_with_addpath = psize + 4;
1003 : else
1004 : psize_with_addpath = psize;
1005 :
1006 16 : if (STREAM_WRITEABLE(s) < (psize_with_addpath + sizeof(uint8_t))) {
1007 0 : STREAM_BOUND_WARN(s, "put");
1008 : return 0;
1009 : }
1010 :
1011 16 : if (addpath_capable) {
1012 0 : s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1013 0 : s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1014 0 : s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1015 0 : s->data[s->endp++] = (uint8_t)addpath_tx_id;
1016 : }
1017 :
1018 16 : s->data[s->endp++] = p->prefixlen;
1019 16 : memcpy(s->data + s->endp, &p->u.prefix, psize);
1020 16 : s->endp += psize;
1021 :
1022 16 : return psize;
1023 : }
1024 :
/* Append a prefix in NLRI form without an add-path id; thin wrapper around
 * stream_put_prefix_addpath(). */
int stream_put_prefix(struct stream *s, const struct prefix *p)
{
	const bool addpath_capable = 0;
	const uint32_t addpath_tx_id = 0;

	return stream_put_prefix_addpath(s, p, addpath_capable, addpath_tx_id);
}
1029 :
1030 : /* Put NLRI with label */
1031 0 : int stream_put_labeled_prefix(struct stream *s, const struct prefix *p,
1032 : mpls_label_t *label, bool addpath_capable,
1033 : uint32_t addpath_tx_id)
1034 : {
1035 0 : size_t psize;
1036 0 : size_t psize_with_addpath;
1037 0 : uint8_t *label_pnt = (uint8_t *)label;
1038 :
1039 0 : STREAM_VERIFY_SANE(s);
1040 :
1041 0 : psize = PSIZE(p->prefixlen);
1042 0 : psize_with_addpath = psize + (addpath_capable ? 4 : 0);
1043 :
1044 0 : if (STREAM_WRITEABLE(s) < (psize_with_addpath + 3)) {
1045 0 : STREAM_BOUND_WARN(s, "put");
1046 : return 0;
1047 : }
1048 :
1049 0 : if (addpath_capable) {
1050 0 : s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 24);
1051 0 : s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 16);
1052 0 : s->data[s->endp++] = (uint8_t)(addpath_tx_id >> 8);
1053 0 : s->data[s->endp++] = (uint8_t)addpath_tx_id;
1054 : }
1055 :
1056 0 : stream_putc(s, (p->prefixlen + 24));
1057 0 : stream_putc(s, label_pnt[0]);
1058 0 : stream_putc(s, label_pnt[1]);
1059 0 : stream_putc(s, label_pnt[2]);
1060 0 : memcpy(s->data + s->endp, &p->u.prefix, psize);
1061 0 : s->endp += psize;
1062 :
1063 0 : return (psize + 3);
1064 : }
1065 :
1066 : /* Read size from fd. */
1067 0 : int stream_read(struct stream *s, int fd, size_t size)
1068 : {
1069 0 : int nbytes;
1070 :
1071 0 : STREAM_VERIFY_SANE(s);
1072 :
1073 0 : if (STREAM_WRITEABLE(s) < size) {
1074 0 : STREAM_BOUND_WARN(s, "put");
1075 : return 0;
1076 : }
1077 :
1078 0 : nbytes = readn(fd, s->data + s->endp, size);
1079 :
1080 0 : if (nbytes > 0)
1081 0 : s->endp += nbytes;
1082 :
1083 0 : return nbytes;
1084 : }
1085 :
1086 746 : ssize_t stream_read_try(struct stream *s, int fd, size_t size)
1087 : {
1088 746 : ssize_t nbytes;
1089 :
1090 746 : STREAM_VERIFY_SANE(s);
1091 :
1092 746 : if (STREAM_WRITEABLE(s) < size) {
1093 0 : STREAM_BOUND_WARN(s, "put");
1094 : /* Fatal (not transient) error, since retrying will not help
1095 : (stream is too small to contain the desired data). */
1096 : return -1;
1097 : }
1098 :
1099 746 : nbytes = read(fd, s->data + s->endp, size);
1100 746 : if (nbytes >= 0) {
1101 710 : s->endp += nbytes;
1102 710 : return nbytes;
1103 : }
1104 : /* Error: was it transient (return -2) or fatal (return -1)? */
1105 36 : if (ERRNO_IO_RETRY(errno))
1106 : return -2;
1107 0 : flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1108 : safe_strerror(errno));
1109 0 : return -1;
1110 : }
1111 :
1112 : /* Read up to size bytes into the stream from the fd, using recvmsgfrom
1113 : * whose arguments match the remaining arguments to this function
1114 : */
1115 0 : ssize_t stream_recvfrom(struct stream *s, int fd, size_t size, int flags,
1116 : struct sockaddr *from, socklen_t *fromlen)
1117 : {
1118 0 : ssize_t nbytes;
1119 :
1120 0 : STREAM_VERIFY_SANE(s);
1121 :
1122 0 : if (STREAM_WRITEABLE(s) < size) {
1123 0 : STREAM_BOUND_WARN(s, "put");
1124 : /* Fatal (not transient) error, since retrying will not help
1125 : (stream is too small to contain the desired data). */
1126 : return -1;
1127 : }
1128 :
1129 0 : nbytes = recvfrom(fd, s->data + s->endp, size, flags, from, fromlen);
1130 0 : if (nbytes >= 0) {
1131 0 : s->endp += nbytes;
1132 0 : return nbytes;
1133 : }
1134 : /* Error: was it transient (return -2) or fatal (return -1)? */
1135 0 : if (ERRNO_IO_RETRY(errno))
1136 : return -2;
1137 0 : flog_err(EC_LIB_SOCKET, "%s: read failed on fd %d: %s", __func__, fd,
1138 : safe_strerror(errno));
1139 0 : return -1;
1140 : }
1141 :
1142 : /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
1143 : * from endp.
1144 : * First iovec will be used to receive the data.
1145 : * Stream need not be empty.
1146 : */
1147 0 : ssize_t stream_recvmsg(struct stream *s, int fd, struct msghdr *msgh, int flags,
1148 : size_t size)
1149 : {
1150 0 : int nbytes;
1151 0 : struct iovec *iov;
1152 :
1153 0 : STREAM_VERIFY_SANE(s);
1154 0 : assert(msgh->msg_iovlen > 0);
1155 :
1156 0 : if (STREAM_WRITEABLE(s) < size) {
1157 0 : STREAM_BOUND_WARN(s, "put");
1158 : /* This is a logic error in the calling code: the stream is too
1159 : small
1160 : to hold the desired data! */
1161 : return -1;
1162 : }
1163 :
1164 0 : iov = &(msgh->msg_iov[0]);
1165 0 : iov->iov_base = (s->data + s->endp);
1166 0 : iov->iov_len = size;
1167 :
1168 0 : nbytes = recvmsg(fd, msgh, flags);
1169 :
1170 0 : if (nbytes > 0)
1171 0 : s->endp += nbytes;
1172 :
1173 0 : return nbytes;
1174 : }
1175 :
1176 : /* Write data to buffer. */
1177 26 : size_t stream_write(struct stream *s, const void *ptr, size_t size)
1178 : {
1179 :
1180 26 : CHECK_SIZE(s, size);
1181 :
1182 26 : STREAM_VERIFY_SANE(s);
1183 :
1184 26 : if (STREAM_WRITEABLE(s) < size) {
1185 0 : STREAM_BOUND_WARN(s, "put");
1186 : return 0;
1187 : }
1188 :
1189 26 : memcpy(s->data + s->endp, ptr, size);
1190 26 : s->endp += size;
1191 :
1192 26 : return size;
1193 : }
1194 :
1195 : /* Return current read pointer.
1196 : * DEPRECATED!
1197 : * Use stream_get_pnt_to if you must, but decoding streams properly
1198 : * is preferred
1199 : */
1200 355 : uint8_t *stream_pnt(struct stream *s)
1201 : {
1202 355 : STREAM_VERIFY_SANE(s);
1203 355 : return s->data + s->getp;
1204 : }
1205 :
1206 : /* Check does this stream empty? */
1207 96 : int stream_empty(struct stream *s)
1208 : {
1209 96 : STREAM_VERIFY_SANE(s);
1210 :
1211 96 : return (s->endp == 0);
1212 : }
1213 :
1214 : /* Reset stream. */
1215 662 : void stream_reset(struct stream *s)
1216 : {
1217 662 : STREAM_VERIFY_SANE(s);
1218 :
1219 662 : s->getp = s->endp = 0;
1220 662 : }
1221 :
1222 : /* Write stream contens to the file discriptor. */
1223 0 : int stream_flush(struct stream *s, int fd)
1224 : {
1225 0 : int nbytes;
1226 :
1227 0 : STREAM_VERIFY_SANE(s);
1228 :
1229 0 : nbytes = write(fd, s->data + s->getp, s->endp - s->getp);
1230 :
1231 0 : return nbytes;
1232 : }
1233 :
1234 0 : void stream_hexdump(const struct stream *s)
1235 : {
1236 0 : zlog_hexdump(s->data, s->endp);
1237 0 : }
1238 :
1239 : /* Stream first in first out queue. */
1240 :
1241 173 : struct stream_fifo *stream_fifo_new(void)
1242 : {
1243 173 : struct stream_fifo *new;
1244 :
1245 173 : new = XMALLOC(MTYPE_STREAM_FIFO, sizeof(struct stream_fifo));
1246 173 : stream_fifo_init(new);
1247 173 : return new;
1248 : }
1249 :
1250 215 : void stream_fifo_init(struct stream_fifo *fifo)
1251 : {
1252 215 : memset(fifo, 0, sizeof(struct stream_fifo));
1253 215 : pthread_mutex_init(&fifo->mtx, NULL);
1254 215 : }
1255 :
1256 : /* Add new stream to fifo. */
1257 935 : void stream_fifo_push(struct stream_fifo *fifo, struct stream *s)
1258 : {
1259 : #if defined DEV_BUILD
1260 : size_t max, curmax;
1261 : #endif
1262 :
1263 935 : if (fifo->tail)
1264 654 : fifo->tail->next = s;
1265 : else
1266 281 : fifo->head = s;
1267 :
1268 935 : fifo->tail = s;
1269 935 : fifo->tail->next = NULL;
1270 : #if !defined DEV_BUILD
1271 935 : atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1272 : #else
1273 : max = atomic_fetch_add_explicit(&fifo->count, 1, memory_order_release);
1274 : curmax = atomic_load_explicit(&fifo->max_count, memory_order_relaxed);
1275 : if (max > curmax)
1276 : atomic_store_explicit(&fifo->max_count, max,
1277 : memory_order_relaxed);
1278 : #endif
1279 935 : }
1280 :
1281 0 : void stream_fifo_push_safe(struct stream_fifo *fifo, struct stream *s)
1282 : {
1283 0 : frr_with_mutex (&fifo->mtx) {
1284 0 : stream_fifo_push(fifo, s);
1285 : }
1286 0 : }
1287 :
1288 : /* Delete first stream from fifo. */
1289 951 : struct stream *stream_fifo_pop(struct stream_fifo *fifo)
1290 : {
1291 951 : struct stream *s;
1292 :
1293 951 : s = fifo->head;
1294 :
1295 951 : if (s) {
1296 924 : fifo->head = s->next;
1297 :
1298 924 : if (fifo->head == NULL)
1299 276 : fifo->tail = NULL;
1300 :
1301 924 : atomic_fetch_sub_explicit(&fifo->count, 1,
1302 : memory_order_release);
1303 :
1304 : /* ensure stream is scrubbed of references to this fifo */
1305 924 : s->next = NULL;
1306 : }
1307 :
1308 951 : return s;
1309 : }
1310 :
1311 0 : struct stream *stream_fifo_pop_safe(struct stream_fifo *fifo)
1312 : {
1313 0 : struct stream *ret;
1314 :
1315 0 : frr_with_mutex (&fifo->mtx) {
1316 0 : ret = stream_fifo_pop(fifo);
1317 : }
1318 :
1319 0 : return ret;
1320 : }
1321 :
1322 1064 : struct stream *stream_fifo_head(struct stream_fifo *fifo)
1323 : {
1324 1064 : return fifo->head;
1325 : }
1326 :
1327 0 : struct stream *stream_fifo_head_safe(struct stream_fifo *fifo)
1328 : {
1329 0 : struct stream *ret;
1330 :
1331 0 : frr_with_mutex (&fifo->mtx) {
1332 0 : ret = stream_fifo_head(fifo);
1333 : }
1334 :
1335 0 : return ret;
1336 : }
1337 :
1338 258 : void stream_fifo_clean(struct stream_fifo *fifo)
1339 : {
1340 258 : struct stream *s;
1341 258 : struct stream *next;
1342 :
1343 265 : for (s = fifo->head; s; s = next) {
1344 7 : next = s->next;
1345 7 : stream_free(s);
1346 : }
1347 258 : fifo->head = fifo->tail = NULL;
1348 258 : atomic_store_explicit(&fifo->count, 0, memory_order_release);
1349 258 : }
1350 :
1351 0 : void stream_fifo_clean_safe(struct stream_fifo *fifo)
1352 : {
1353 0 : frr_with_mutex (&fifo->mtx) {
1354 0 : stream_fifo_clean(fifo);
1355 : }
1356 0 : }
1357 :
1358 29 : size_t stream_fifo_count_safe(struct stream_fifo *fifo)
1359 : {
1360 29 : return atomic_load_explicit(&fifo->count, memory_order_acquire);
1361 : }
1362 :
1363 212 : void stream_fifo_deinit(struct stream_fifo *fifo)
1364 : {
1365 212 : stream_fifo_clean(fifo);
1366 212 : pthread_mutex_destroy(&fifo->mtx);
1367 212 : }
1368 :
1369 170 : void stream_fifo_free(struct stream_fifo *fifo)
1370 : {
1371 170 : stream_fifo_deinit(fifo);
1372 170 : XFREE(MTYPE_STREAM_FIFO, fifo);
1373 170 : }
1374 :
1375 0 : void stream_pulldown(struct stream *s)
1376 : {
1377 0 : size_t rlen = STREAM_READABLE(s);
1378 :
1379 : /* No more data, so just move the pointers. */
1380 0 : if (rlen == 0) {
1381 0 : stream_reset(s);
1382 0 : return;
1383 : }
1384 :
1385 : /* Move the available data to the beginning. */
1386 0 : memmove(s->data, &s->data[s->getp], rlen);
1387 0 : s->getp = 0;
1388 0 : s->endp = rlen;
1389 : }
|