Sophie

Sophie

distrib > Fedora > 13 > i386 > media > updates-src > by-pkgid > 4b866545d03360c56a47dc93f05762a2 > files > 23

nss_db-2.2.3-0.5.pre1.fc13.src.rpm

*** dbinc/repmgr.h	2007-10-31 10:23:52.000000000 -0700
--- dbinc/repmgr.h	2007-10-31 10:23:53.000000000 -0700
***************
*** 36,41 ****
--- 36,55 ----
  #endif
  
  /*
+  * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
+  * a queue per connection, waiting for TCP buffer space to become available in
+  * the kernel.  Rather than exceeding this limit, we simply discard additional
+  * messages (since this is always allowed by the replication protocol).
+  *    As a special dispensation, if a message is destined for a specific remote
+  * site (i.e., it's not a broadcast), then we first try blocking the sending
+  * thread, waiting for space to become available (though we only wait a limited
+  * time).  This is so as to be able to handle the immediate flood of (a
+  * potentially large number of) outgoing messages that replication generates, in
+  * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
+  */
+ #define	OUT_QUEUE_LIMIT	10
+ 
+ /*
   * The system value is available from sysconf(_SC_HOST_NAME_MAX).
   * Historically, the maximum host name was 256.
   */
***************
*** 47,52 ****
--- 61,71 ----
  #define	MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
  typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
  
+ /* Default timeout values: seconds scaled by US_PER_SEC (microseconds). */
+ #define	DB_REPMGR_DEFAULT_ACK_TIMEOUT		(1 * US_PER_SEC)
+ #define	DB_REPMGR_DEFAULT_CONNECTION_RETRY	(30 * US_PER_SEC)
+ #define	DB_REPMGR_DEFAULT_ELECTION_RETRY	(10 * US_PER_SEC)
+ 
  struct __repmgr_connection;
      typedef struct __repmgr_connection REPMGR_CONNECTION;
  struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
***************
*** 171,178 ****
  #ifdef DB_WIN32
  	WSAEVENT event_object;
  #endif
! #define	CONN_CONNECTING	0x01	/* nonblocking connect in progress */
! #define	CONN_DEFUNCT	0x02	/* socket close pending */
  	u_int32_t flags;
  
  	/*
--- 190,198 ----
  #ifdef DB_WIN32
  	WSAEVENT event_object;
  #endif
! #define	CONN_CONGESTED	0x01	/* msg thread wait has exceeded timeout */
! #define	CONN_CONNECTING	0x02	/* nonblocking connect in progress */
! #define	CONN_DEFUNCT	0x04	/* socket close pending */
  	u_int32_t flags;
  
  	/*
***************
*** 180,189 ****
  	 * send() function's thread.  But if TCP doesn't have enough network
  	 * buffer space for us when we first try it, we instead allocate some
  	 * memory, and copy the message, and then send it as space becomes
! 	 * available in our main select() thread.
  	 */
  	OUT_Q_HEADER outbound_queue;
  	int out_queue_length;
  
  	/*
  	 * Input: while we're reading a message, we keep track of what phase
--- 200,215 ----
  	 * send() function's thread.  But if TCP doesn't have enough network
  	 * buffer space for us when we first try it, we instead allocate some
  	 * memory, and copy the message, and then send it as space becomes
! 	 * available in our main select() thread.  In some cases, if the queue
! 	 * gets too long we wait until it's drained, and then append to it.
! 	 * The "drained" condition variable's mutex is the normal per-repmgr
! 	 * db_rep->mutex, because that mutex is always held anyway whenever the
! 	 * output queue is consulted.
  	 */
  	OUT_Q_HEADER outbound_queue;
  	int out_queue_length;
+ 	cond_var_t drained;
+ 	int blockers;		/* ref count of msg threads waiting on us */
  
  	/*
  	 * Input: while we're reading a message, we keep track of what phase
*** dbinc_auto/int_def.in	2007-10-31 10:23:52.000000000 -0700
--- dbinc_auto/int_def.in	2007-10-31 10:23:52.000000000 -0700
***************
*** 1420,1425 ****
--- 1420,1428 ----
  #define	__repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@
  #define	__repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@
  #define	__repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@
+ #define	__repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@
+ #define	__repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@
+ #define	__repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@
  #define	__repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@
  #define	__repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@
  #define	__repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@
*** dbinc_auto/repmgr_ext.h	2007-10-31 10:23:52.000000000 -0700
--- dbinc_auto/repmgr_ext.h	2007-10-31 10:23:52.000000000 -0700
***************
*** 21,30 ****
  int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
  void __repmgr_stash_generation __P((DB_ENV *));
  int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
  int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
  int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
  int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
  int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
--- 21,30 ----
  int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
  void __repmgr_stash_generation __P((DB_ENV *));
  int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
  int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
  int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
  int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
  int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
***************
*** 39,44 ****
--- 39,47 ----
  int __repmgr_wake_waiting_senders __P((DB_ENV *));
  int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
  void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
+ int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
+ int __repmgr_alloc_cond __P((cond_var_t *));
+ int __repmgr_free_cond __P((cond_var_t *));
  int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
  int __repmgr_close_sync __P((DB_ENV *));
  int __repmgr_net_init __P((DB_ENV *, DB_REP *));
*** repmgr/repmgr_method.c	2007-10-31 10:23:52.000000000 -0700
--- repmgr/repmgr_method.c	2007-10-31 10:23:53.000000000 -0700
***************
*** 196,204 ****
  	int ret;
  
  	/* Set some default values. */
! 	db_rep->ack_timeout = 1 * US_PER_SEC;			/*  1 second */
! 	db_rep->connection_retry_wait = 30 * US_PER_SEC;	/* 30 seconds */
! 	db_rep->election_retry_wait = 10 * US_PER_SEC;		/* 10 seconds */
  	db_rep->config_nsites = 0;
  	db_rep->peer = DB_EID_INVALID;
  	db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
--- 196,204 ----
  	int ret;
  
  	/* Set some default values. */
! 	db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
! 	db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
! 	db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
  	db_rep->config_nsites = 0;
  	db_rep->peer = DB_EID_INVALID;
  	db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
***************
*** 238,243 ****
--- 238,244 ----
  	DB_ENV *dbenv;
  {
  	DB_REP *db_rep;
+ 	REPMGR_CONNECTION *conn;
  	int ret;
  
  	db_rep = dbenv->rep_handle;
***************
*** 254,259 ****
--- 255,266 ----
  
  	if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
  		goto unlock;
+ 
+ 	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
+ 		if (conn->blockers > 0 &&
+ 		    ((ret = __repmgr_signal(&conn->drained)) != 0))
+ 			goto unlock;
+ 	}
  	UNLOCK_MUTEX(db_rep->mutex);
  
  	return (__repmgr_wake_main_thread(dbenv));
*** repmgr/repmgr_msg.c	2007-10-31 10:23:52.000000000 -0700
--- repmgr/repmgr_msg.c	2007-10-31 10:23:53.000000000 -0700
***************
*** 183,192 ****
  
  /*
   * Acknowledges a message.
-  *
-  * !!!
-  * Note that this cannot be called from the select() thread, in case we call
-  * __repmgr_bust_connection(..., FALSE).
   */
  static int
  ack_message(dbenv, generation, lsn)
--- 183,188 ----
***************
*** 227,235 ****
  		rec2.size = 0;
  
  		conn = site->ref.conn;
  		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
! 		    &control2, &rec2)) == DB_REP_UNAVAIL)
! 			ret = __repmgr_bust_connection(dbenv, conn, FALSE);
  	}
  
  	UNLOCK_MUTEX(db_rep->mutex);
--- 223,236 ----
  		rec2.size = 0;
  
  		conn = site->ref.conn;
+ 		/*
+ 		 * It's hard to imagine anyone would care about a lost ack if
+ 		 * the path to the master is so congested as to need blocking;
+ 		 * so pass "blockable" argument as FALSE.
+ 		 */
  		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
! 		    &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
! 			ret = __repmgr_bust_connection(dbenv, conn);
  	}
  
  	UNLOCK_MUTEX(db_rep->mutex);
*** repmgr/repmgr_net.c	2007-10-31 10:23:52.000000000 -0700
--- repmgr/repmgr_net.c	2007-10-31 10:23:53.000000000 -0700
***************
*** 63,69 ****
  static void setup_sending_msg
      __P((struct sending_msg *, u_int, const DBT *, const DBT *));
  static int __repmgr_send_internal
!     __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
  static int enqueue_msg
      __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
  static int flatten __P((DB_ENV *, struct sending_msg *));
--- 63,69 ----
  static void setup_sending_msg
      __P((struct sending_msg *, u_int, const DBT *, const DBT *));
  static int __repmgr_send_internal
!     __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
  static int enqueue_msg
      __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
  static int flatten __P((DB_ENV *, struct sending_msg *));
***************
*** 73,85 ****
   * __repmgr_send --
   *	The send function for DB_ENV->rep_set_transport.
   *
-  * !!!
-  * This is only ever called as the replication transport call-back, which means
-  * it's either on one of our message processing threads or an application
-  * thread.  It mustn't be called from the select() thread, because we might call
-  * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
-  * select() thread.
-  *
   * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
   * PUBLIC:     const DB_LSN *, int, u_int32_t));
   */
--- 73,78 ----
***************
*** 126,134 ****
  		}
  
  		conn = site->ref.conn;
  		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
! 		    control, rec)) == DB_REP_UNAVAIL &&
! 		    (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
  			ret = t_ret;
  		if (ret != 0)
  			goto out;
--- 119,128 ----
  		}
  
  		conn = site->ref.conn;
+ 		/* Pass the "blockable" argument as TRUE. */
  		if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
! 		    control, rec, TRUE)) == DB_REP_UNAVAIL &&
! 		    (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
  			ret = t_ret;
  		if (ret != 0)
  			goto out;
***************
*** 222,228 ****
  	if (site->state != SITE_CONNECTED)
  		return (NULL);
  
! 	if (F_ISSET(site->ref.conn, CONN_CONNECTING))
  		return (NULL);
  	return (site);
  }
--- 216,222 ----
  	if (site->state != SITE_CONNECTED)
  		return (NULL);
  
! 	if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
  		return (NULL);
  	return (site);
  }
***************
*** 235,244 ****
   *
   * !!!
   * Caller must hold dbenv->mutex.
-  *
-  * !!!
-  * Note that this cannot be called from the select() thread, in case we call
-  * __repmgr_bust_connection(..., FALSE).
   */
  static int
  __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
--- 229,234 ----
***************
*** 268,281 ****
  		    !IS_VALID_EID(conn->eid))
  			continue;
  
! 		if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
  			site = SITE_FROM_EID(conn->eid);
  			nsites++;
  			if (site->priority > 0)
  				npeers++;
  		} else if (ret == DB_REP_UNAVAIL) {
! 			if ((ret = __repmgr_bust_connection(
! 			     dbenv, conn, FALSE)) != 0)
  				return (ret);
  		} else
  			return (ret);
--- 258,277 ----
  		    !IS_VALID_EID(conn->eid))
  			continue;
  
! 		/*
! 		 * Broadcast messages are sent either by application threads
! 		 * committing transactions, or as replication status messages
! 		 * that we can afford to lose.  So don't allow blocking for them
! 		 * (pass "blockable" argument as FALSE).
! 		 */
! 		if ((ret = __repmgr_send_internal(dbenv,
! 		    conn, &msg, FALSE)) == 0) {
  			site = SITE_FROM_EID(conn->eid);
  			nsites++;
  			if (site->priority > 0)
  				npeers++;
  		} else if (ret == DB_REP_UNAVAIL) {
! 			if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
  				return (ret);
  		} else
  			return (ret);
***************
*** 301,339 ****
   * intersperse writes that are part of two single messages.
   *
   * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
!  * PUBLIC:    u_int, const DBT *, const DBT *));
   */
  int
! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
  	u_int msg_type;
  	const DBT *control, *rec;
  {
  	struct sending_msg msg;
  
  	setup_sending_msg(&msg, msg_type, control, rec);
! 	return (__repmgr_send_internal(dbenv, conn, &msg));
  }
  
  /*
   * Attempts a "best effort" to send a message on the given site.  If there is an
!  * excessive backlog of message already queued on the connection, we simply drop
!  * this message, and still return 0 even in this case.
   */
  static int
! __repmgr_send_internal(dbenv, conn, msg)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
  	struct sending_msg *msg;
  {
! #define	OUT_QUEUE_LIMIT 10	/* arbitrary, for now */
  	REPMGR_IOVECS iovecs;
  	SITE_STRING_BUFFER buffer;
  	int ret;
  	size_t nw;
  	size_t total_written;
  
  	DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
  	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
  		/*
--- 297,355 ----
   * intersperse writes that are part of two single messages.
   *
   * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
!  * PUBLIC:    u_int, const DBT *, const DBT *, int));
   */
  int
! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
  	u_int msg_type;
  	const DBT *control, *rec;
+ 	int blockable;
  {
  	struct sending_msg msg;
  
  	setup_sending_msg(&msg, msg_type, control, rec);
! 	return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
  }
  
  /*
   * Attempts a "best effort" to send a message on the given site.  If there is an
!  * excessive backlog of messages already queued on the connection, what shall
!  * we do?  If the caller doesn't mind blocking, we'll wait (a limited amount of
!  * time) for the queue to drain.  Otherwise we'll simply drop the message.  This
!  * is always allowed by the replication protocol.  But in the case of a
!  * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
!  * almost always get a flood of messages that instantly fills our queue, so
!  * blocking improves performance (by avoiding the need for the client to
!  * re-request).
!  *
!  * How long shall we wait?  We could of course create a new timeout
!  * configuration type, so that the application could set it directly.  But that
!  * would start to overwhelm the user with too many choices to think about.  We
!  * already have an ACK timeout, which is the user's estimate of how long it
!  * should take to send a message to the client, have it be processed, and return
!  * a message back to us.  We multiply that by the queue size, because that's how
!  * many messages have to be swallowed up by the client before we're able to
!  * start sending again (at least to a rough approximation).
   */
  static int
! __repmgr_send_internal(dbenv, conn, msg, blockable)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
  	struct sending_msg *msg;
+ 	int blockable;
  {
! 	DB_REP *db_rep;
  	REPMGR_IOVECS iovecs;
  	SITE_STRING_BUFFER buffer;
+ 	db_timeout_t drain_to;
  	int ret;
  	size_t nw;
  	size_t total_written;
  
+ 	db_rep = dbenv->rep_handle;
+ 
  	DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
  	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
  		/*
***************
*** 344,358 ****
  		RPRINT(dbenv, (dbenv, "msg to %s to be queued",
  		    __repmgr_format_eid_loc(dbenv->rep_handle,
  		    conn->eid, buffer)));
  		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
  			return (enqueue_msg(dbenv, conn, msg, 0));
  		else {
  			RPRINT(dbenv, (dbenv, "queue limit exceeded"));
  			STAT(dbenv->rep_handle->
  			    region->mstat.st_msgs_dropped++);
! 			return (0);
  		}
  	}
  
  	/*
  	 * Send as much data to the site as we can, without blocking.  Keep
--- 360,393 ----
  		RPRINT(dbenv, (dbenv, "msg to %s to be queued",
  		    __repmgr_format_eid_loc(dbenv->rep_handle,
  		    conn->eid, buffer)));
+ 		if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
+ 		    blockable && !F_ISSET(conn, CONN_CONGESTED)) {
+ 			RPRINT(dbenv, (dbenv,
+ 			    "block msg thread, await queue space"));
+ 
+ 			if ((drain_to = db_rep->ack_timeout) == 0)
+ 				drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
+ 			conn->blockers++;
+ 			ret = __repmgr_await_drain(dbenv,
+ 			    conn, drain_to * OUT_QUEUE_LIMIT);
+ 			conn->blockers--;
+ 			if (db_rep->finished)
+ 				return (DB_TIMEOUT);
+ 			if (ret != 0)
+ 				return (ret);
+ 			if (STAILQ_EMPTY(&conn->outbound_queue))
+ 				goto empty;
+ 		}
  		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
  			return (enqueue_msg(dbenv, conn, msg, 0));
  		else {
  			RPRINT(dbenv, (dbenv, "queue limit exceeded"));
  			STAT(dbenv->rep_handle->
  			    region->mstat.st_msgs_dropped++);
! 			return (blockable ? DB_TIMEOUT : 0);
  		}
  	}
+ empty:
  
  	/*
  	 * Send as much data to the site as we can, without blocking.  Keep
***************
*** 498,521 ****
  
  /*
   * Abandons a connection, to recover from an error.  Upon entry the conn struct
!  * must be on the connections list.
!  *
!  * If the 'do_close' flag is true, we do the whole job; the clean-up includes
!  * removing the struct from the list and freeing all its memory, so upon return
!  * the caller must not refer to it any further.  Otherwise, we merely mark the
!  * connection for clean-up later by the main thread.
   *
   * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
!  * PUBLIC:     REPMGR_CONNECTION *, int));
   *
   * !!!
   * Caller holds mutex.
   */
  int
! __repmgr_bust_connection(dbenv, conn, do_close)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
- 	int do_close;
  {
  	DB_REP *db_rep;
  	int connecting, ret, eid;
--- 533,553 ----
  
  /*
   * Abandons a connection, to recover from an error.  Upon entry the conn struct
!  * must be on the connections list.  For now, just mark it as unusable; it will
!  * be fully cleaned up in the top-level select thread, as soon as possible.
   *
   * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
!  * PUBLIC:     REPMGR_CONNECTION *));
   *
   * !!!
   * Caller holds mutex.
+  *
+  * Must be idempotent (safe to call more than once on the same connection).
   */
  int
! __repmgr_bust_connection(dbenv, conn)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
  {
  	DB_REP *db_rep;
  	int connecting, ret, eid;
***************
*** 526,537 ****
  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
  	eid = conn->eid;
  	connecting = F_ISSET(conn, CONN_CONNECTING);
! 	if (do_close)
! 		__repmgr_cleanup_connection(dbenv, conn);
! 	else {
! 		F_SET(conn, CONN_DEFUNCT);
! 		conn->eid = -1;
! 	}
  
  	/*
  	 * When we first accepted the incoming connection, we set conn->eid to
--- 558,566 ----
  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
  	eid = conn->eid;
  	connecting = F_ISSET(conn, CONN_CONNECTING);
! 
! 	F_SET(conn, CONN_DEFUNCT);
! 	conn->eid = -1;
  
  	/*
  	 * When we first accepted the incoming connection, we set conn->eid to
***************
*** 557,563 ****
  			    dbenv, ELECT_FAILURE_ELECTION)) != 0)
  				return (ret);
  		}
! 	} else if (!do_close) {
  		/*
  		 * One way or another, make sure the main thread is poked, so
  		 * that we do the deferred clean-up.
--- 586,592 ----
  			    dbenv, ELECT_FAILURE_ELECTION)) != 0)
  				return (ret);
  		}
! 	} else {
  		/*
  		 * One way or another, make sure the main thread is poked, so
  		 * that we do the deferred clean-up.
***************
*** 568,577 ****
  }
  
  /*
!  * PUBLIC: void __repmgr_cleanup_connection
   * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
   */
! void
  __repmgr_cleanup_connection(dbenv, conn)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
--- 597,610 ----
  }
  
  /*
!  * PUBLIC: int __repmgr_cleanup_connection
   * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
+  *
+  * !!!
+  * Idempotent.  This can be called repeatedly as blocking message threads (of
+  * which there could be several) wake up in case of error on the connection.
   */
! int
  __repmgr_cleanup_connection(dbenv, conn)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
***************
*** 580,596 ****
  	QUEUED_OUTPUT *out;
  	REPMGR_FLAT *msg;
  	DBT *dbt;
  
  	db_rep = dbenv->rep_handle;
  
! 	TAILQ_REMOVE(&db_rep->connections, conn, entries);
  	if (conn->fd != INVALID_SOCKET) {
! 		(void)closesocket(conn->fd);
  #ifdef DB_WIN32
! 		(void)WSACloseEvent(conn->event_object);
  #endif
  	}
  
  	/*
  	 * Deallocate any input and output buffers we may have.
  	 */
--- 613,643 ----
  	QUEUED_OUTPUT *out;
  	REPMGR_FLAT *msg;
  	DBT *dbt;
+ 	int ret;
  
  	db_rep = dbenv->rep_handle;
  
! 	DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished);
! 	
  	if (conn->fd != INVALID_SOCKET) {
! 		ret = closesocket(conn->fd);
! 		conn->fd = INVALID_SOCKET;
! 		if (ret == SOCKET_ERROR) {
! 			ret = net_errno;
! 			__db_err(dbenv, ret, "closing socket");
! 		}
  #ifdef DB_WIN32
! 		if (!WSACloseEvent(conn->event_object) && ret != 0)
! 			ret = net_errno;
  #endif
+ 		if (ret != 0)
+ 			return (ret);
  	}
  
+ 	if (conn->blockers > 0)
+ 		return (__repmgr_signal(&conn->drained));
+ 
+ 	TAILQ_REMOVE(&db_rep->connections, conn, entries);
  	/*
  	 * Deallocate any input and output buffers we may have.
  	 */
***************
*** 614,620 ****
--- 661,669 ----
  		__os_free(dbenv, out);
  	}
  
+ 	ret = __repmgr_free_cond(&conn->drained);
  	__os_free(dbenv, conn);
+ 	return (ret);
  }
  
  static int
***************
*** 1063,1069 ****
  
  	while (!TAILQ_EMPTY(&db_rep->connections)) {
  		conn = TAILQ_FIRST(&db_rep->connections);
! 		__repmgr_cleanup_connection(dbenv, conn);
  	}
  
  	for (i = 0; i < db_rep->site_cnt; i++) {
--- 1112,1118 ----
  
  	while (!TAILQ_EMPTY(&db_rep->connections)) {
  		conn = TAILQ_FIRST(&db_rep->connections);
! 		(void)__repmgr_cleanup_connection(dbenv, conn);
  	}
  
  	for (i = 0; i < db_rep->site_cnt; i++) {
*** repmgr/repmgr_posix.c	2007-10-31 10:23:52.000000000 -0700
--- repmgr/repmgr_posix.c	2007-10-31 10:23:53.000000000 -0700
***************
*** 21,26 ****
--- 21,28 ----
  size_t __repmgr_guesstimated_max = (128 * 1024);
  #endif
  
+ static int __repmgr_conn_work __P((DB_ENV *,
+     REPMGR_CONNECTION *, fd_set *, fd_set *, int));
  static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
  
  /*
***************
*** 189,194 ****
--- 191,284 ----
  }
  
  /*
+  * PUBLIC: int __repmgr_await_drain __P((DB_ENV *,
+  * PUBLIC:    REPMGR_CONNECTION *, db_timeout_t));
+  *
+  * Waits for space to become available on the connection's output queue.
+  * Various ways we can exit:
+  *
+  * 1. queue becomes non-full
+  * 2. exceed time limit
+  * 3. connection becomes defunct (due to error in another thread)
+  * 4. repmgr is shutting down
+  * 5. any unexpected system resource failure
+  *
+  * In cases #3 and #5 we return an error code.  The remaining cases return 0;
+  * the caller is responsible for distinguishing them if desired.
+  *
+  * !!!
+  * Caller must hold db_rep->mutex.
+  */
+ int
+ __repmgr_await_drain(dbenv, conn, timeout)
+ 	DB_ENV *dbenv;
+ 	REPMGR_CONNECTION *conn;
+ 	db_timeout_t timeout;
+ {
+ 	DB_REP *db_rep;
+ 	struct timespec deadline;
+ 	int ret;
+ 
+ 	db_rep = dbenv->rep_handle;
+ 
+ 	__repmgr_compute_wait_deadline(dbenv, &deadline, timeout);
+ 
+ 	ret = 0;
+ 	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
+ 		ret = pthread_cond_timedwait(&conn->drained,
+ 		    &db_rep->mutex, &deadline);
+ 		switch (ret) {
+ 		case 0:
+ 			if (db_rep->finished)
+ 				goto out; /* #4. */
+ 			/*
+ 			 * Another thread could have stumbled into an error on
+ 			 * the socket while we were waiting.
+ 			 */
+ 			if (F_ISSET(conn, CONN_DEFUNCT)) {
+ 				ret = DB_REP_UNAVAIL; /* #3. */
+ 				goto out;
+ 			}
+ 			break;
+ 		case ETIMEDOUT:
+ 			F_SET(conn, CONN_CONGESTED);
+ 			ret = 0;
+ 			goto out; /* #2. */
+ 		default:
+ 			goto out; /* #5. */
+ 		}
+ 	}
+ 	/* #1. */
+ 
+ out:
+ 	return (ret);
+ }
+ 
+ /*
+  * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
+  *
+  * Initialize a condition variable (in allocated space).
+  */
+ int
+ __repmgr_alloc_cond(c)
+ 	cond_var_t *c;
+ {
+ 	return (pthread_cond_init(c, NULL));
+ }
+ 
+ /*
+  * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
+  *
+  * Clean up a previously initialized condition variable.
+  */
+ int
+ __repmgr_free_cond(c)
+ 	cond_var_t *c;
+ {
+ 	return (pthread_cond_destroy(c));
+ }
+ 
+ /*
   * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
   *
   * Allocate/initialize all data necessary for thread synchronization.  This
***************
*** 443,449 ****
  	REPMGR_RETRY *retry;
  	db_timespec timeout;
  	fd_set reads, writes;
! 	int ret, flow_control, maxfd, nready;
  	u_int8_t buf[10];	/* arbitrary size */
  
  	flow_control = FALSE;
--- 533,539 ----
  	REPMGR_RETRY *retry;
  	db_timespec timeout;
  	fd_set reads, writes;
! 	int ret, flow_control, maxfd;
  	u_int8_t buf[10];	/* arbitrary size */
  
  	flow_control = FALSE;
***************
*** 477,482 ****
--- 567,575 ----
  		 * each one.
  		 */
  		TAILQ_FOREACH(conn, &db_rep->connections, entries) {
+ 			if (F_ISSET(conn, CONN_DEFUNCT))
+ 				continue;
+ 			
  			if (F_ISSET(conn, CONN_CONNECTING)) {
  				FD_SET((u_int)conn->fd, &reads);
  				FD_SET((u_int)conn->fd, &writes);
***************
*** 533,616 ****
  				return (ret);
  			}
  		}
- 		nready = ret;
- 
  		LOCK_MUTEX(db_rep->mutex);
  
- 		/*
- 		 * The first priority thing we must do is to clean up any
- 		 * pending defunct connections.  Otherwise, if they have any
- 		 * lingering pending input, we get very confused if we try to
- 		 * process it.
- 		 *
- 		 * The TAILQ_FOREACH macro would be suitable here, except that
- 		 * it doesn't allow unlinking the current element, which is
- 		 * needed for cleanup_connection.
- 		 */
- 		for (conn = TAILQ_FIRST(&db_rep->connections);
- 		     conn != NULL;
- 		     conn = next) {
- 			next = TAILQ_NEXT(conn, entries);
- 			if (F_ISSET(conn, CONN_DEFUNCT))
- 				__repmgr_cleanup_connection(dbenv, conn);
- 		}
- 
  		if ((ret = __repmgr_retry_connections(dbenv)) != 0)
  			goto out;
- 		if (nready == 0)
- 			continue;
  
  		/*
! 		 * Traverse the linked list.  (Again, like TAILQ_FOREACH, except
! 		 * that we need the ability to unlink an element along the way.)
  		 */
  		for (conn = TAILQ_FIRST(&db_rep->connections);
  		     conn != NULL;
  		     conn = next) {
  			next = TAILQ_NEXT(conn, entries);
! 			if (F_ISSET(conn, CONN_CONNECTING)) {
! 				if (FD_ISSET((u_int)conn->fd, &reads) ||
! 				    FD_ISSET((u_int)conn->fd, &writes)) {
! 					if ((ret = finish_connecting(dbenv,
! 					    conn)) == DB_REP_UNAVAIL) {
! 						if ((ret =
! 						    __repmgr_bust_connection(
! 						    dbenv, conn, TRUE)) != 0)
! 							goto out;
! 					} else if (ret != 0)
! 						goto out;
! 				}
! 				continue;
! 			}
! 
! 			/*
! 			 * Here, the site is connected, and the FD_SET's are
! 			 * valid.
! 			 */
! 			if (FD_ISSET((u_int)conn->fd, &writes)) {
! 				if ((ret = __repmgr_write_some(
! 				    dbenv, conn)) == DB_REP_UNAVAIL) {
! 					if ((ret =
! 					    __repmgr_bust_connection(dbenv,
! 					    conn, TRUE)) != 0)
! 						goto out;
! 					continue;
! 				} else if (ret != 0)
! 					goto out;
! 			}
! 
! 			if (!flow_control &&
! 			    FD_ISSET((u_int)conn->fd, &reads)) {
! 				if ((ret = __repmgr_read_from_site(dbenv, conn))
! 				    == DB_REP_UNAVAIL) {
! 					if ((ret =
! 					    __repmgr_bust_connection(dbenv,
! 					    conn, TRUE)) != 0)
! 						goto out;
! 					continue;
! 				} else if (ret != 0)
! 					goto out;
! 			}
  		}
  
  		/*
--- 626,650 ----
  				return (ret);
  			}
  		}
  		LOCK_MUTEX(db_rep->mutex);
  
  		if ((ret = __repmgr_retry_connections(dbenv)) != 0)
  			goto out;
  
  		/*
! 		 * Examine each connection, to see what work needs to be done.
! 		 * 
! 		 * The TAILQ_FOREACH macro would be suitable here, except that
! 		 * it doesn't allow unlinking the current element, which is
! 		 * needed for cleanup_connection.
  		 */
  		for (conn = TAILQ_FIRST(&db_rep->connections);
  		     conn != NULL;
  		     conn = next) {
  			next = TAILQ_NEXT(conn, entries);
! 			if ((ret = __repmgr_conn_work(dbenv,
! 			    conn, &reads, &writes, flow_control)) != 0)
! 				goto out;
  		}
  
  		/*
***************
*** 637,642 ****
--- 671,719 ----
  }
  
  static int
+ __repmgr_conn_work(dbenv, conn, reads, writes, flow_control)
+ 	DB_ENV *dbenv;
+ 	REPMGR_CONNECTION *conn;
+ 	fd_set *reads, *writes;
+ 	int flow_control;
+ {
+ 	int ret;
+ 	u_int fd;
+ 
+ 	if (F_ISSET(conn, CONN_DEFUNCT)) {
+ 		/*
+ 		 * Deferred clean-up, from an error that happened in another
+ 		 * thread, while we were sleeping in select().
+ 		 */
+ 		return (__repmgr_cleanup_connection(dbenv, conn));
+ 	}
+ 
+ 	ret = 0;
+ 	fd = (u_int)conn->fd;
+ 	
+ 	if (F_ISSET(conn, CONN_CONNECTING)) {
+ 		if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
+ 			ret = finish_connecting(dbenv, conn);
+ 	} else {
+ 		/*
+ 		 * Here, the site is connected, and the FD_SET's are valid.
+ 		 */
+ 		if (FD_ISSET(fd, writes))
+ 			ret = __repmgr_write_some(dbenv, conn);
+ 				
+ 		if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
+ 			ret = __repmgr_read_from_site(dbenv, conn);
+ 	}
+ 
+ 	if (ret == DB_REP_UNAVAIL) {
+ 		if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
+ 			return (ret);
+ 		ret = __repmgr_cleanup_connection(dbenv, conn);
+ 	}
+ 	return (ret);
+ }
+ 
+ static int
  finish_connecting(dbenv, conn)
  	DB_ENV *dbenv;
  	REPMGR_CONNECTION *conn;
***************
*** 657,662 ****
--- 734,740 ----
  		goto err_rpt;
  	}
  
+ 	DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING));
  	F_CLR(conn, CONN_CONNECTING);
  	return (__repmgr_send_handshake(dbenv, conn));
  
***************
*** 671,690 ****
  	    "connecting to %s", __repmgr_format_site_loc(site, buffer));
  
  	/* If we've exhausted the list of possible addresses, give up. */
! 	if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
  		return (DB_REP_UNAVAIL);
  
  	/*
  	 * This is just like a little mini-"bust_connection", except that we
  	 * don't reschedule for later, 'cuz we're just about to try again right
! 	 * now.
  	 *
  	 * !!!
  	 * Which means this must only be called on the select() thread, since
  	 * only there are we allowed to actually close a connection.
  	 */
  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
! 	__repmgr_cleanup_connection(dbenv, conn);
  	ret = __repmgr_connect_site(dbenv, eid);
  	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
  	return (ret);
--- 749,773 ----
  	    "connecting to %s", __repmgr_format_site_loc(site, buffer));
  
  	/* If we've exhausted the list of possible addresses, give up. */
! 	if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
! 		STAT(db_rep->region->mstat.st_connect_fail++);
  		return (DB_REP_UNAVAIL);
+ 	}
  
  	/*
  	 * This is just like a little mini-"bust_connection", except that we
  	 * don't reschedule for later, 'cuz we're just about to try again right
! 	 * now.  (Note that we don't have to worry about message threads
! 	 * blocking on a full output queue: that can't happen when we're only
! 	 * just connecting.)
  	 *
  	 * !!!
  	 * Which means this must only be called on the select() thread, since
  	 * only there are we allowed to actually close a connection.
  	 */
  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
! 	if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
! 		return (ret);
  	ret = __repmgr_connect_site(dbenv, eid);
  	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
  	return (ret);
*** repmgr/repmgr_sel.c	2007-10-31 10:23:52.000000000 -0700
--- repmgr/repmgr_sel.c	2007-10-31 10:23:53.000000000 -0700
***************
*** 36,45 ****
  
  /*
   * PUBLIC: int __repmgr_accept __P((DB_ENV *));
-  *
-  * !!!
-  * Only ever called in the select() thread, since we may call
-  * __repmgr_bust_connection(..., TRUE).
   */
  int
  __repmgr_accept(dbenv)
--- 36,41 ----
***************
*** 133,139 ****
  	case 0:
  		return (0);
  	case DB_REP_UNAVAIL:
! 		return (__repmgr_bust_connection(dbenv, conn, TRUE));
  	default:
  		return (ret);
  	}
--- 129,135 ----
  	case 0:
  		return (0);
  	case DB_REP_UNAVAIL:
! 		return (__repmgr_bust_connection(dbenv, conn));
  	default:
  		return (ret);
  	}
***************
*** 254,263 ****
   * starting with the "current" element of its address list and trying as many
   * addresses as necessary until the list is exhausted.
   *
-  * !!!
-  * Only ever called in the select() thread, since we may call
-  * __repmgr_bust_connection(..., TRUE).
-  *
   * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
   */
  int
--- 250,255 ----
***************
*** 332,338 ****
  		case 0:
  			break;
  		case DB_REP_UNAVAIL:
! 			return (__repmgr_bust_connection(dbenv, con, TRUE));
  		default:
  			return (ret);
  		}
--- 324,330 ----
  		case 0:
  			break;
  		case DB_REP_UNAVAIL:
! 			return (__repmgr_bust_connection(dbenv, con));
  		default:
  			return (ret);
  		}
***************
*** 437,443 ****
  
  	DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
  
! 	return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
  }
  
  /*
--- 429,443 ----
  
  	DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
  
! 	/*
! 	 * It would of course be disastrous to block the select() thread, so
! 	 * pass the "blockable" argument as FALSE.  Fortunately blocking should
! 	 * never be necessary here, because the handshake is always the first
! 	 * thing we send.  Which is a good thing, because it would be almost as
! 	 * disastrous if we allowed ourselves to drop a handshake.
! 	 */
! 	return (__repmgr_send_one(dbenv,
! 	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
  }
  
  /*
***************
*** 854,859 ****
--- 854,872 ----
  			conn->out_queue_length--;
  			if (--msg->ref_count <= 0)
  				__os_free(dbenv, msg);
+ 
+ 			/*
+ 			 * We've achieved enough movement to free up at least
+ 			 * one space in the outgoing queue.  Wake any message
+ 			 * threads that may be waiting for space.  Clear the
+ 			 * CONGESTED status so that when the queue reaches the
+ 			 * high-water mark again, the filling thread will be
+ 			 * allowed to try waiting again.
+ 			 */
+ 			F_CLR(conn, CONN_CONGESTED);
+ 			if (conn->blockers > 0 &&
+ 			    (ret = __repmgr_signal(&conn->drained)) != 0)
+ 				return (ret);
  		}
  	}
  
*** repmgr/repmgr_util.c	2007-10-31 10:23:52.000000000 -0700
--- repmgr/repmgr_util.c	2007-10-31 10:23:53.000000000 -0700
***************
*** 103,108 ****
--- 103,113 ----
  	db_rep = dbenv->rep_handle;
  	if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
  		return (ret);
+ 	if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
+ 		__os_free(dbenv, c);
+ 		return (ret);
+ 	}
+ 	c->blockers = 0;
  
  	c->fd = s;
  	c->flags = flags;
*** repmgr/repmgr_windows.c	2007-10-31 10:23:52.000000000 -0700
--- repmgr/repmgr_windows.c	2007-10-31 10:23:53.000000000 -0700
***************
*** 11,16 ****
--- 11,19 ----
  #define	__INCLUDE_NETWORKING	1
  #include "db_int.h"
  
+ /* Convert time-out from microseconds to milliseconds, rounding up. */
+ #define	DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
+ 
  typedef struct __ack_waiter {
  	HANDLE event;
  	const DB_LSN *lsnp;
***************
*** 120,136 ****
  {
  	DB_REP *db_rep;
  	ACK_WAITER *me;
! 	DWORD ret;
! 	DWORD timeout;
  
  	db_rep = dbenv->rep_handle;
  
  	if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
  		goto err;
  
- 	/* convert time-out from microseconds to milliseconds, rounding up */
  	timeout = db_rep->ack_timeout > 0 ?
! 	    ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
  	me->lsnp = lsnp;
  	if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
  	    FALSE)) == WAIT_FAILED) {
--- 123,137 ----
  {
  	DB_REP *db_rep;
  	ACK_WAITER *me;
! 	DWORD ret, timeout;
  
  	db_rep = dbenv->rep_handle;
  
  	if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
  		goto err;
  
  	timeout = db_rep->ack_timeout > 0 ?
! 	    DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
  	me->lsnp = lsnp;
  	if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
  	    FALSE)) == WAIT_FAILED) {
***************
*** 211,216 ****
--- 212,296 ----
  	db_rep->waiters->first_free = slot;
  }
  
+ /* Await room in conn's output queue (see requirements in repmgr_posix.c). */
+ int
+ __repmgr_await_drain(dbenv, conn, timeout)
+ 	DB_ENV *dbenv;
+ 	REPMGR_CONNECTION *conn;
+ 	db_timeout_t timeout;
+ {
+ 	DB_REP *db_rep;
+ 	db_timespec deadline, delta, now;
+ 	db_timeout_t t;
+ 	DWORD duration, ret;
+ 	int round_up;
+ 	
+ 	db_rep = dbenv->rep_handle;
+ 	
+ 	__os_gettime(dbenv, &deadline);		/* deadline = current time ... */
+ 	DB_TIMEOUT_TO_TIMESPEC(timeout, &delta);
+ 	timespecadd(&deadline, &delta);		/* ... plus the timeout */
+ 
+ 	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
+ 		if (!ResetEvent(conn->drained))	/* unsignal before waiting */
+ 			return (GetLastError());
+ 
+ 		/* How long until the deadline?  If it has passed, give up. */
+ 		__os_gettime(dbenv, &now);
+ 		if (timespeccmp(&now, &deadline, >=)) {
+ 			F_SET(conn, CONN_CONGESTED);
+ 			return (0);	/* timing out is not an error */
+ 		}
+ 		delta = deadline;
+ 		timespecsub(&delta, &now);
+ 		round_up = TRUE;
+ 		DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
+ 		duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);	/* in msec */
+ 
+ 		ret = SignalObjectAndWait(db_rep->mutex,
+ 		    conn->drained, duration, FALSE);
+ 		LOCK_MUTEX(db_rep->mutex);	/* retake mutex given up above */
+ 		if (ret == WAIT_FAILED)
+ 			return (GetLastError());
+ 		else if (ret == WAIT_TIMEOUT) {
+ 			F_SET(conn, CONN_CONGESTED);
+ 			return (0);
+ 		} else
+ 			DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
+ 		
+ 		if (db_rep->finished)
+ 			return (0);
+ 		if (F_ISSET(conn, CONN_DEFUNCT))	/* socket close pending */
+ 			return (DB_REP_UNAVAIL);
+ 	}
+ 	return (0);
+ }
+ 
+ /*
+  * Creates a manual-reset event, initially unsignaled -- usually our best
+  * choice when many threads may wait on one event.  Returns 0 or a Win32 error.
+  */
+ int
+ __repmgr_alloc_cond(c)
+ 	cond_var_t *c;
+ {
+ 	HANDLE event;
+ 
+ 	if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
+ 		return (GetLastError());
+ 	*c = event;		/* *c is written only on success */
+ 	return (0);
+ }
+ 
+ int
+ __repmgr_free_cond(c)		/* Closes the event handle stored in *c. */
+ 	cond_var_t *c;
+ {
+ 	if (CloseHandle(*c))
+ 		return (0);
+ 	return (GetLastError());	/* CloseHandle failed: Win32 error code */
+ }
+ 
  /*
   * Make resource allocation an all-or-nothing affair, outside of this and the
   * close_sync function.  db_rep->waiters should be non-NULL iff all of these
***************
*** 488,493 ****
--- 568,576 ----
  		 * don't hurt anything flow-control-wise.
  		 */
  		TAILQ_FOREACH(conn, &db_rep->connections, entries) {
+ 			if (F_ISSET(conn, CONN_DEFUNCT))
+ 				continue;
+ 
  			if (F_ISSET(conn, CONN_CONNECTING) ||
  			    !STAILQ_EMPTY(&conn->outbound_queue) ||
  			    (!flow_control || !IS_VALID_EID(conn->eid))) {
***************
*** 534,541 ****
  		     conn != NULL;
  		     conn = next) {
  			next = TAILQ_NEXT(conn, entries);
! 			if (F_ISSET(conn, CONN_DEFUNCT))
! 				__repmgr_cleanup_connection(dbenv, conn);
  		}
  
  		/*
--- 617,626 ----
  		     conn != NULL;
  		     conn = next) {
  			next = TAILQ_NEXT(conn, entries);
! 			if (F_ISSET(conn, CONN_DEFUNCT) &&
! 			    (ret = __repmgr_cleanup_connection(dbenv,
! 			    conn)) != 0)
! 				goto unlock;
  		}
  
  		/*
***************
*** 587,597 ****
  	return (ret);
  }
  
- /*
-  * !!!
-  * Only ever called on the select() thread, since we may call
-  * __repmgr_bust_connection(..., TRUE).
-  */
  static int
  handle_completion(dbenv, conn)
  	DB_ENV *dbenv;
--- 672,677 ----
***************
*** 651,660 ****
  		}
  	}
  
! 	return (0);
! 
! err:	if (ret == DB_REP_UNAVAIL)
! 		return (__repmgr_bust_connection(dbenv, conn, TRUE));
  	return (ret);
  }
  
--- 731,742 ----
  		}
  	}
  
! err:
! 	if (ret == DB_REP_UNAVAIL) {
! 		if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
! 			return (ret);
! 		ret = __repmgr_cleanup_connection(dbenv, conn);
! 	}
  	return (ret);
  }
  
***************
*** 708,714 ****
  	}
  
  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
! 	__repmgr_cleanup_connection(dbenv, conn);
  	ret = __repmgr_connect_site(dbenv, eid);
  	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
  	return (ret);
--- 790,797 ----
  	}
  
  	DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
! 	if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
! 		return (ret);
  	ret = __repmgr_connect_site(dbenv, eid);
  	DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
  	return (ret);