Line data Source code
1 : /*
2 : * Pull-driven write event handler
3 : * Copyright (C) 2019 David Lamparter
4 : *
5 : * This program is free software; you can redistribute it and/or modify it
6 : * under the terms of the GNU General Public License as published by the Free
7 : * Software Foundation; either version 2 of the License, or (at your option)
8 : * any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 : * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 : * more details.
14 : *
15 : * You should have received a copy of the GNU General Public License along
16 : * with this program; see the file COPYING; if not, write to the Free Software
17 : * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 : */
19 :
20 : #include "zebra.h"
21 :
22 : #include "pullwr.h"
23 : #include "memory.h"
24 : #include "monotime.h"
25 :
26 : /* defaults */
27 : #define PULLWR_THRESH 16384 /* size at which we start to call write() */
28 : #define PULLWR_MAXSPIN 2500 /* max µs to spend grabbing more data */
29 :
/* Controller state for one pull-driven writer bound to a single fd. */
struct pullwr {
	/* descriptor that queued data is written to with writev() */
	int fd;
	/* event loop on which the writer task is scheduled */
	struct thread_master *tm;
	/* currently scheduled task (timer or write-ready); NULL when idle */
	struct thread *writer;

	/* opaque context passed back as first argument to both callbacks */
	void *arg;
	/* asked for more data whenever the buffer is below thresh; expected
	 * to queue it (presumably via pullwr_write())
	 */
	void (*fill)(void *, struct pullwr *);
	/* invoked on a writev() failure: the bool is false for an error
	 * return and true when writev() wrote 0 bytes
	 */
	void (*err)(void *, struct pullwr *, bool);

	/* Ring buffer.  Resizing copies data to the start ("un-rings" it),
	 * but at a constant size the queued region WILL wrap around past the
	 * end when data keeps trickling in.
	 */
	size_t bufsz;		/* allocated size of buffer */
	size_t valid;		/* number of queued, unwritten bytes */
	size_t pos;		/* offset of the first queued byte */
	uint64_t total_written;	/* running total of bytes writev() accepted */
	char *buffer;

	size_t thresh;		/* fill target in bytes (PULLWR_THRESH) */
	int64_t maxspin;	/* fill time budget in usec (PULLWR_MAXSPIN) */
};
50 :
51 48 : DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller");
52 48 : DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF, "pull-driven write buffer");
53 :
54 : static void pullwr_run(struct thread *t);
55 :
56 0 : struct pullwr *_pullwr_new(struct thread_master *tm, int fd,
57 : void *arg,
58 : void (*fill)(void *, struct pullwr *),
59 : void (*err)(void *, struct pullwr *, bool))
60 : {
61 0 : struct pullwr *pullwr;
62 :
63 0 : pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr));
64 0 : pullwr->fd = fd;
65 0 : pullwr->tm = tm;
66 0 : pullwr->arg = arg;
67 0 : pullwr->fill = fill;
68 0 : pullwr->err = err;
69 :
70 0 : pullwr->thresh = PULLWR_THRESH;
71 0 : pullwr->maxspin = PULLWR_MAXSPIN;
72 :
73 0 : return pullwr;
74 : }
75 :
76 0 : void pullwr_del(struct pullwr *pullwr)
77 : {
78 0 : THREAD_OFF(pullwr->writer);
79 :
80 0 : XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
81 0 : XFREE(MTYPE_PULLWR_HEAD, pullwr);
82 0 : }
83 :
84 0 : void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec,
85 : size_t write_threshold)
86 : {
87 0 : pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN;
88 0 : pullwr->thresh = write_threshold ?: PULLWR_THRESH;
89 0 : }
90 :
91 0 : void pullwr_bump(struct pullwr *pullwr)
92 : {
93 0 : if (pullwr->writer)
94 : return;
95 :
96 0 : thread_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer);
97 : }
98 :
99 0 : static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov)
100 : {
101 0 : size_t len1;
102 :
103 0 : if (pullwr->valid == 0)
104 : return 0;
105 :
106 0 : if (pullwr->pos + pullwr->valid <= pullwr->bufsz) {
107 0 : iov[0].iov_base = pullwr->buffer + pullwr->pos;
108 0 : iov[0].iov_len = pullwr->valid;
109 0 : return 1;
110 : }
111 :
112 0 : len1 = pullwr->bufsz - pullwr->pos;
113 :
114 0 : iov[0].iov_base = pullwr->buffer + pullwr->pos;
115 0 : iov[0].iov_len = len1;
116 0 : iov[1].iov_base = pullwr->buffer;
117 0 : iov[1].iov_len = pullwr->valid - len1;
118 0 : return 2;
119 : }
120 :
121 0 : static void pullwr_resize(struct pullwr *pullwr, size_t need)
122 : {
123 0 : struct iovec iov[2];
124 0 : size_t niov, newsize;
125 0 : char *newbuf;
126 :
127 : /* the buffer is maintained at pullwr->thresh * 2 since we'll be
128 : * trying to fill it as long as it's anywhere below pullwr->thresh.
129 : * That means we frequently end up a little short of it and then write
130 : * something that goes over the threshold. So, just use double.
131 : */
132 0 : if (need) {
133 : /* resize up */
134 0 : if (pullwr->bufsz - pullwr->valid >= need)
135 0 : return;
136 :
137 0 : newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2);
138 0 : newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
139 0 : } else if (!pullwr->valid) {
140 : /* resize down, buffer empty */
141 : newsize = 0;
142 : newbuf = NULL;
143 : } else {
144 : /* resize down */
145 0 : if (pullwr->bufsz - pullwr->valid < pullwr->thresh)
146 : return;
147 0 : newsize = MAX(pullwr->valid, pullwr->thresh * 2);
148 0 : newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
149 : }
150 :
151 0 : niov = pullwr_iov(pullwr, iov);
152 0 : if (niov >= 1) {
153 0 : memcpy(newbuf, iov[0].iov_base, iov[0].iov_len);
154 0 : if (niov >= 2)
155 0 : memcpy(newbuf + iov[0].iov_len,
156 0 : iov[1].iov_base, iov[1].iov_len);
157 : }
158 :
159 0 : XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
160 0 : pullwr->buffer = newbuf;
161 0 : pullwr->bufsz = newsize;
162 0 : pullwr->pos = 0;
163 : }
164 :
165 0 : void pullwr_write(struct pullwr *pullwr, const void *data, size_t len)
166 : {
167 0 : pullwr_resize(pullwr, len);
168 :
169 0 : if (pullwr->pos + pullwr->valid > pullwr->bufsz) {
170 0 : size_t pos;
171 :
172 0 : pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz;
173 0 : memcpy(pullwr->buffer + pos, data, len);
174 : } else {
175 0 : size_t max1, len1;
176 0 : max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid);
177 0 : max1 = MIN(max1, len);
178 :
179 0 : memcpy(pullwr->buffer + pullwr->pos + pullwr->valid,
180 : data, max1);
181 0 : len1 = len - max1;
182 :
183 0 : if (len1)
184 0 : memcpy(pullwr->buffer, (char *)data + max1, len1);
185 :
186 : }
187 0 : pullwr->valid += len;
188 :
189 0 : pullwr_bump(pullwr);
190 0 : }
191 :
192 0 : static void pullwr_run(struct thread *t)
193 : {
194 0 : struct pullwr *pullwr = THREAD_ARG(t);
195 0 : struct iovec iov[2];
196 0 : size_t niov, lastvalid;
197 0 : ssize_t nwr;
198 0 : struct timeval t0;
199 0 : bool maxspun = false;
200 :
201 0 : monotime(&t0);
202 :
203 0 : do {
204 0 : lastvalid = pullwr->valid - 1;
205 0 : while (pullwr->valid < pullwr->thresh
206 0 : && pullwr->valid != lastvalid
207 0 : && !maxspun) {
208 0 : lastvalid = pullwr->valid;
209 0 : pullwr->fill(pullwr->arg, pullwr);
210 :
211 : /* check after doing at least one fill() call so we
212 : * don't spin without making progress on slow boxes
213 : */
214 0 : if (!maxspun && monotime_since(&t0, NULL)
215 0 : >= pullwr->maxspin)
216 0 : maxspun = true;
217 : }
218 :
219 0 : if (pullwr->valid == 0) {
220 : /* we made a fill() call above that didn't feed any
221 : * data in, and we have nothing more queued, so we go
222 : * into idle, i.e. no calling thread_add_write()
223 : */
224 0 : pullwr_resize(pullwr, 0);
225 0 : return;
226 : }
227 :
228 0 : niov = pullwr_iov(pullwr, iov);
229 0 : assert(niov);
230 :
231 0 : nwr = writev(pullwr->fd, iov, niov);
232 0 : if (nwr < 0) {
233 0 : if (errno == EAGAIN || errno == EWOULDBLOCK)
234 : break;
235 0 : pullwr->err(pullwr->arg, pullwr, false);
236 0 : return;
237 : }
238 :
239 0 : if (nwr == 0) {
240 0 : pullwr->err(pullwr->arg, pullwr, true);
241 0 : return;
242 : }
243 :
244 0 : pullwr->total_written += nwr;
245 0 : pullwr->valid -= nwr;
246 0 : pullwr->pos += nwr;
247 0 : pullwr->pos %= pullwr->bufsz;
248 0 : } while (pullwr->valid == 0 && !maxspun);
249 : /* pullwr->valid != 0 implies we did an incomplete write, i.e. socket
250 : * is full and we go wait until it's available for writing again.
251 : */
252 :
253 0 : thread_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd,
254 : &pullwr->writer);
255 :
256 : /* if we hit the time limit, just keep the buffer, we'll probably need
257 : * it anyway & another run is already coming up.
258 : */
259 0 : if (!maxspun)
260 0 : pullwr_resize(pullwr, 0);
261 : }
262 :
263 0 : void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written,
264 : size_t *pending, size_t *kernel_pending)
265 : {
266 0 : int tmp;
267 :
268 0 : *total_written = pullwr->total_written;
269 0 : *pending = pullwr->valid;
270 :
271 0 : if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0)
272 0 : tmp = 0;
273 0 : *kernel_pending = tmp;
274 0 : }
|