r862 - trunk/varnish-cache/bin/varnishd

phk at projects.linpro.no phk at projects.linpro.no
Mon Aug 21 12:59:01 CEST 2006


Author: phk
Date: 2006-08-21 12:59:00 +0200 (Mon, 21 Aug 2006)
New Revision: 862

Added:
   trunk/varnish-cache/bin/varnishd/cache_acceptor_epoll.c
   trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c
   trunk/varnish-cache/bin/varnishd/cache_acceptor_poll.c
Modified:
   trunk/varnish-cache/bin/varnishd/Makefile.am
   trunk/varnish-cache/bin/varnishd/cache_acceptor.c
Log:
Break the acceptors out into their own files.

The intent here is to compile in all acceptors supported on the
operating system and allow the user to select one at startup time.



Modified: trunk/varnish-cache/bin/varnishd/Makefile.am
===================================================================
--- trunk/varnish-cache/bin/varnishd/Makefile.am	2006-08-21 09:51:23 UTC (rev 861)
+++ trunk/varnish-cache/bin/varnishd/Makefile.am	2006-08-21 10:59:00 UTC (rev 862)
@@ -8,6 +8,9 @@
 
 varnishd_SOURCES = \
 	cache_acceptor.c \
+	cache_acceptor_epoll.c \
+	cache_acceptor_poll.c \
+	cache_acceptor_kqueue.c \
 	cache_backend.c \
 	cache_ban.c \
 	cache_center.c \

Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.c	2006-08-21 09:51:23 UTC (rev 861)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.c	2006-08-21 10:59:00 UTC (rev 862)
@@ -9,7 +9,7 @@
 #undef ACCEPTOR_USE_KQUEUE
 #undef ACCEPTOR_USE_EPOLL
 #undef ACCEPTOR_USE_POLL
-
+    
 #if defined(HAVE_KQUEUE)
 #define ACCEPTOR_USE_KQUEUE 1
 #elif defined(HAVE_EPOLL_CTL)
@@ -37,11 +37,25 @@
 #include "heritage.h"
 #include "shmlog.h"
 #include "cache.h"
+#include "cache_acceptor.h"
 
-static pthread_t vca_thread;
+
+static struct acceptor *vca_acceptors[] = {
+#if defined(HAVE_KQUEUE)
+	&acceptor_kqueue,
+#endif
+#if defined(HAVE_EPOLL_CTL)
+	&acceptor_epoll,
+#endif
+#if defined(HAVE_POLL_CTL)
+	&acceptor_poll,
+#endif
+	NULL,
+};
+
 static unsigned		xids;
 
-static struct sess *
+struct sess *
 vca_accept_sess(int fd)
 {
 	socklen_t l;
@@ -90,7 +104,7 @@
 	return (sp);
 }
 
-static void
+void
 vca_handover(struct sess *sp, int bad)
 {
 
@@ -107,460 +121,35 @@
 	WRK_QueueSession(sp);
 }
 
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_POLL
-
-#include <poll.h>
-
-static struct pollfd *pollfd;
-static unsigned npoll;
-
-static int pipes[2];
-
-static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
-
 /*--------------------------------------------------------------------*/
 
-static void
-vca_pollspace(int fd)
-{
-	struct pollfd *p;
-	unsigned u, v;
-
-	if (fd < npoll)
-		return;
-	if (npoll == 0)
-		npoll = 16;
-	for (u = npoll; fd >= u; )
-		u += u;
-	VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u);
-	p = realloc(pollfd, u * sizeof *p);
-	assert(p != NULL);
-	memset(p + npoll, 0, (u - npoll) * sizeof *p);
-	for (v = npoll ; v <= u; v++) 
-		p->fd = -1;
-	pollfd = p;
-	npoll = u;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vca_poll(int fd)
-{
-	vca_pollspace(fd);
-	pollfd[fd].fd = fd;
-	pollfd[fd].events = POLLIN;
-}
-
-static void
-vca_unpoll(int fd)
-{
-	vca_pollspace(fd);
-	pollfd[fd].fd = -1;
-	pollfd[fd].events = 0;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vca_rcvhdev(struct sess *sp)
-{
-
-	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-	clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-	TAILQ_INSERT_TAIL(&sesshead, sp, list);
-	vca_poll(sp->fd);
-}
-
-static void
-accept_f(int fd)
-{
-	struct sess *sp;
-
-	sp = vca_accept_sess(fd);
-	if (sp == NULL)
-		return;
-
-	http_RecvPrep(sp->http);
-	vca_rcvhdev(sp);
-}
-
-static void *
-vca_main(void *arg)
-{
-	unsigned u, v;
-	struct sess *sp, *sp2;
-	struct timespec t;
-	int i;
-
-	(void)arg;
-
-	AZ(pipe(pipes));
-	vca_poll(pipes[0]);
-
-	if (heritage.socket >= 0)
-		vca_poll(heritage.socket);
-
-	while (1) {
-		v = poll(pollfd, npoll, 5000);
-		if (v && pollfd[pipes[0]].revents) {
-			v--;
-			i = read(pipes[0], &sp, sizeof sp);
-			assert(i == sizeof sp);
-			CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-			if (http_RecvPrepAgain(sp->http))
-				vca_handover(sp, 0);
-			else
-				vca_rcvhdev(sp);
-		}
-		if (heritage.socket >= 0 &&
-		    pollfd[heritage.socket].revents) {
-			accept_f(heritage.socket);
-			v--;
-		}
-		clock_gettime(CLOCK_MONOTONIC, &t);
-		TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
-			CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-		    	if (pollfd[sp->fd].revents) {
-				v--;
-				i = http_RecvSome(sp->fd, sp->http);
-				if (i < 0)
-					continue;
-
-				vca_unpoll(sp->fd);
-				TAILQ_REMOVE(&sesshead, sp, list);
-				vca_handover(sp, i);
-				continue;
-			}
-			if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
-				TAILQ_REMOVE(&sesshead, sp, list);
-				vca_unpoll(sp->fd);
-				vca_close_session(sp, "timeout");
-				vca_return_session(sp);
-				continue;
-			}
-			if (v == 0)
-				break;
-		}
-	}
-
-	INCOMPL();
-}
-
-/*--------------------------------------------------------------------*/
-
 void
-vca_return_session(struct sess *sp)
+vca_close_session(struct sess *sp, const char *why)
 {
 
-	if (sp->fd < 0) {
-		SES_Delete(sp);
-		return;
-	}
-	(void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
-	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-	assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
+	VSL(SLT_SessionClose, sp->fd, why);
+	if (sp->fd >= 0)
+		AZ(close(sp->fd));
+	sp->fd = -1;
 }
 
-#endif /* ACCEPTOR_USE_POLL */
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_EPOLL
-
-#include <sys/epoll.h>
-
-static int epfd = -1;
-static int pipes[2];
-
-static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
-
-static void
-vca_add(int fd, void *data)
-{
-	struct epoll_event ev = { EPOLLIN | EPOLLPRI, { data } };
-	AZ(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev));
-}
-
-static void
-vca_del(int fd)
-{
-	AZ(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL));
-}
-
-static void
-vca_rcvhdev(struct sess *sp)
-{
-
-	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-	clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-	TAILQ_INSERT_TAIL(&sesshead, sp, list);
-	vca_add(sp->fd, sp);
-}
-
-static void
-accept_f(int fd)
-{
-	struct sess *sp;
-
-	sp = vca_accept_sess(fd);
-	if (sp == NULL)
-		return;
-	http_RecvPrep(sp->http);
-	vca_rcvhdev(sp);
-}
-
-static void *
-vca_main(void *arg)
-{
-	struct epoll_event ev;
-	struct timespec t;
-	struct sess *sp, *sp2;
-	int i;
-
-	(void)arg;
-
-	epfd = epoll_create(16);
-	assert(epfd >= 0);
-
-	AZ(pipe(pipes));
-	vca_add(pipes[0], pipes);
-
-	if (heritage.socket >= 0)
-		vca_add(heritage.socket, accept_f);
-
-	while (1) {
-		if (epoll_wait(epfd, &ev, 1, 5000) > 0) {
-			if (ev.data.ptr == pipes) {
-				i = read(pipes[0], &sp, sizeof sp);
-				assert(i == sizeof sp);
-				CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-				if (http_RecvPrepAgain(sp->http))
-					vca_handover(sp, 0);
-				else
-					vca_rcvhdev(sp);
-			} else if (ev.data.ptr == accept_f) {
-				accept_f(heritage.socket);
-			} else {
-				CAST_OBJ_NOTNULL(sp, ev.data.ptr, SESS_MAGIC);
-				i = http_RecvSome(sp->fd, sp->http);
-				if (i != -1) {
-					TAILQ_REMOVE(&sesshead, sp, list);
-					vca_del(sp->fd);
-					vca_handover(sp, i);
-				}
-			}
-		}
-		/* check for timeouts */
-		clock_gettime(CLOCK_MONOTONIC, &t);
-		TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
-			CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-			if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
-				TAILQ_REMOVE(&sesshead, sp, list);
-				vca_del(sp->fd);
-				vca_close_session(sp, "timeout");
-				vca_return_session(sp);
-				continue;
-			}
-		}
-	}
-
-	INCOMPL();
-}
-
-/*--------------------------------------------------------------------*/
-
 void
 vca_return_session(struct sess *sp)
 {
 
-	if (sp->fd < 0) {
-		SES_Delete(sp);
-		return;
-	}
-	(void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
-	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-	assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
-}
-
-#endif /* ACCEPTOR_USE_EPOLL */
-/*====================================================================*/
-#ifdef ACCEPTOR_USE_KQUEUE
-
-#include <sys/event.h>
-
-static int kq = -1;
-
-static void
-vca_kq_sess(struct sess *sp, int arm)
-{
-	struct kevent ke[2];
-	int i, j, arm2;
-
 	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-	memset(ke, 0, sizeof ke);
-	if (arm == EV_ADD || arm == EV_ENABLE) {
-		assert(sp->kqa == 0);
-		sp->kqa = 1;
-		arm2 = EV_ADD;
-	} else  {
-		assert(sp->kqa == 1);
-		sp->kqa = 0;
-		arm2 = EV_DELETE;
-	}
-	j = 0;
-	EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
-	    0, params->sess_timeout * 1000, sp);
-	if (sp->fd >= 0)
-		EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
-
-	i = kevent(kq, ke, j, NULL, 0, NULL);
-	assert(i == 0);
+	vca_acceptors[0]->recycle(sp);
 }
 
-static struct sess *
-vca_kev(struct kevent *kp)
-{
-	int i;
-	struct sess *sp;
-
-	if (kp->udata == vca_accept_sess) {
-		while (kp->data-- > 0) {
-			sp = vca_accept_sess(kp->ident);
-			if (sp == NULL)
-				return (NULL);
-			clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-			http_RecvPrep(sp->http);
-			vca_kq_sess(sp, EV_ADD);
-		}
-		return (NULL);
-	}
-	if (kp->udata == NULL) {
-		VSL(SLT_Debug, 0,
-		    "KQ RACE %s flags %x fflags %x data %x",
-		    kp->filter == EVFILT_READ ? "R" : "T",
-		    kp->flags, kp->fflags, kp->data);
-		return (NULL);
-	}
-	CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
-	if (sp->kqa == 0) {
-		VSL(SLT_Debug, sp->id,
-		    "KQ %s flags %x fflags %x data %x",
-		    kp->filter == EVFILT_READ ? "R" : "T",
-		    kp->flags, kp->fflags, kp->data);
-		return (NULL);
-	}
-	if (kp->filter == EVFILT_READ) {
-		if (kp->data > 0) {
-			i = http_RecvSome(sp->fd, sp->http);
-			switch (i) {
-			case -1:
-				return (NULL);
-			case 0:
-				vca_kq_sess(sp, EV_DISABLE);
-				vca_handover(sp, i);
-				return (NULL);	 /* ?? */
-			case 1:
-				vca_close_session(sp, "overflow");
-				break;
-			case 2:
-				vca_close_session(sp, "no request");
-				break;
-			default:
-				INCOMPL();
-			}
-			return (sp);
-		}
-		if (kp->flags == EV_EOF) {
-			vca_close_session(sp, "EOF");
-			return (sp);
-		}
-		INCOMPL();
-	}
-	if (kp->filter == EVFILT_TIMER) {
-		vca_close_session(sp, "timeout");
-		return (sp);
-	}
-	INCOMPL();
-}
-
-
-#define NKEV	100
-
-static void *
-vca_main(void *arg)
-{
-	struct kevent ke[NKEV], *kp;
-	int i, j, n;
-	struct sess *sp;
-
-	(void)arg;
-
-	kq = kqueue();
-	assert(kq >= 0);
-
-
-	assert(heritage.socket >= 0);
-	EV_SET(&ke[0], heritage.socket,
-	    EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess);
-	AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL));
-
-	while (1) {
-		n = kevent(kq, NULL, 0, ke, NKEV, NULL);
-		assert(n >= 1 && n <= NKEV);
-		for (kp = ke, j = 0; j < n; j++, kp++) {
-			sp = vca_kev(kp);
-			if (sp != NULL) {
-				vca_kq_sess(sp, EV_DELETE);
-				SES_Delete(sp);
-				for (i = j; i < n; i++)
-					if (ke[i].udata == sp)
-						ke[i].udata = NULL;
-			}
-		}
-	}
-	INCOMPL();
-}
-
 /*--------------------------------------------------------------------*/
 
 void
-vca_return_session(struct sess *sp)
-{
-
-	if (sp->fd < 0) {
-		SES_Delete(sp);
-		return;
-	}
-	(void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
-	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-	if (http_RecvPrepAgain(sp->http))
-		vca_handover(sp, 0);
-	else 
-		vca_kq_sess(sp, EV_ENABLE);
-}
-
-#endif /* ACCEPTOR_USE_KQUEUE */
-/*====================================================================*/
-
-/*--------------------------------------------------------------------*/
-
-void
-vca_close_session(struct sess *sp, const char *why)
-{
-
-	VSL(SLT_SessionClose, sp->fd, why);
-	if (sp->fd >= 0)
-		AZ(close(sp->fd));
-	sp->fd = -1;
-}
-
-/*--------------------------------------------------------------------*/
-
-void
 VCA_Init(void)
 {
 
-	AZ(pthread_create(&vca_thread, NULL, vca_main, NULL));
 	srandomdev();
 	xids = random();
+
+	/* XXX: Add selector mechanism at some point */
+	vca_acceptors[0]->init();
 }

Added: trunk/varnish-cache/bin/varnishd/cache_acceptor_epoll.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor_epoll.c	2006-08-21 09:51:23 UTC (rev 861)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor_epoll.c	2006-08-21 10:59:00 UTC (rev 862)
@@ -0,0 +1,157 @@
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused.  Not sure what the most efficient way is for that.  For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_EPOLL_CTL)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_epoll_thread;
+static int epfd = -1;
+static int pipes[2];
+
+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+
+static void
+vca_add(int fd, void *data)
+{
+	struct epoll_event ev = { EPOLLIN | EPOLLPRI, { data } };
+	AZ(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev));
+}
+
+static void
+vca_del(int fd)
+{
+	AZ(epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL));
+}
+
+static void
+vca_rcvhdev(struct sess *sp)
+{
+
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+	TAILQ_INSERT_TAIL(&sesshead, sp, list);
+	vca_add(sp->fd, sp);
+}
+
+static void
+accept_f(int fd)
+{
+	struct sess *sp;
+
+	sp = vca_accept_sess(fd);
+	if (sp == NULL)
+		return;
+	http_RecvPrep(sp->http);
+	vca_rcvhdev(sp);
+}
+
+static void *
+vca_main(void *arg)
+{
+	struct epoll_event ev;
+	struct timespec t;
+	struct sess *sp, *sp2;
+	int i;
+
+	(void)arg;
+
+	epfd = epoll_create(16);
+	assert(epfd >= 0);
+
+	AZ(pipe(pipes));
+	vca_add(pipes[0], pipes);
+
+	if (heritage.socket >= 0)
+		vca_add(heritage.socket, accept_f);
+
+	while (1) {
+		if (epoll_wait(epfd, &ev, 1, 5000) > 0) {
+			if (ev.data.ptr == pipes) {
+				i = read(pipes[0], &sp, sizeof sp);
+				assert(i == sizeof sp);
+				CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+				if (http_RecvPrepAgain(sp->http))
+					vca_handover(sp, 0);
+				else
+					vca_rcvhdev(sp);
+			} else if (ev.data.ptr == accept_f) {
+				accept_f(heritage.socket);
+			} else {
+				CAST_OBJ_NOTNULL(sp, ev.data.ptr, SESS_MAGIC);
+				i = http_RecvSome(sp->fd, sp->http);
+				if (i != -1) {
+					TAILQ_REMOVE(&sesshead, sp, list);
+					vca_del(sp->fd);
+					vca_handover(sp, i);
+				}
+			}
+		}
+		/* check for timeouts */
+		clock_gettime(CLOCK_MONOTONIC, &t);
+		TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
+			CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+			if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
+				TAILQ_REMOVE(&sesshead, sp, list);
+				vca_del(sp->fd);
+				vca_close_session(sp, "timeout");
+				vca_return_session(sp);
+				continue;
+			}
+		}
+	}
+
+	INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_epoll_recycle(struct sess *sp)
+{
+
+	if (sp->fd < 0) {
+		SES_Delete(sp);
+		return;
+	}
+	(void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+	assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
+}
+
+static void
+vca_epoll_init(void)
+{
+	AZ(pthread_create(&vca_epoll_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_epoll = {
+	.name =		"epoll",
+	.init =		vca_epoll_init,
+	.recycle =	vca_epoll_recycle,
+};
+
+#endif /* defined(HAVE_EPOLL_CTL) */

Added: trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c	2006-08-21 09:51:23 UTC (rev 861)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c	2006-08-21 10:59:00 UTC (rev 862)
@@ -0,0 +1,195 @@
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused.  Not sure what the most efficient way is for that.  For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_KQUEUE)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/event.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_kqueue_thread;
+static int kq = -1;
+
+static void
+vca_kq_sess(struct sess *sp, int arm)
+{
+	struct kevent ke[2];
+	int i, j, arm2;
+
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	memset(ke, 0, sizeof ke);
+	if (arm == EV_ADD || arm == EV_ENABLE) {
+		assert(sp->kqa == 0);
+		sp->kqa = 1;
+		arm2 = EV_ADD;
+	} else  {
+		assert(sp->kqa == 1);
+		sp->kqa = 0;
+		arm2 = EV_DELETE;
+	}
+	j = 0;
+	EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
+	    0, params->sess_timeout * 1000, sp);
+	if (sp->fd >= 0)
+		EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
+
+	i = kevent(kq, ke, j, NULL, 0, NULL);
+	assert(i == 0);
+}
+
+static struct sess *
+vca_kev(struct kevent *kp)
+{
+	int i;
+	struct sess *sp;
+
+	if (kp->udata == vca_accept_sess) {
+		while (kp->data-- > 0) {
+			sp = vca_accept_sess(kp->ident);
+			if (sp == NULL)
+				return (NULL);
+			clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+			http_RecvPrep(sp->http);
+			vca_kq_sess(sp, EV_ADD);
+		}
+		return (NULL);
+	}
+	if (kp->udata == NULL) {
+		VSL(SLT_Debug, 0,
+		    "KQ RACE %s flags %x fflags %x data %x",
+		    kp->filter == EVFILT_READ ? "R" : "T",
+		    kp->flags, kp->fflags, kp->data);
+		return (NULL);
+	}
+	CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
+	if (sp->kqa == 0) {
+		VSL(SLT_Debug, sp->id,
+		    "KQ %s flags %x fflags %x data %x",
+		    kp->filter == EVFILT_READ ? "R" : "T",
+		    kp->flags, kp->fflags, kp->data);
+		return (NULL);
+	}
+	if (kp->filter == EVFILT_READ) {
+		if (kp->data > 0) {
+			i = http_RecvSome(sp->fd, sp->http);
+			switch (i) {
+			case -1:
+				return (NULL);
+			case 0:
+				vca_kq_sess(sp, EV_DISABLE);
+				vca_handover(sp, i);
+				return (NULL);	 /* ?? */
+			case 1:
+				vca_close_session(sp, "overflow");
+				break;
+			case 2:
+				vca_close_session(sp, "no request");
+				break;
+			default:
+				INCOMPL();
+			}
+			return (sp);
+		}
+		if (kp->flags == EV_EOF) {
+			vca_close_session(sp, "EOF");
+			return (sp);
+		}
+		INCOMPL();
+	}
+	if (kp->filter == EVFILT_TIMER) {
+		vca_close_session(sp, "timeout");
+		return (sp);
+	}
+	INCOMPL();
+}
+
+
+#define NKEV	100
+
+static void *
+vca_main(void *arg)
+{
+	struct kevent ke[NKEV], *kp;
+	int i, j, n;
+	struct sess *sp;
+
+	(void)arg;
+
+	kq = kqueue();
+	assert(kq >= 0);
+
+
+	assert(heritage.socket >= 0);
+	EV_SET(&ke[0], heritage.socket,
+	    EVFILT_READ, EV_ADD, 0, 0, vca_accept_sess);
+	AZ(kevent(kq, &ke[0], 1, NULL, 0, NULL));
+
+	while (1) {
+		n = kevent(kq, NULL, 0, ke, NKEV, NULL);
+		assert(n >= 1 && n <= NKEV);
+		for (kp = ke, j = 0; j < n; j++, kp++) {
+			sp = vca_kev(kp);
+			if (sp != NULL) {
+				vca_kq_sess(sp, EV_DELETE);
+				SES_Delete(sp);
+				for (i = j; i < n; i++)
+					if (ke[i].udata == sp)
+						ke[i].udata = NULL;
+			}
+		}
+	}
+	INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_kqueue_recycle(struct sess *sp)
+{
+
+	if (sp->fd < 0) {
+		SES_Delete(sp);
+		return;
+	}
+	(void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+	if (http_RecvPrepAgain(sp->http))
+		vca_handover(sp, 0);
+	else 
+		vca_kq_sess(sp, EV_ENABLE);
+}
+
+static void
+vca_kqueue_init(void)
+{
+	AZ(pthread_create(&vca_kqueue_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_kqueue = {
+	.name =		"kqueue",
+	.init =		vca_kqueue_init,
+	.recycle =	vca_kqueue_recycle,
+};
+
+#endif /* defined(HAVE_KQUEUE) */

Added: trunk/varnish-cache/bin/varnishd/cache_acceptor_poll.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor_poll.c	2006-08-21 09:51:23 UTC (rev 861)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor_poll.c	2006-08-21 10:59:00 UTC (rev 862)
@@ -0,0 +1,195 @@
+/*
+ * $Id: cache_acceptor.c 860 2006-08-21 09:49:43Z phk $
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused.  Not sure what the most efficient way is for that.  For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#if defined(HAVE_POLL)
+
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <poll.h>
+
+#include <sys/uio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#ifndef HAVE_SRANDOMDEV
+#include "compat/srandomdev.h"
+#endif
+
+#include "heritage.h"
+#include "shmlog.h"
+#include "cache.h"
+#include "cache_acceptor.h"
+
+static pthread_t vca_poll_thread;
+static struct pollfd *pollfd;
+static unsigned npoll;
+
+static int pipes[2];
+
+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_pollspace(int fd)
+{
+	struct pollfd *p;
+	unsigned u, v;
+
+	if (fd < npoll)
+		return;
+	if (npoll == 0)
+		npoll = 16;
+	for (u = npoll; fd >= u; )
+		u += u;
+	VSL(SLT_Debug, 0, "Acceptor Pollspace %u", u);
+	p = realloc(pollfd, u * sizeof *p);
+	assert(p != NULL);
+	memset(p + npoll, 0, (u - npoll) * sizeof *p);
+	for (v = npoll ; v <= u; v++) 
+		p->fd = -1;
+	pollfd = p;
+	npoll = u;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_poll(int fd)
+{
+	vca_pollspace(fd);
+	pollfd[fd].fd = fd;
+	pollfd[fd].events = POLLIN;
+}
+
+static void
+vca_unpoll(int fd)
+{
+	vca_pollspace(fd);
+	pollfd[fd].fd = -1;
+	pollfd[fd].events = 0;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_rcvhdev(struct sess *sp)
+{
+
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+	TAILQ_INSERT_TAIL(&sesshead, sp, list);
+	vca_poll(sp->fd);
+}
+
+static void
+accept_f(int fd)
+{
+	struct sess *sp;
+
+	sp = vca_accept_sess(fd);
+	if (sp == NULL)
+		return;
+
+	http_RecvPrep(sp->http);
+	vca_rcvhdev(sp);
+}
+
+static void *
+vca_main(void *arg)
+{
+	unsigned u, v;
+	struct sess *sp, *sp2;
+	struct timespec t;
+	int i;
+
+	(void)arg;
+
+	AZ(pipe(pipes));
+	vca_poll(pipes[0]);
+
+	if (heritage.socket >= 0)
+		vca_poll(heritage.socket);
+
+	while (1) {
+		v = poll(pollfd, npoll, 5000);
+		if (v && pollfd[pipes[0]].revents) {
+			v--;
+			i = read(pipes[0], &sp, sizeof sp);
+			assert(i == sizeof sp);
+			CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+			if (http_RecvPrepAgain(sp->http))
+				vca_handover(sp, 0);
+			else
+				vca_rcvhdev(sp);
+		}
+		if (heritage.socket >= 0 &&
+		    pollfd[heritage.socket].revents) {
+			accept_f(heritage.socket);
+			v--;
+		}
+		clock_gettime(CLOCK_MONOTONIC, &t);
+		TAILQ_FOREACH_SAFE(sp, &sesshead, list, sp2) {
+			CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+		    	if (pollfd[sp->fd].revents) {
+				v--;
+				i = http_RecvSome(sp->fd, sp->http);
+				if (i < 0)
+					continue;
+
+				vca_unpoll(sp->fd);
+				TAILQ_REMOVE(&sesshead, sp, list);
+				vca_handover(sp, i);
+				continue;
+			}
+			if (sp->t_idle.tv_sec + params->sess_timeout < t.tv_sec) {
+				TAILQ_REMOVE(&sesshead, sp, list);
+				vca_unpoll(sp->fd);
+				vca_close_session(sp, "timeout");
+				vca_return_session(sp);
+				continue;
+			}
+			if (v == 0)
+				break;
+		}
+	}
+
+	INCOMPL();
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vca_poll_recycle(struct sess *sp)
+{
+
+	if (sp->fd < 0) {
+		SES_Delete(sp);
+		return;
+	}
+	(void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+	assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
+}
+
+static void
+vca_poll_init(void)
+{
+	AZ(pthread_create(&vca_poll_thread, NULL, vca_main, NULL));
+}
+
+struct acceptor acceptor_poll = {
+	.name =		"poll",
+	.init =		vca_poll_init,
+	.recycle =	vca_poll_recycle,
+};
+
+#endif /* defined(HAVE_POLL) */




More information about the varnish-commit mailing list