back to topotato report
topotato coverage report
Current view: top level - lib - pullwr.c (source / functions) Hit Total Coverage
Test: test_rip.py::RIPBasic Lines: 2 124 1.6 %
Date: 2023-02-24 18:39:46 Functions: 4 13 30.8 %

          Line data    Source code
       1             : /*
       2             :  * Pull-driven write event handler
       3             :  * Copyright (C) 2019  David Lamparter
       4             :  *
       5             :  * This program is free software; you can redistribute it and/or modify it
       6             :  * under the terms of the GNU General Public License as published by the Free
       7             :  * Software Foundation; either version 2 of the License, or (at your option)
       8             :  * any later version.
       9             :  *
      10             :  * This program is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      12             :  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
      13             :  * more details.
      14             :  *
      15             :  * You should have received a copy of the GNU General Public License along
      16             :  * with this program; see the file COPYING; if not, write to the Free Software
      17             :  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
      18             :  */
      19             : 
      20             : #include "zebra.h"
      21             : 
      22             : #include "pullwr.h"
      23             : #include "memory.h"
      24             : #include "monotime.h"
      25             : 
      26             : /* defaults */
      27             : #define PULLWR_THRESH   16384   /* size at which we start to call write() */
      28             : #define PULLWR_MAXSPIN  2500    /* max µs to spend grabbing more data */
      29             : 
      30             : struct pullwr {
      31             :         int fd;
      32             :         struct thread_master *tm;
      33             :         /* writer == NULL <=> we're idle */
      34             :         struct thread *writer;
      35             : 
      36             :         void *arg;
      37             :         void (*fill)(void *, struct pullwr *);
      38             :         void (*err)(void *, struct pullwr *, bool);
      39             : 
      40             :         /* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
      41             :          * around if data is trickling in while keeping it at a constant size)
      42             :          */
      43             :         size_t bufsz, valid, pos;
      44             :         uint64_t total_written;
      45             :         char *buffer;
      46             : 
      47             :         size_t thresh;          /* PULLWR_THRESH */
      48             :         int64_t maxspin;        /* PULLWR_MAXSPIN */
      49             : };
      50             : 
      51          36 : DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller");
      52          36 : DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF,  "pull-driven write buffer");
      53             : 
      54             : static void pullwr_run(struct thread *t);
      55             : 
      56           0 : struct pullwr *_pullwr_new(struct thread_master *tm, int fd,
      57             :                 void *arg,
      58             :                 void (*fill)(void *, struct pullwr *),
      59             :                 void (*err)(void *, struct pullwr *, bool))
      60             : {
      61           0 :         struct pullwr *pullwr;
      62             : 
      63           0 :         pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr));
      64           0 :         pullwr->fd = fd;
      65           0 :         pullwr->tm = tm;
      66           0 :         pullwr->arg = arg;
      67           0 :         pullwr->fill = fill;
      68           0 :         pullwr->err = err;
      69             : 
      70           0 :         pullwr->thresh = PULLWR_THRESH;
      71           0 :         pullwr->maxspin = PULLWR_MAXSPIN;
      72             : 
      73           0 :         return pullwr;
      74             : }
      75             : 
      76           0 : void pullwr_del(struct pullwr *pullwr)
      77             : {
      78           0 :         THREAD_OFF(pullwr->writer);
      79             : 
      80           0 :         XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
      81           0 :         XFREE(MTYPE_PULLWR_HEAD, pullwr);
      82           0 : }
      83             : 
      84           0 : void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec,
      85             :                 size_t write_threshold)
      86             : {
      87           0 :         pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN;
      88           0 :         pullwr->thresh = write_threshold ?: PULLWR_THRESH;
      89           0 : }
      90             : 
      91           0 : void pullwr_bump(struct pullwr *pullwr)
      92             : {
      93           0 :         if (pullwr->writer)
      94             :                 return;
      95             : 
      96           0 :         thread_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer);
      97             : }
      98             : 
      99           0 : static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov)
     100             : {
     101           0 :         size_t len1;
     102             : 
     103           0 :         if (pullwr->valid == 0)
     104             :                 return 0;
     105             : 
     106           0 :         if (pullwr->pos + pullwr->valid <= pullwr->bufsz) {
     107           0 :                 iov[0].iov_base = pullwr->buffer + pullwr->pos;
     108           0 :                 iov[0].iov_len = pullwr->valid;
     109           0 :                 return 1;
     110             :         }
     111             : 
     112           0 :         len1 = pullwr->bufsz - pullwr->pos;
     113             : 
     114           0 :         iov[0].iov_base = pullwr->buffer + pullwr->pos;
     115           0 :         iov[0].iov_len = len1;
     116           0 :         iov[1].iov_base = pullwr->buffer;
     117           0 :         iov[1].iov_len = pullwr->valid - len1;
     118           0 :         return 2;
     119             : }
     120             : 
     121           0 : static void pullwr_resize(struct pullwr *pullwr, size_t need)
     122             : {
     123           0 :         struct iovec iov[2];
     124           0 :         size_t niov, newsize;
     125           0 :         char *newbuf;
     126             : 
     127             :         /* the buffer is maintained at pullwr->thresh * 2 since we'll be
     128             :          * trying to fill it as long as it's anywhere below pullwr->thresh.
     129             :          * That means we frequently end up a little short of it and then write
     130             :          * something that goes over the threshold.  So, just use double.
     131             :          */
     132           0 :         if (need) {
     133             :                 /* resize up */
     134           0 :                 if (pullwr->bufsz - pullwr->valid >= need)
     135           0 :                         return;
     136             : 
     137           0 :                 newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2);
     138           0 :                 newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
     139           0 :         } else if (!pullwr->valid) {
     140             :                 /* resize down, buffer empty */
     141             :                 newsize = 0;
     142             :                 newbuf = NULL;
     143             :         } else {
     144             :                 /* resize down */
     145           0 :                 if (pullwr->bufsz - pullwr->valid < pullwr->thresh)
     146             :                         return;
     147           0 :                 newsize = MAX(pullwr->valid, pullwr->thresh * 2);
     148           0 :                 newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
     149             :         }
     150             : 
     151           0 :         niov = pullwr_iov(pullwr, iov);
     152           0 :         if (niov >= 1) {
     153           0 :                 memcpy(newbuf, iov[0].iov_base, iov[0].iov_len);
     154           0 :                 if (niov >= 2)
     155           0 :                         memcpy(newbuf + iov[0].iov_len,
     156           0 :                                 iov[1].iov_base, iov[1].iov_len);
     157             :         }
     158             : 
     159           0 :         XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
     160           0 :         pullwr->buffer = newbuf;
     161           0 :         pullwr->bufsz = newsize;
     162           0 :         pullwr->pos = 0;
     163             : }
     164             : 
     165           0 : void pullwr_write(struct pullwr *pullwr, const void *data, size_t len)
     166             : {
     167           0 :         pullwr_resize(pullwr, len);
     168             : 
     169           0 :         if (pullwr->pos + pullwr->valid > pullwr->bufsz) {
     170           0 :                 size_t pos;
     171             : 
     172           0 :                 pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz;
     173           0 :                 memcpy(pullwr->buffer + pos, data, len);
     174             :         } else {
     175           0 :                 size_t max1, len1;
     176           0 :                 max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid);
     177           0 :                 max1 = MIN(max1, len);
     178             : 
     179           0 :                 memcpy(pullwr->buffer + pullwr->pos + pullwr->valid,
     180             :                                 data, max1);
     181           0 :                 len1 = len - max1;
     182             : 
     183           0 :                 if (len1)
     184           0 :                         memcpy(pullwr->buffer, (char *)data + max1, len1);
     185             : 
     186             :         }
     187           0 :         pullwr->valid += len;
     188             : 
     189           0 :         pullwr_bump(pullwr);
     190           0 : }
     191             : 
     192           0 : static void pullwr_run(struct thread *t)
     193             : {
     194           0 :         struct pullwr *pullwr = THREAD_ARG(t);
     195           0 :         struct iovec iov[2];
     196           0 :         size_t niov, lastvalid;
     197           0 :         ssize_t nwr;
     198           0 :         struct timeval t0;
     199           0 :         bool maxspun = false;
     200             : 
     201           0 :         monotime(&t0);
     202             : 
     203           0 :         do {
     204           0 :                 lastvalid = pullwr->valid - 1;
     205           0 :                 while (pullwr->valid < pullwr->thresh
     206           0 :                                 && pullwr->valid != lastvalid
     207           0 :                                 && !maxspun) {
     208           0 :                         lastvalid = pullwr->valid;
     209           0 :                         pullwr->fill(pullwr->arg, pullwr);
     210             : 
     211             :                         /* check after doing at least one fill() call so we
     212             :                          * don't spin without making progress on slow boxes
     213             :                          */
     214           0 :                         if (!maxspun && monotime_since(&t0, NULL)
     215           0 :                                         >= pullwr->maxspin)
     216           0 :                                 maxspun = true;
     217             :                 }
     218             : 
     219           0 :                 if (pullwr->valid == 0) {
     220             :                         /* we made a fill() call above that didn't feed any
     221             :                          * data in, and we have nothing more queued, so we go
     222             :                          * into idle, i.e. no calling thread_add_write()
     223             :                          */
     224           0 :                         pullwr_resize(pullwr, 0);
     225           0 :                         return;
     226             :                 }
     227             : 
     228           0 :                 niov = pullwr_iov(pullwr, iov);
     229           0 :                 assert(niov);
     230             : 
     231           0 :                 nwr = writev(pullwr->fd, iov, niov);
     232           0 :                 if (nwr < 0) {
     233           0 :                         if (errno == EAGAIN || errno == EWOULDBLOCK)
     234             :                                 break;
     235           0 :                         pullwr->err(pullwr->arg, pullwr, false);
     236           0 :                         return;
     237             :                 }
     238             : 
     239           0 :                 if (nwr == 0) {
     240           0 :                         pullwr->err(pullwr->arg, pullwr, true);
     241           0 :                         return;
     242             :                 }
     243             : 
     244           0 :                 pullwr->total_written += nwr;
     245           0 :                 pullwr->valid -= nwr;
     246           0 :                 pullwr->pos += nwr;
     247           0 :                 pullwr->pos %= pullwr->bufsz;
     248           0 :         } while (pullwr->valid == 0 && !maxspun);
     249             :         /* pullwr->valid != 0 implies we did an incomplete write, i.e. socket
     250             :          * is full and we go wait until it's available for writing again.
     251             :          */
     252             : 
     253           0 :         thread_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd,
     254             :                         &pullwr->writer);
     255             : 
     256             :         /* if we hit the time limit, just keep the buffer, we'll probably need
     257             :          * it anyway & another run is already coming up.
     258             :          */
     259           0 :         if (!maxspun)
     260           0 :                 pullwr_resize(pullwr, 0);
     261             : }
     262             : 
     263           0 : void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written,
     264             :                   size_t *pending, size_t *kernel_pending)
     265             : {
     266           0 :         int tmp;
     267             : 
     268           0 :         *total_written = pullwr->total_written;
     269           0 :         *pending = pullwr->valid;
     270             : 
     271           0 :         if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0)
     272           0 :                 tmp = 0;
     273           0 :         *kernel_pending = tmp;
     274           0 : }

Generated by: LCOV version v1.16-topotato