[master] 273788a Move session scheduling to new task api

Poul-Henning Kamp phk at varnish-cache.org
Mon Jan 23 12:10:15 CET 2012


commit 273788aee2c1892ed4a1334442dca28533f0bc94
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Mon Jan 23 11:10:02 2012 +0000

    Move session scheduling to new task api

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 2f48427..43b4eae 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -288,7 +288,7 @@ struct wrk_accept {
 
 /* Worker pool stuff -------------------------------------------------*/
 
-typedef void pool_func_t(struct pool *pp, struct worker *wrk, void *priv);
+typedef void pool_func_t(struct worker *wrk, void *priv);
 
 struct pool_task {
 	VTAILQ_ENTRY(pool_task)		list;
@@ -324,8 +324,7 @@ struct worker {
 	struct dstat		stats;
 
 	/* New Pool stuff */
-	pool_func_t		*pool_func;
-	void			*pool_priv;
+	struct pool_task	task;
 
 	/* Pool stuff */
 	enum e_do_what		do_what;
@@ -914,7 +913,6 @@ void PipeSession(struct sess *sp);
 /* cache_pool.c */
 void Pool_Init(void);
 void Pool_Work_Thread(void *priv, struct worker *w);
-int Pool_Schedule(struct pool *pp, struct sess *sp);
 int Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how);
 
 #define WRW_IsReleased(w)	((w)->wrw.wfd == NULL)
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index a8a2b2a..20dc38a 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -199,8 +199,8 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
 	if (wrk != NULL) {
 		VTAILQ_REMOVE(&pp->idle, wrk, list);
 		Lck_Unlock(&pp->mtx);
-		wrk->pool_func = task->func;
-		wrk->pool_priv = task->priv;
+		wrk->task.func = task->func;
+		wrk->task.priv = task->priv;
 		AZ(pthread_cond_signal(&wrk->cond));
 		return (0);
 	}
@@ -210,7 +210,15 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
 		retval = -1;
 		break;
 	case POOL_QUEUE_FRONT:
-		VTAILQ_INSERT_TAIL(&pp->front_queue, task, list);
+		/* If we have too much in the queue already, refuse. */
+		if (pp->lqueue > (cache_param->queue_max * pp->nthr) / 100) {
+			pp->ndropped++;
+			retval = -1;
+		} else {
+			VTAILQ_INSERT_TAIL(&pp->front_queue, task, list);
+			pp->nqueued++;
+			pp->lqueue++;
+		}
 		break;
 	case POOL_QUEUE_BACK:
 		VTAILQ_INSERT_TAIL(&pp->back_queue, task, list);
@@ -219,6 +227,8 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
 		WRONG("Unknown enum pool_how");
 	}
 	Lck_Unlock(&pp->mtx);
+	if (how == POOL_QUEUE_FRONT && !retval)
+		AZ(pthread_cond_signal(&pp->herder_cond));
 	return (retval);
 }
 
@@ -248,8 +258,10 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 		WS_Reset(wrk->ws, NULL);
 
 		tp = VTAILQ_FIRST(&pp->front_queue);
-		if (tp != NULL)
+		if (tp != NULL) {
+			pp->lqueue--;
 			VTAILQ_REMOVE(&pp->front_queue, tp, list);
+		}
 
 		if (tp == NULL) {
 			tp = VTAILQ_FIRST(&pp->back_queue);
@@ -260,7 +272,8 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 		if (tp != NULL) {
 			Lck_Unlock(&pp->mtx);
 			AN(tp->func);
-			tp->func(pp, wrk, tp->priv);
+			assert(wrk->pool == pp);
+			tp->func(wrk, tp->priv);
 			stats_clean = WRK_TrySumStat(wrk);
 			Lck_Lock(&pp->mtx);
 			continue;
@@ -297,6 +310,17 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 			(void)Lck_CondWait(&wrk->cond, &pp->mtx, NULL);
 		}
 
+		if (wrk->task.func != NULL) {
+			Lck_Unlock(&pp->mtx);
+			assert(wrk->pool == pp);
+			wrk->task.func(wrk, wrk->task.priv);
+			wrk->task.func = NULL;
+			wrk->task.priv = NULL;
+			stats_clean = WRK_TrySumStat(wrk);
+			Lck_Lock(&pp->mtx);
+			continue;
+		}
+
 		if (wrk->do_what == pool_do_die)
 			break;
 
@@ -320,7 +344,8 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 
 		if (wrk->do_what == pool_do_sess) {
 			stats_clean = 0;
-			SES_pool_task(pp, wrk, wrk->sp);
+			assert(wrk->pool == pp);
+			SES_pool_task(wrk, wrk->sp);
 		} else if (wrk->do_what == pool_do_nothing) {
 			/* we already did */
 		} else {
@@ -334,56 +359,6 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 }
 
 /*--------------------------------------------------------------------
- * Queue a workrequest if possible.
- *
- * Return zero if the request was queued, negative if it wasn't.
- */
-
-static int
-pool_queue(struct pool *pp, struct sess *sp)
-{
-	struct worker *wrk;
-
-	Lck_Lock(&pp->mtx);
-
-	/* If there are idle threads, we tickle the first one into action */
-	wrk = VTAILQ_FIRST(&pp->idle);
-	if (wrk != NULL) {
-		VTAILQ_REMOVE(&pp->idle, wrk, list);
-		Lck_Unlock(&pp->mtx);
-		wrk->sp = sp;
-		wrk->do_what = pool_do_sess;
-		AZ(pthread_cond_signal(&wrk->cond));
-		return (0);
-	}
-
-	/* If we have too much in the queue already, refuse. */
-	if (pp->lqueue > (cache_param->queue_max * pp->nthr) / 100) {
-		pp->ndropped++;
-		Lck_Unlock(&pp->mtx);
-		return (-1);
-	}
-
-	VTAILQ_INSERT_TAIL(&pp->queue, sp, list);
-	pp->nqueued++;
-	pp->lqueue++;
-	Lck_Unlock(&pp->mtx);
-	AZ(pthread_cond_signal(&pp->herder_cond));
-	return (0);
-}
-
-/*--------------------------------------------------------------------*/
-
-int
-Pool_Schedule(struct pool *pp, struct sess *sp)
-{
-
-	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-	AZ(sp->wrk);
-	return(pool_queue(pp, sp));
-}
-
-/*--------------------------------------------------------------------
  * Create another thread, if necessary & possible
  */
 
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index ba5f005..871ed8f 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -133,19 +133,20 @@ SES_Alloc(void)
  */
 
 void
-SES_pool_task(struct pool *pp, struct worker *wrk, void *arg)
+SES_pool_task(struct worker *wrk, void *arg)
 {
 	struct sess *sp;
 
-	AN(pp);
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
 	CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC);
 
 	AZ(wrk->ws->r);
 	wrk->lastused = NAN;
 	THR_SetSession(sp);
-	// AZ(wrk->sp);
-	// wrk->sp = sp;
+	if (wrk->sp == NULL)
+		wrk->sp = sp;
+	else
+		assert(wrk->sp == sp);
 	AZ(sp->wrk);
 	sp->wrk = wrk;
 	CNT_Session(sp);
@@ -178,7 +179,11 @@ SES_Schedule(struct sess *sp)
 	CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
 	AN(pp->pool);
 
-	if (Pool_Schedule(pp->pool, sp)) {
+	AZ(sp->wrk);
+	sp->task.func = SES_pool_task;
+	sp->task.priv = sp;
+
+	if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT)) {
 		VSC_C_main->client_drop_late++;
 		sp->t_idle = VTIM_real();
 		if (sp->req != NULL && sp->req->vcl != NULL) {



More information about the varnish-commit mailing list