// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Pull-driven write event handler
 * Copyright (C) 2019 David Lamparter
 */

#include "zebra.h"

#include "pullwr.h"
#include "memory.h"
#include "monotime.h"

/* defaults */
#define PULLWR_THRESH  16384 /* size at which we start to call write() */
#define PULLWR_MAXSPIN 2500  /* max µs to spend grabbing more data */

struct pullwr {
        int fd;
        struct event_loop *tm;
        /* writer == NULL <=> we're idle */
        struct event *writer;

        void *arg;
        void (*fill)(void *, struct pullwr *);
        void (*err)(void *, struct pullwr *, bool);

        /* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
         * around if data is trickling in while keeping it at a constant size)
         */
        size_t bufsz, valid, pos;
        uint64_t total_written;
        char *buffer;

        size_t thresh;   /* PULLWR_THRESH */
        int64_t maxspin; /* PULLWR_MAXSPIN */
};
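
/* Illustrative sketch (not part of the original source): byte i of the
 * pending data lives at (pos + i) % bufsz.  With bufsz = 8, pos = 6 and
 * valid = 4, the queued bytes occupy slots 6, 7, 0 and 1; pullwr_iov()
 * below splits such a wrapped run into two iovecs for writev().
 */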

DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller");
DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF, "pull-driven write buffer");

static void pullwr_run(struct event *t);

struct pullwr *_pullwr_new(struct event_loop *tm, int fd, void *arg,
                           void (*fill)(void *, struct pullwr *),
                           void (*err)(void *, struct pullwr *, bool))
{
        struct pullwr *pullwr;

        pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr));
        pullwr->fd = fd;
        pullwr->tm = tm;
        pullwr->arg = arg;
        pullwr->fill = fill;
        pullwr->err = err;

        pullwr->thresh = PULLWR_THRESH;
        pullwr->maxspin = PULLWR_MAXSPIN;

        return pullwr;
}
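
/* Usage sketch (illustrative only: "my_session", "my_fill", "my_err" and
 * the session fields are hypothetical, and pullwr_new() is assumed to be
 * the wrapper around _pullwr_new() declared in pullwr.h):
 *
 *        static void my_fill(void *arg, struct pullwr *pw)
 *        {
 *                struct my_session *sess = arg;
 *
 *                if (sess->have_data)
 *                        pullwr_write(pw, sess->buf, sess->len);
 *        }
 *
 *        static void my_err(void *arg, struct pullwr *pw, bool eof)
 *        {
 *                // eof == true: peer closed cleanly, false: write error
 *                my_session_close(arg);
 *        }
 *
 *        sess->pw = pullwr_new(master, sock_fd, sess, my_fill, my_err);
 *
 * The writer stays idle until pullwr_write() or pullwr_bump() is called.
 */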

void pullwr_del(struct pullwr *pullwr)
{
        EVENT_OFF(pullwr->writer);

        XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
        XFREE(MTYPE_PULLWR_HEAD, pullwr);
}

void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec,
                size_t write_threshold)
{
        pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN;
        pullwr->thresh = write_threshold ?: PULLWR_THRESH;
}

void pullwr_bump(struct pullwr *pullwr)
{
        if (pullwr->writer)
                return;

        event_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer);
}

static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov)
{
        size_t len1;

        if (pullwr->valid == 0)
                return 0;

        /* not wrapped: one contiguous chunk */
        if (pullwr->pos + pullwr->valid <= pullwr->bufsz) {
                iov[0].iov_base = pullwr->buffer + pullwr->pos;
                iov[0].iov_len = pullwr->valid;
                return 1;
        }

        /* wrapped: tail of the buffer first, then its beginning */
        len1 = pullwr->bufsz - pullwr->pos;

        iov[0].iov_base = pullwr->buffer + pullwr->pos;
        iov[0].iov_len = len1;
        iov[1].iov_base = pullwr->buffer;
        iov[1].iov_len = pullwr->valid - len1;
        return 2;
}

static void pullwr_resize(struct pullwr *pullwr, size_t need)
{
        struct iovec iov[2];
        size_t niov, newsize;
        char *newbuf;

        /* the buffer is maintained at pullwr->thresh * 2 since we'll be
         * trying to fill it as long as it's anywhere below pullwr->thresh.
         * That means we frequently end up a little short of it and then
         * write something that goes over the threshold.  So, just use
         * double.
         */
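        /* Worked example (illustrative, not from the original source): with
         * the default thresh = 16384 the buffer settles at 32768 bytes; a
         * pullwr_write() of 100000 bytes while 5000 are already pending
         * grows it to MAX((5000 + 100000) * 2, 2 * 16384) = 210000 bytes.
         */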
        if (need) {
                /* resize up */
                if (pullwr->bufsz - pullwr->valid >= need)
                        return;

                newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2);
                newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
        } else if (!pullwr->valid) {
                /* resize down, buffer empty */
                newsize = 0;
                newbuf = NULL;
        } else {
                /* resize down */
                if (pullwr->bufsz - pullwr->valid < pullwr->thresh)
                        return;
                newsize = MAX(pullwr->valid, pullwr->thresh * 2);
                newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize);
        }

        /* copy pending data into the new buffer, un-wrapping it */
        niov = pullwr_iov(pullwr, iov);
        if (niov >= 1) {
                memcpy(newbuf, iov[0].iov_base, iov[0].iov_len);
                if (niov >= 2)
                        memcpy(newbuf + iov[0].iov_len,
                               iov[1].iov_base, iov[1].iov_len);
        }

        XFREE(MTYPE_PULLWR_BUF, pullwr->buffer);
        pullwr->buffer = newbuf;
        pullwr->bufsz = newsize;
        pullwr->pos = 0;
}

void pullwr_write(struct pullwr *pullwr, const void *data, size_t len)
{
        pullwr_resize(pullwr, len);

        if (pullwr->pos + pullwr->valid > pullwr->bufsz) {
                /* already wrapped: the free space is one contiguous block */
                size_t pos;

                pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz;
                memcpy(pullwr->buffer + pos, data, len);
        } else {
                /* not wrapped (yet): the copy may need to be split in two */
                size_t max1, len1;

                max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid);
                max1 = MIN(max1, len);

                memcpy(pullwr->buffer + pullwr->pos + pullwr->valid,
                       data, max1);
                len1 = len - max1;

                if (len1)
                        memcpy(pullwr->buffer, (char *)data + max1, len1);
        }
        pullwr->valid += len;

        pullwr_bump(pullwr);
}
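
/* Example of the wrap handling above (illustrative, not from the original
 * source): with bufsz = 8, pos = 5 and valid = 2, writing 4 bytes takes
 * the second branch: max1 = 8 - 7 = 1 byte lands in slot 7, the remaining
 * len1 = 3 bytes land in slots 0..2, and valid becomes 6.
 */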

static void pullwr_run(struct event *t)
{
        struct pullwr *pullwr = EVENT_ARG(t);
        struct iovec iov[2];
        size_t niov, lastvalid;
        ssize_t nwr;
        struct timeval t0;
        bool maxspun = false;

        monotime(&t0);

        do {
                lastvalid = pullwr->valid - 1;
                while (pullwr->valid < pullwr->thresh
                       && pullwr->valid != lastvalid
                       && !maxspun) {
                        lastvalid = pullwr->valid;
                        pullwr->fill(pullwr->arg, pullwr);

                        /* check after doing at least one fill() call so we
                         * don't spin without making progress on slow boxes
                         */
                        if (!maxspun && monotime_since(&t0, NULL)
                                        >= pullwr->maxspin)
                                maxspun = true;
                }

                if (pullwr->valid == 0) {
                        /* we made a fill() call above that didn't feed any
                         * data in, and we have nothing more queued, so we
                         * go idle, i.e. don't call event_add_write()
                         */
                        pullwr_resize(pullwr, 0);
                        return;
                }

                niov = pullwr_iov(pullwr, iov);
                assert(niov);

                nwr = writev(pullwr->fd, iov, niov);
                if (nwr < 0) {
                        if (errno == EAGAIN || errno == EWOULDBLOCK)
                                break;
                        pullwr->err(pullwr->arg, pullwr, false);
                        return;
                }

                if (nwr == 0) {
                        /* writev() returning 0 is treated as EOF */
                        pullwr->err(pullwr->arg, pullwr, true);
                        return;
                }

                pullwr->total_written += nwr;
                pullwr->valid -= nwr;
                pullwr->pos += nwr;
                pullwr->pos %= pullwr->bufsz;
        } while (pullwr->valid == 0 && !maxspun);
        /* pullwr->valid != 0 implies we did an incomplete write, i.e. the
         * socket is full and we go wait until it's available for writing
         * again.
         */

        event_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd,
                        &pullwr->writer);

        /* if we hit the time limit, just keep the buffer, we'll probably
         * need it anyway & another run is already coming up.
         */
        if (!maxspun)
                pullwr_resize(pullwr, 0);
}

void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written,
                  size_t *pending, size_t *kernel_pending)
{
        int tmp;

        *total_written = pullwr->total_written;
        *pending = pullwr->valid;

        /* TIOCOUTQ reports bytes still queued in the kernel's socket
         * buffer; report 0 if the ioctl isn't supported for this fd
         */
        if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0)
                tmp = 0;
        *kernel_pending = tmp;
}
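
/* Usage sketch (illustrative; "pw" and the locals are hypothetical):
 *
 *        uint64_t total;
 *        size_t pending, kernel_pending;
 *
 *        pullwr_stats(pw, &total, &pending, &kernel_pending);
 *
 * "pending" counts bytes buffered here, "kernel_pending" what the kernel
 * still has queued on the socket (0 where TIOCOUTQ is unsupported).
 */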