source: bin/varnishreplay/varnishreplay.c @ 8aa1d8

Revision 8aa1d8, 16.2 KB checked in by Poul-Henning Kamp <phk@…>, 3 years ago (diff)

Eliminate nested <*.h> includes from include/*

Sort #includes according to rules which are for me to know and you
to guess.

  • Property mode set to 100644
Line 
1/*-
2 * Copyright (c) 2010 Varnish Software AS
3 * All rights reserved.
4 *
5 * Author: Cecilie Fritzvold <cecilihf@linpro.no>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 *    notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 *    notice, this list of conditions and the following disclaimer in the
14 *    documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 */
28
29#include "config.h"
30
31#include <sys/types.h>
32#include <sys/signal.h>
33#include <sys/uio.h>
34
35#include <ctype.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <pthread.h>
39#include <signal.h>
40#include <stdarg.h>
41#include <stdio.h>
42#include <stdlib.h>
43#include <string.h>
44#include <unistd.h>
45
46#include "vapi/vsl.h"
47#include "vapi/vsm.h"
48#include "vas.h"
49#include "vcs.h"
50#include "vqueue.h"
51#include "vss.h"
52
53#define freez(x) do { if (x) free(x); x = NULL; } while (0);
54
55static struct vss_addr *addr_info;
56static int debug;
57
58static int
59isprefix(const char *str, const char *prefix, const char **next)
60{
61
62        while (*str && *prefix &&
63            tolower((int)*str) == tolower((int)*prefix))
64                ++str, ++prefix;
65        if (*str && *str != ' ')
66                return (0);
67        if (next) {
68                while (*str && *str == ' ')
69                        ++str;
70                *next = str;
71        }
72        return (1);
73}
74
75#if 0
76static int
77isequal(const char *str, const char *reference, const char *end)
78{
79
80        while (str < end && *str && *reference &&
81            tolower((int)*str) == tolower((int)*reference))
82                ++str, ++reference;
83        if (str != end || *reference)
84                return (0);
85        return (1);
86}
87#endif
88
89/*
90 * mailbox toolkit
91 */
92
93struct message {
94        enum VSL_tag_e tag;
95        size_t len;
96        char *ptr;
97        VSTAILQ_ENTRY(message) list;
98};
99
100#define MAX_MAILBOX_SIZE 30
101
102struct mailbox {
103        pthread_mutex_t lock;
104        pthread_cond_t has_mail;
105        int open;
106        VSTAILQ_HEAD(msgq_head, message) messages;
107};
108
109static void
110mailbox_create(struct mailbox *mbox)
111{
112
113        VSTAILQ_INIT(&mbox->messages);
114        pthread_mutex_init(&mbox->lock, NULL);
115        pthread_cond_init(&mbox->has_mail, NULL);
116        mbox->open = 1;
117}
118
119static void
120mailbox_destroy(struct mailbox *mbox)
121{
122        struct message *msg;
123
124        while ((msg = VSTAILQ_FIRST(&mbox->messages))) {
125                VSTAILQ_REMOVE_HEAD(&mbox->messages, list);
126                free(msg);
127        }
128        pthread_cond_destroy(&mbox->has_mail);
129        pthread_mutex_destroy(&mbox->lock);
130}
131
132static void
133mailbox_put(struct mailbox *mbox, struct message *msg)
134{
135
136        pthread_mutex_lock(&mbox->lock);
137        VSTAILQ_INSERT_TAIL(&mbox->messages, msg, list);
138        pthread_cond_signal(&mbox->has_mail);
139        pthread_mutex_unlock(&mbox->lock);
140}
141
142static struct message *
143mailbox_get(struct mailbox *mbox)
144{
145        struct message *msg;
146
147        pthread_mutex_lock(&mbox->lock);
148        while ((msg = VSTAILQ_FIRST(&mbox->messages)) == NULL && mbox->open)
149                pthread_cond_wait(&mbox->has_mail, &mbox->lock);
150        if (msg != NULL)
151                VSTAILQ_REMOVE_HEAD(&mbox->messages, list);
152        pthread_mutex_unlock(&mbox->lock);
153        return (msg);
154}
155
156static void
157mailbox_close(struct mailbox *mbox)
158{
159        pthread_mutex_lock(&mbox->lock);
160        mbox->open = 0;
161        pthread_cond_signal(&mbox->has_mail);
162        pthread_mutex_unlock(&mbox->lock);
163}
164
165/*
166 * thread toolkit
167 */
168
169static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
170
171static void
172thread_log(int lvl, int errcode, const char *fmt, ...)
173{
174        va_list ap;
175
176        if (lvl > debug)
177                return;
178        pthread_mutex_lock(&log_mutex);
179        fprintf(stderr, "%p ", (void *)pthread_self());
180        va_start(ap, fmt);
181        vfprintf(stderr, fmt, ap);
182        va_end(ap);
183        if (errcode)
184                fprintf(stderr, ": %s", strerror(errcode));
185        fprintf(stderr, "\n");
186        pthread_mutex_unlock(&log_mutex);
187}
188
189struct replay_thread {
190        pthread_t thread_id;
191        struct mailbox mbox;
192
193        int sock;
194
195        int fd;                 /* original fd from logs */
196
197        char *method;           /* Request method*/
198        char *proto;            /* Protocol version */
199        char *url;              /* URL and query string */
200        const char *conn;       /* Connection info (keep-alive, close) */
201        char *hdr[64];          /* Headers */
202        int nhdr;               /* Number of headers */
203        int bogus;              /* bogus request */
204
205        char arena[4096];
206        int top;
207        char line[2048];
208        char temp[2048];
209};
210
211static struct replay_thread **threads;
212static size_t nthreads;
213
214/*
215 * Clear thread state
216 */
217static void
218thread_clear(struct replay_thread *thr)
219{
220
221        thr->method = thr->proto = thr->url = NULL;
222        thr->conn = NULL;
223        memset(&thr->hdr, 0, sizeof thr->hdr);
224        thr->nhdr = 0;
225        thr->bogus = 0;
226        memset(&thr->arena, 0, sizeof thr->arena);
227        thr->top = 0;
228        memset(&thr->line, 0, sizeof thr->line);
229        memset(&thr->temp, 0, sizeof thr->temp);
230        if (thr->sock != -1)
231                close(thr->sock);
232        thr->sock = -1;
233}
234
235#define THREAD_FAIL ((struct replay_thread *)-1)
236
237static pthread_attr_t thread_attr;
238
239static struct replay_thread *
240thread_get(int fd, void *(*thread_main)(void *))
241{
242
243        assert(fd != 0);
244        if (fd >= nthreads) {
245                struct replay_thread **newthreads = threads;
246                size_t newnthreads = nthreads;
247
248                while (fd >= newnthreads)
249                        newnthreads += newnthreads + 1;
250                newthreads = realloc(newthreads,
251                    newnthreads * sizeof *newthreads);
252                XXXAN(newthreads != NULL);
253                memset(newthreads + nthreads, 0,
254                    (newnthreads - nthreads) * sizeof *newthreads);
255                threads = newthreads;
256                nthreads = newnthreads;
257        }
258        if (threads[fd] == NULL) {
259                threads[fd] = malloc(sizeof *threads[fd]);
260                assert(threads[fd] != NULL);
261                threads[fd]->sock = -1;
262                thread_clear(threads[fd]);
263                mailbox_create(&threads[fd]->mbox);
264                if (pthread_create(&threads[fd]->thread_id, &thread_attr,
265                    thread_main, threads[fd]) != 0) {
266                        thread_log(0, errno, "pthread_create()");
267                        mailbox_destroy(&threads[fd]->mbox);
268                        freez(threads[fd]);
269                        threads[fd] = THREAD_FAIL;
270                } else {
271                        threads[fd]->fd = fd;
272                        thread_log(0, 0, "thread %p:%d started",
273                            (void *)threads[fd]->thread_id, fd);
274                }
275        }
276        if (threads[fd] == THREAD_FAIL)
277                return (NULL);
278        return (threads[fd]);
279}
280
281static void
282thread_close(int fd)
283{
284
285        if (fd == -1) {
286                for (fd = 0; fd < nthreads; ++fd)
287                        thread_close(fd);
288                return;
289        }
290
291        assert(fd < nthreads);
292
293        if (threads[fd] == NULL)
294                return;
295        mailbox_close(&threads[fd]->mbox);
296        pthread_join(threads[fd]->thread_id, NULL);
297        thread_log(0, 0, "thread %p stopped",
298            (void *)threads[fd]->thread_id);
299        thread_clear(threads[fd]);
300        mailbox_destroy(&threads[fd]->mbox);
301        freez(threads[fd]);
302}
303
304/*
305 * Allocate from thread arena
306 */
307static void *
308thread_alloc(struct replay_thread *thr, size_t len)
309{
310        void *ptr;
311
312        if (sizeof thr->arena - thr->top < len)
313                return (NULL);
314        ptr = thr->arena + thr->top;
315        thr->top += len;
316        return (ptr);
317}
318
319/*
320 * Returns a copy of the entire string with leading and trailing spaces
321 * trimmed.
322 */
323static char *
324trimline(struct replay_thread *thr, const char *str)
325{
326        size_t len;
327        char *p;
328
329        /* skip leading space */
330        while (*str && *str == ' ')
331                ++str;
332
333        /* seek to end of string */
334        for (len = 0; str[len]; ++len)
335                 /* nothing */ ;
336
337        /* trim trailing space */
338        while (len && str[len - 1] == ' ')
339                --len;
340
341        /* copy and return */
342        if ((p = thread_alloc(thr, len + 1)) == NULL)
343                return (NULL);
344        memcpy(p, str, len);
345        p[len] = '\0';
346        return (p);
347}
348
349/* Read a line from the socket and return the number of bytes read.
350 * After returning, line will point to the read bytes in memory.
351 * A line is terminated by \r\n
352 */
353static int
354read_line(struct replay_thread *thr)
355{
356        int i, len;
357
358        len = 0;
359        while (1) {
360                if (len + 2 > sizeof thr->line) {
361                        thread_log(0, 0, "overflow");
362                        return (-1);
363                }
364                i = read(thr->sock, thr->line + len, 1);
365                if (i < 0) {
366                        thread_log(0, errno, "read(%d, %p, %d)",
367                            thr->sock, thr->line + len, 1);
368                        return (-1);
369                }
370                if (i == 0)
371                        break;
372                len += i;
373                if (len >= 2 && thr->line[len - 2] == '\r' &&
374                    thr->line[len - 1] == '\n') {
375                        len -= 2;
376                        break;
377                }
378        }
379        thr->line[len] = '\0';
380        return (len);
381}
382
383/* Read a block of data from the socket, and do nothing with it.
384 * length says how many bytes to read, and the function returns
385 * the number of bytes read.
386 */
387static int
388read_block(struct replay_thread *thr, int len)
389{
390        int n, r, tot;
391
392        for (tot = 0; tot < len; tot += r) {
393                n = len - tot;
394                if (n > sizeof thr->temp)
395                        n = sizeof thr->temp;
396                r = read(thr->sock, thr->temp, n);
397                if (r < 0) {
398                        thread_log(0, errno, "read(%d, %p, %d)",
399                            thr->sock, thr->temp, n);
400                        return (-1);
401                }
402                if (r == 0)
403                        break;
404        }
405        return (tot);
406}
407
408/* Receive the response after sending a request.
409 */
410static int
411receive_response(struct replay_thread *thr)
412{
413        const char *next;
414        int line_len;
415        long chunk_length, content_length;
416        int chunked, connclose, failed;
417        int n, status;
418
419        content_length = 0;
420        chunked = connclose = failed = 0;
421
422        /* Read header */
423        for (;;) {
424                line_len = read_line(thr);
425                if (line_len < 0)
426                        return (-1);
427                thread_log(2, 0, "< %.*s", line_len, thr->line);
428                if (line_len == 0)
429                        break;
430                if (strncmp(thr->line, "HTTP", 4) == 0) {
431                        sscanf(thr->line, "%*s %d %*s\r\n", &status);
432                        failed = (status != 200);
433                } else if (isprefix(thr->line, "content-length:", &next)) {
434                        content_length = strtol(next, NULL, 10);
435                } else if (isprefix(thr->line, "transfer-encoding:", &next)) {
436                        chunked = (strcasecmp(next, "chunked") == 0);
437                } else if (isprefix(thr->line, "connection:", &next)) {
438                        connclose = (strcasecmp(next, "close") == 0);
439                }
440        }
441
442        thread_log(1, 0, "status: %d", status);
443
444        /* Read body */
445        if (chunked) {
446                /* Chunked encoding, read size and bytes until no more */
447                thread_log(1, 0, "chunked encoding");
448                for (;;) {
449                        line_len = read_line(thr);
450                        if (line_len < 0)
451                                return (-1);
452                        /* read_line() guarantees null-termination */
453                        chunk_length = strtol(thr->line, NULL, 16);
454                        if (chunk_length == 0)
455                                break;
456                        if ((n = read_block(thr, chunk_length)) < 0)
457                                return (-1);
458                        if (n < chunk_length)
459                                thread_log(0, 0, "short read: %d/%ld",
460                                    n, chunk_length);
461                        thread_log(1, 0, "chunk length: %ld", chunk_length);
462                        thread_log(1, 0, "bytes read: %d", n);
463                        /* trailing CR LF */
464                        if ((n = read_line(thr)) < 0)
465                                return (-1);
466                }
467                /* trailing CR LF */
468                n = read_line(thr);
469                if (n < 0)
470                        return (-1);
471        } else if (content_length > 0) {
472                /* Fixed body size, read content_length bytes */
473                thread_log(1, 0, "fixed length");
474                thread_log(1, 0, "content length: %ld", content_length);
475                if ((n = read_block(thr, content_length)) < 0)
476                        return (1);
477                if (n < content_length)
478                        thread_log(0, 0, "short read: %d/%ld",
479                            n, content_length);
480                thread_log(1, 0, "bytes read: %d", n);
481        } else {
482                /* No body --> stop reading. */
483                thread_log(1, 0, "no body");
484                return (-1);
485        }
486
487        return (connclose);
488}
489
490static void *
491replay_thread(void *arg)
492{
493        struct iovec iov[6];
494        char space[1] = " ", crlf[2] = "\r\n";
495        struct replay_thread *thr = arg;
496        struct message *msg;
497        enum VSL_tag_e tag;
498        char *ptr;
499        const char *next;
500
501        int i;
502
503        int reopen = 1;
504
505        while ((msg = mailbox_get(&thr->mbox)) != NULL) {
506                tag = msg->tag;
507                ptr = msg->ptr;
508
509                thread_log(2, 0, "%s(%s)", VSL_tags[tag], msg->ptr);
510
511                switch (tag) {
512                case SLT_RxRequest:
513                        if (thr->method != NULL)
514                                thr->bogus = 1;
515                        else
516                                thr->method = trimline(thr, ptr);
517                        break;
518
519                case SLT_RxURL:
520                        if (thr->url != NULL)
521                                thr->bogus = 1;
522                        else
523                                thr->url = trimline(thr, ptr);
524                        break;
525
526                case SLT_RxProtocol:
527                        if (thr->proto != NULL)
528                                thr->bogus = 1;
529                        else
530                                thr->proto = trimline(thr, ptr);
531                        break;
532
533                case SLT_RxHeader:
534                        if (thr->nhdr >= sizeof thr->hdr / sizeof *thr->hdr) {
535                                thr->bogus = 1;
536                        } else {
537                                thr->hdr[thr->nhdr++] = trimline(thr, ptr);
538                                if (isprefix(ptr, "connection:", &next))
539                                        thr->conn = trimline(thr, next);
540                        }
541                        break;
542
543                default:
544                        break;
545                }
546
547                freez(msg->ptr);
548                freez(msg);
549
550                if (tag != SLT_ReqEnd)
551                        continue;
552
553                if (!thr->method || !thr->url || !thr->proto) {
554                        thr->bogus = 1;
555                } else if (strcmp(thr->method, "GET") != 0 &&
556                    strcmp(thr->method, "HEAD") != 0) {
557                        thr->bogus = 1;
558                } else if (strcmp(thr->proto, "HTTP/1.0") == 0) {
559                        reopen = !(thr->conn &&
560                            strcasecmp(thr->conn, "keep-alive") == 0);
561                } else if (strcmp(thr->proto, "HTTP/1.1") == 0) {
562                        reopen = (thr->conn &&
563                            strcasecmp(thr->conn, "close") == 0);
564                } else {
565                        thr->bogus = 1;
566                }
567
568                if (thr->bogus) {
569                        thread_log(1, 0, "bogus");
570                        goto clear;
571                }
572
573                if (thr->sock == -1) {
574                        for (;;) {
575                                thread_log(1, 0, "sleeping before connect...");
576                                usleep(1000 * (thr->fd % 3001));
577                                thr->sock = VSS_connect(addr_info, 0);
578                                if (thr->sock >= 0)
579                                        break;
580                                thread_log(0, errno, "connect failed");
581                        }
582                }
583
584                thread_log(1, 0, "%s %s %s", thr->method, thr->url, thr->proto);
585
586                iov[0].iov_base = thr->method;
587                iov[0].iov_len = strlen(thr->method);
588                iov[2].iov_base = thr->url;
589                iov[2].iov_len = strlen(thr->url);
590                iov[4].iov_base = thr->proto;
591                iov[4].iov_len = strlen(thr->proto);
592                iov[1].iov_base = iov[3].iov_base = space;
593                iov[1].iov_len = iov[3].iov_len = 1;
594                iov[5].iov_base = crlf;
595                iov[5].iov_len = 2;
596                if (writev(thr->sock, iov, 6) == -1) {
597                        thread_log(0, errno, "writev()");
598                        goto close;
599                }
600
601                for (i = 0; i < thr->nhdr; ++i) {
602                        thread_log(2, 0, "%d %s", i, thr->hdr[i]);
603                        iov[0].iov_base = thr->hdr[i];
604                        iov[0].iov_len = strlen(thr->hdr[i]);
605                        iov[1].iov_base = crlf;
606                        iov[1].iov_len = 2;
607                        if (writev(thr->sock, iov, 2) == -1) {
608                                thread_log(0, errno, "writev()");
609                                goto close;
610                        }
611                }
612                if (write(thr->sock, crlf, 2) == -1) {
613                        thread_log(0, errno, "writev()");
614                        goto close;
615                }
616                if (receive_response(thr) || reopen) {
617close:
618                        thread_log(1, 0, "close");
619                        assert(thr->sock != -1);
620                        close(thr->sock);
621                        thr->sock = -1;
622                }
623
624                sleep(1);
625clear:
626                /* clean up */
627                thread_clear(thr);
628        }
629
630        /* leftovers */
631        thread_clear(thr);
632
633        return (0);
634}
635
636static int
637gen_traffic(void *priv, enum VSL_tag_e tag, unsigned fd,
638    unsigned len, unsigned spec, const char *ptr, uint64_t bitmap)
639{
640        struct replay_thread *thr;
641        struct message *msg;
642
643        (void)priv;
644        (void)bitmap;
645
646        if (fd == 0 || !(spec & VSL_S_CLIENT))
647                return (0);
648
649        thread_log(3, 0, "%d %s", fd, VSL_tags[tag]);
650        thr = thread_get(fd, replay_thread);
651        if (thr == NULL)
652                return (0);
653        msg = malloc(sizeof (struct message));
654        msg->tag = tag;
655        msg->len = len;
656        msg->ptr = malloc(len);
657        AN(msg->ptr);
658        memcpy(msg->ptr, ptr, len);
659        mailbox_put(&thr->mbox, msg);
660
661        return (0);
662}
663
664/* Initiate a connection to <address> by resolving the
665 * hostname and returning a struct with necessary
666 * connection info.
667 */
668static struct vss_addr *
669init_connection(const char *address)
670{
671        struct vss_addr **ta;
672        struct vss_addr *tap;
673        int i, n;
674
675        n = VSS_resolve(address, NULL, &ta);
676        if (n == 0) {
677                thread_log(0, 0, "Could not connect to server");
678                exit(2);
679        }
680        for (i = 1; i < n; ++i) {
681                free(ta[i]);
682                ta[i] = NULL;
683        }
684        tap = ta[0];
685        free(ta);
686
687        return (tap);
688}
689
690/*--------------------------------------------------------------------*/
691
692static void
693usage(void)
694{
695
696        fprintf(stderr,
697            "usage: varnishreplay [-D] -a address:port -r logfile\n");
698        exit(1);
699}
700
701int
702main(int argc, char *argv[])
703{
704        int c;
705        struct VSM_data *vd;
706        const char *address = NULL;
707
708        vd = VSM_New();
709        VSL_Setup(vd);
710        debug = 0;
711
712        VSL_Arg(vd, 'c', NULL);
713        while ((c = getopt(argc, argv, "a:Dr:n:")) != -1) {
714                switch (c) {
715                case 'a':
716                        address = optarg;
717                        break;
718                case 'D':
719                        ++debug;
720                        break;
721                default:
722                        if (VSL_Arg(vd, c, optarg) > 0)
723                                break;
724                        usage();
725                }
726        }
727
728        if (address == NULL) {
729                usage();
730        }
731
732        if (VSL_Open(vd, 1))
733                exit(1);
734
735        addr_info = init_connection(address);
736
737        signal(SIGPIPE, SIG_IGN);
738
739        pthread_attr_init(&thread_attr);
740
741        /*
742         * XXX: seting the stack size manually reduces the memory usage
743         * XXX: (allowing more threads) and increases speed (?)
744         */
745        pthread_attr_setstacksize(&thread_attr, 32768);
746
747        while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
748                /* nothing */ ;
749        thread_close(-1);
750        exit(0);
751}
Note: See TracBrowser for help on using the repository browser.