root/trunk/varnish-cache/bin/varnishd/cache_acceptor.c @ 4565

Revision 4565, 10.9 KB (checked in by phk, 7 months ago)

Pretend to use the variables, even if the kernel is deficient.

  • Property svn:keywords set to Id
Line 
1/*-
2 * Copyright (c) 2006 Verdens Gang AS
3 * Copyright (c) 2006-2010 Redpill Linpro AS
4 * All rights reserved.
5 *
6 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 *
29 */
30
31#include "config.h"
32
33#include "svnid.h"
34SVNID("$Id$")
35
36#include <stdio.h>
37#include <errno.h>
38#include <poll.h>
39#include <string.h>
40#include <stdlib.h>
41#include <unistd.h>
42
43#include <sys/uio.h>
44#include <sys/types.h>
45#include <sys/socket.h>
46
47#include "cli.h"
48#include "cli_priv.h"
49#include "shmlog.h"
50#include "cache.h"
51#include "cache_waiter.h"
52
53static struct waiter * const vca_waiters[] = {
54#if defined(HAVE_KQUEUE)
55        &waiter_kqueue,
56#endif
57#if defined(HAVE_EPOLL_CTL)
58        &waiter_epoll,
59#endif
60#if defined(HAVE_PORT_CREATE)
61        &waiter_ports,
62#endif
63        &waiter_poll,
64        NULL,
65};
66
67static struct waiter const *vca_act;
68
69pthread_t               VCA_thread;
70static struct timeval   tv_sndtimeo;
71static struct timeval   tv_rcvtimeo;
72
73/*--------------------------------------------------------------------
74 * Report waiter name to panics
75 */
76
77const char *
78VCA_waiter_name(void)
79{
80
81        if (vca_act != NULL)
82                return (vca_act->name);
83        else
84                return ("no_waiter");
85}
86
87
88/*--------------------------------------------------------------------
89 * We want to get out of any kind of touble-hit TCP connections as fast
90 * as absolutely possible, so we set them LINGER enabled with zero timeout,
91 * so that even if there are outstanding write data on the socket, a close(2)
92 * will return immediately.
93 */
94static const struct linger linger = {
95        .l_onoff        =       1,
96};
97
98static unsigned char    need_sndtimeo, need_rcvtimeo, need_linger, need_test;
99
100int vca_pipes[2] = { -1, -1 };
101
102static void
103sock_test(int fd)
104{
105        struct linger lin;
106        struct timeval tv;
107        socklen_t l;
108
109        l = sizeof lin;
110        AZ(getsockopt(fd, SOL_SOCKET, SO_LINGER, &lin, &l));
111        assert(l == sizeof lin);
112        if (memcmp(&lin, &linger, l))
113                need_linger = 1;
114
115#ifdef SO_SNDTIMEO_WORKS
116        l = sizeof tv;
117        AZ(getsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, &l));
118        assert(l == sizeof tv);
119        if (memcmp(&tv, &tv_sndtimeo, l))
120                need_sndtimeo = 1;
121#else
122        (void)tv;
123        (void)tv_sndtimeo;
124        (void)need_sndtimeo;
125#endif
126
127#ifdef SO_RCVTIMEO_WORKS
128        l = sizeof tv;
129        AZ(getsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, &l));
130        assert(l == sizeof tv);
131        if (memcmp(&tv, &tv_rcvtimeo, l))
132                need_rcvtimeo = 1;
133#else
134        (void)tv;
135        (void)tv_rcvtimeo;
136        (void)need_rcvtimeo;
137#endif
138
139        need_test = 0;
140}
141
142/*--------------------------------------------------------------------
143 * Called once the workerthread gets hold of the session, to do setup
144 * setup overhead, we don't want to bother the acceptor thread with.
145 */
146
147void
148VCA_Prep(struct sess *sp)
149{
150        char addr[TCP_ADDRBUFSIZE];
151        char port[TCP_PORTBUFSIZE];
152
153        TCP_name(sp->sockaddr, sp->sockaddrlen,
154            addr, sizeof addr, port, sizeof port);
155        sp->addr = WS_Dup(sp->ws, addr);
156        sp->port = WS_Dup(sp->ws, port);
157        if (params->log_local_addr) {
158                AZ(getsockname(sp->fd, sp->mysockaddr, &sp->mysockaddrlen));
159                TCP_name(sp->mysockaddr, sp->mysockaddrlen,
160                    addr, sizeof addr, port, sizeof port);
161                VSL(SLT_SessionOpen, sp->fd, "%s %s %s %s",
162                    sp->addr, sp->port, addr, port);
163        } else {
164                VSL(SLT_SessionOpen, sp->fd, "%s %s %s",
165                    sp->addr, sp->port, sp->mylsock->name);
166        }
167        sp->acct.first = sp->t_open;
168        if (need_test)
169                sock_test(sp->fd);
170        if (need_linger)
171                AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER,
172                    &linger, sizeof linger));
173#ifdef SO_SNDTIMEO_WORKS
174        if (need_sndtimeo)
175                AZ(setsockopt(sp->fd, SOL_SOCKET, SO_SNDTIMEO,
176                    &tv_sndtimeo, sizeof tv_sndtimeo));
177#endif
178#ifdef SO_RCVTIMEO_WORKS
179        if (need_rcvtimeo)
180                AZ(setsockopt(sp->fd, SOL_SOCKET, SO_RCVTIMEO,
181                    &tv_rcvtimeo, sizeof tv_rcvtimeo));
182#endif
183}
184
185/*--------------------------------------------------------------------*/
186
187static void *
188vca_acct(void *arg)
189{
190        struct sess *sp;
191        socklen_t l;
192        struct sockaddr_storage addr_s;
193        struct sockaddr *addr;
194        int i;
195        struct pollfd *pfd;
196        struct listen_sock *ls;
197        unsigned u;
198        double t0, now, pace;
199
200        THR_SetName("cache-acceptor");
201        (void)arg;
202
203        /* Set up the poll argument */
204        pfd = calloc(sizeof *pfd, heritage.nsocks);
205        AN(pfd);
206        i = 0;
207        VTAILQ_FOREACH(ls, &heritage.socks, list) {
208                if (ls->sock < 0)
209                        continue;
210                AZ(listen(ls->sock, params->listen_depth));
211                AZ(setsockopt(ls->sock, SOL_SOCKET, SO_LINGER,
212                    &linger, sizeof linger));
213                pfd[i].events = POLLIN;
214                pfd[i++].fd = ls->sock;
215        }
216
217        need_test = 1;
218        pace = 0;
219        t0 = TIM_real();
220        while (1) {
221#ifdef SO_SNDTIMEO_WORKS
222                if (params->send_timeout != tv_sndtimeo.tv_sec) {
223                        need_test = 1;
224                        tv_sndtimeo.tv_sec = params->send_timeout;
225                        VTAILQ_FOREACH(ls, &heritage.socks, list) {
226                                if (ls->sock < 0)
227                                        continue;
228                                AZ(setsockopt(ls->sock, SOL_SOCKET,
229                                    SO_SNDTIMEO,
230                                    &tv_sndtimeo, sizeof tv_sndtimeo));
231                        }
232                }
233#endif
234#ifdef SO_RCVTIMEO_WORKS
235                if (params->sess_timeout != tv_rcvtimeo.tv_sec) {
236                        need_test = 1;
237                        tv_rcvtimeo.tv_sec = params->sess_timeout;
238                        VTAILQ_FOREACH(ls, &heritage.socks, list) {
239                                if (ls->sock < 0)
240                                        continue;
241                                AZ(setsockopt(ls->sock, SOL_SOCKET,
242                                    SO_RCVTIMEO,
243                                    &tv_rcvtimeo, sizeof tv_rcvtimeo));
244                        }
245                }
246#endif
247                /* Bound the pacing delay by parameter */
248                if (pace > params->acceptor_sleep_max)
249                        pace = params->acceptor_sleep_max;
250                if (pace < params->acceptor_sleep_incr)
251                        pace = 0.0;
252                if (pace > 0.0)
253                        TIM_sleep(pace);
254                i = poll(pfd, heritage.nsocks, 1000);
255                now = TIM_real();
256                VSL_stats->uptime = (uint64_t)(now - t0);
257                u = 0;
258                VTAILQ_FOREACH(ls, &heritage.socks, list) {
259                        if (ls->sock < 0)
260                                continue;
261                        if (pfd[u++].revents == 0)
262                                continue;
263                        VSL_stats->client_conn++;
264                        l = sizeof addr_s;
265                        addr = (void*)&addr_s;
266                        i = accept(ls->sock, addr, &l);
267                        if (i < 0) {
268                                VSL_stats->accept_fail++;
269                                switch (errno) {
270                                case EAGAIN:
271                                case ECONNABORTED:
272                                        break;
273                                case EMFILE:
274                                        VSL(SLT_Debug, ls->sock,
275                                            "Too many open files "
276                                            "when accept(2)ing. Sleeping.");
277                                        pace += params->acceptor_sleep_incr;
278                                        break;
279                                default:
280                                        VSL(SLT_Debug, ls->sock,
281                                            "Accept failed: %s",
282                                            strerror(errno));
283                                        pace += params->acceptor_sleep_incr;
284                                        break;
285                                }
286                                continue;
287                        }
288                        sp = SES_New();
289                        if (sp == NULL) {
290                                AZ(close(i));
291                                VSL_stats->client_drop++;
292                                pace += params->acceptor_sleep_incr;
293                                continue;
294                        }
295                        sp->fd = i;
296                        sp->id = i;
297                        sp->t_open = now;
298                        sp->t_end = now;
299                        sp->mylsock = ls;
300                        assert(l < sp->sockaddrlen);
301                        memcpy(sp->sockaddr, addr, l);
302                        sp->sockaddrlen = l;
303
304                        sp->step = STP_FIRST;
305                        if (WRK_QueueSession(sp)) {
306                                VSL_stats->client_drop++;
307                                pace += params->acceptor_sleep_incr;
308                        } else {
309                                pace *= params->acceptor_sleep_decay;
310                        }
311                }
312        }
313        NEEDLESS_RETURN(NULL);
314}
315
316/*--------------------------------------------------------------------*/
317
318void
319vca_handover(struct sess *sp, int status)
320{
321
322        switch (status) {
323        case -2:
324                vca_close_session(sp, "blast");
325                SES_Delete(sp);
326                break;
327        case -1:
328                vca_close_session(sp, "no request");
329                SES_Delete(sp);
330                break;
331        case 1:
332                sp->step = STP_START;
333                if (WRK_QueueSession(sp))
334                        VSL_stats->client_drop_late++;
335                break;
336        default:
337                INCOMPL();
338        }
339}
340
341/*--------------------------------------------------------------------*/
342
343void
344vca_close_session(struct sess *sp, const char *why)
345{
346        int i;
347
348        VSL(SLT_SessionClose, sp->id, "%s", why);
349        if (sp->fd >= 0) {
350                i = close(sp->fd);
351                assert(i == 0 || errno != EBADF);       /* XXX EINVAL seen */
352        }
353        sp->fd = -1;
354}
355
356void
357vca_return_session(struct sess *sp)
358{
359
360        CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
361        AZ(sp->obj);
362        AZ(sp->vcl);
363        assert(sp->fd >= 0);
364        /*
365         * Set nonblocking in the worker-thread, before passing to the
366         * acceptor thread, to reduce syscall density of the latter.
367         */
368        TCP_nonblocking(sp->fd);
369        if (vca_act->pass == NULL)
370                assert(sizeof sp == write(vca_pipes[1], &sp, sizeof sp));
371        else
372                vca_act->pass(sp);
373}
374
375/*--------------------------------------------------------------------*/
376
377static void
378ccf_start(struct cli *cli, const char * const *av, void *priv)
379{
380
381        (void)cli;
382        (void)av;
383        (void)priv;
384
385        if (vca_act == NULL)
386                vca_act = vca_waiters[0];
387
388        AN(vca_act);
389        AN(vca_act->name);
390
391        if (vca_act->pass == NULL)
392                AZ(pipe(vca_pipes));
393        vca_act->init();
394        AZ(pthread_create(&VCA_thread, NULL, vca_acct, NULL));
395        VSL(SLT_Debug, 0, "Acceptor is %s", vca_act->name);
396}
397
398/*--------------------------------------------------------------------*/
399
400static void
401ccf_listen_address(struct cli *cli, const char * const *av, void *priv)
402{
403        struct listen_sock *ls;
404        char h[32], p[32];
405
406        (void)cli;
407        (void)av;
408        (void)priv;
409        VTAILQ_FOREACH(ls, &heritage.socks, list) {
410                if (ls->sock < 0)
411                        continue;
412                TCP_myname(ls->sock, h, sizeof h, p, sizeof p);
413                cli_out(cli, "%s %s\n", h, p);
414        }
415}
416
417/*--------------------------------------------------------------------*/
418
419static struct cli_proto vca_cmds[] = {
420        { CLI_SERVER_START,     "i", ccf_start },
421        { "debug.listen_address",
422            "debug.listen_address",
423            "Report the actual listen address\n", 0, 0,
424            "d", ccf_listen_address, NULL },
425        { NULL }
426};
427
428void
429VCA_Init(void)
430{
431
432        CLI_AddFuncs(vca_cmds);
433}
434
435void
436VCA_Shutdown(void)
437{
438        struct listen_sock *ls;
439        int i;
440
441        VTAILQ_FOREACH(ls, &heritage.socks, list) {
442                if (ls->sock < 0)
443                        continue;
444                i = ls->sock;
445                ls->sock = -1;
446                (void)close(i);
447        }
448}
449
450void
451VCA_tweak_waiter(struct cli *cli, const char *arg)
452{
453        int i;
454
455         ASSERT_MGT();
456
457        if (arg == NULL) {
458                if (vca_act == NULL)
459                        cli_out(cli, "default");
460                else
461                        cli_out(cli, "%s", vca_act->name);
462
463                cli_out(cli, " (");
464                for (i = 0; vca_waiters[i] != NULL; i++)
465                        cli_out(cli, "%s%s", i == 0 ? "" : ", ",
466                            vca_waiters[i]->name);
467                cli_out(cli, ")");
468                return;
469        }
470        if (!strcmp(arg, "default")) {
471                vca_act = NULL;
472                return;
473        }
474        for (i = 0; vca_waiters[i]; i++) {
475                if (!strcmp(arg, vca_waiters[i]->name)) {
476                        vca_act = vca_waiters[i];
477                        return;
478                }
479        }
480        cli_out(cli, "Unknown waiter");
481        cli_result(cli, CLIS_PARAM);
482}
Note: See TracBrowser for help on using the browser.