Sophie

Sophie

distrib > Mageia > 5 > i586 > media > core-release-src > by-pkgid > 60e6e2afcfa8f217b321328e333275a6 > files > 5

pacemaker-1.1.8-9.mga5.src.rpm

diff --git a/cib/callbacks.c b/cib/callbacks.c
index 86cfd92..aa5f6a8 100644
--- a/cib/callbacks.c
+++ b/cib/callbacks.c
@@ -347,13 +347,18 @@ struct qb_ipcs_service_handlers ipc_rw_callbacks =
         int rid = 0;
 
         if(sync_reply) {
-            CRM_LOG_ASSERT(client_obj->request_id);
+            if (client_obj->ipc) { /* IPC client: consume its pending request id */
+                CRM_LOG_ASSERT(client_obj->request_id);
 
-            rid = client_obj->request_id;
-            client_obj->request_id = 0;
+                rid = client_obj->request_id;
+                client_obj->request_id = 0;
 
-            crm_trace("Sending response %d to %s %s",
+                crm_trace("Sending response %d to %s %s",
                       rid, client_obj->name, from_peer?"(originator of delegated request)":"");
+            } else { /* no IPC channel (e.g. a remote cib client): no request id is consumed */
+                crm_trace("Sending response to %s %s",
+                      client_obj->name, from_peer?"(originator of delegated request)":"");
+            }
 
         } else {
             crm_trace("Sending an event to %s %s",
diff --git a/cib/callbacks.h b/cib/callbacks.h
index 99a5065..b8af997 100644
--- a/cib/callbacks.h
+++ b/cib/callbacks.h
@@ -41,18 +41,21 @@
     char *name;
     char *callback_id;
     char *user;
+    char *recv_buf;             /* data received from a remote client, not yet parsed */
     int request_id;
 
     qb_ipcs_connection_t *ipc;
 
 #ifdef HAVE_GNUTLS_GNUTLS_H
     gnutls_session *session;
+    gboolean handshake_complete; /* TLS handshake with the remote client has finished */
 #else
     void *session;
 #endif
     gboolean encrypted;
+    gboolean remote_auth;       /* remote client has passed the authenticate request */
     mainloop_io_t *remote;
-        
+
     unsigned long num_calls;
 
     int pre_notify;
@@ -60,6 +63,7 @@
     int confirmations;
     int replace;
     int diffs;
+    int remote_auth_timeout;    /* GLib timeout source id for the auth deadline, 0 if none */
 
     GList *delegated_calls;
 } cib_client_t;
diff --git a/cib/notify.c b/cib/notify.c
index 9dcb699..1dcda8f 100644
--- a/cib/notify.c
+++ b/cib/notify.c
@@ -83,7 +83,7 @@ void do_cib_notify(int options, const char *op, xmlNode * update,
     CRM_CHECK(client != NULL, return TRUE);
     CRM_CHECK(update_msg != NULL, return TRUE);
 
-    if (client->ipc == NULL) {
+    if (client->ipc == NULL && client->session == NULL) { /* remote clients have a session, not an IPC channel */
         crm_warn("Skipping client with NULL channel");
         return FALSE;
     }
diff --git a/cib/remote.c b/cib/remote.c
index 7ad7132..f9b90a0 100644
--- a/cib/remote.c
+++ b/cib/remote.c
@@ -60,10 +60,6 @@
 #  endif
 #endif
 
-#ifdef HAVE_DECL_NANOSLEEP
-#  include <time.h>
-#endif
-
 extern int remote_tls_fd;
 extern gboolean cib_shutdown_flag;
 
@@ -73,17 +69,16 @@
 #ifdef HAVE_GNUTLS_GNUTLS_H
 #  define DH_BITS 1024
 gnutls_dh_params dh_params;
-extern gnutls_anon_server_credentials anon_cred_s;
+gnutls_anon_server_credentials anon_cred_s;
 static void
 debug_log(int level, const char *str)
 {
     fputs(str, stderr);
 }
-
-extern gnutls_session *create_tls_session(int csock, int type);
-
 #endif
 
+#define REMOTE_AUTH_TIMEOUT 10000 /* ms allowed to authenticate (re-armed after the TLS handshake) */
+
 int num_clients;
 int authenticate_user(const char *user, const char *passwd);
 int cib_remote_listen(gpointer data);
@@ -121,7 +116,7 @@
 #else
         crm_notice("Starting a tls listener on port %d.", port);
         gnutls_global_init();
-/* 	gnutls_global_set_log_level (10); */
+        /* gnutls_global_set_log_level (10); */
         gnutls_global_set_log_function(debug_log);
         gnutls_dh_params_init(&dh_params);
         gnutls_dh_params_generate2(dh_params, DH_BITS);
@@ -215,37 +210,96 @@
     return FALSE;
 }
 
+/* Validate a remote client's login message: it must be a <cib_command>
+ * with op="authenticate" and both user and password set, the user must be
+ * a member of CRM_DAEMON_GROUP, and the password must pass authenticate_user().
+ * Returns TRUE if the client may proceed. */
+static gboolean
+cib_remote_auth(xmlNode *login)
+{
+    const char *user = NULL;
+    const char *pass = NULL;
+    const char *tmp = NULL;
+
+    crm_log_xml_info(login, "Login: ");
+    if (login == NULL) {
+        return FALSE;
+    }
+
+    tmp = crm_element_name(login);
+    if (safe_str_neq(tmp, "cib_command")) {
+        crm_err("Wrong tag: %s", tmp);
+        return FALSE;
+    }
+
+    tmp = crm_element_value(login, "op");
+    if (safe_str_neq(tmp, "authenticate")) {
+        crm_err("Wrong operation: %s", tmp);
+        return FALSE;
+    }
+
+    user = crm_element_value(login, "user");
+    pass = crm_element_value(login, "password");
+
+    if (!user || !pass) {
+        crm_err("missing auth credentials");
+        return FALSE;
+    }
+
+    /* Non-root daemons can only validate the password of the
+     * user they're running as
+     */
+    if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) {
+        crm_err("User is not a member of the required group");
+        return FALSE;
+
+    } else if (authenticate_user(user, pass) == FALSE) {
+        crm_err("PAM auth failed");
+        return FALSE;
+    }
+
+    return TRUE;
+}
+
+/* One-shot GLib timeout: drop a remote client that has not authenticated
+ * in time.  Always returns FALSE so the timeout source is not re-armed. */
+static gboolean
+remote_auth_timeout_cb(gpointer data)
+{
+    cib_client_t *client = data;
+
+    client->remote_auth_timeout = 0;
+
+    if (client->remote_auth == TRUE) {
+        return FALSE;
+    }
+
+    mainloop_del_fd(client->remote);
+    crm_err("Remote client authentication timed out");
+
+    return FALSE;
+}
+
 int
 cib_remote_listen(gpointer data)
 {
-    int lpc = 0;
     int csock = 0;
     unsigned laddr;
-    time_t now = 0;
-    time_t start = time(NULL);
     struct sockaddr_in addr;
     int ssock = *(int *)data;
+    int flag;
 
 #ifdef HAVE_GNUTLS_GNUTLS_H
     gnutls_session *session = NULL;
 #endif
     cib_client_t *new_client = NULL;
 
-    xmlNode *login = NULL;
-    const char *user = NULL;
-    const char *pass = NULL;
-    const char *tmp = NULL;
-
-#ifdef HAVE_DECL_NANOSLEEP
-    const struct timespec sleepfast = { 0, 10000000 };  /* 10 millisec */
-#endif
-
     static struct mainloop_fd_callbacks remote_client_fd_callbacks = 
         {
             .dispatch = cib_remote_msg,
             .destroy = cib_remote_connection_destroy,
-        };    
-    
+        };
+
     /* accept the connection */
     laddr = sizeof(addr);
     csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
@@ -257,10 +311,22 @@
         return TRUE;
     }
 
+    if ((flag = fcntl(csock, F_GETFL)) >= 0) {
+        if (fcntl(csock, F_SETFL, flag | O_NONBLOCK) < 0) {
+            crm_err( "fcntl() write failed");
+            close(csock);
+            return TRUE;
+        }
+    } else {
+        crm_err( "fcntl() read failed");
+        close(csock);
+        return TRUE;
+    }
+
     if (ssock == remote_tls_fd) {
 #ifdef HAVE_GNUTLS_GNUTLS_H
         /* create gnutls session for the server socket */
-        session = create_tls_session(csock, GNUTLS_SERVER);
+        session = crm_create_anon_tls_session(csock, GNUTLS_SERVER, anon_cred_s);
         if (session == NULL) {
             crm_err("TLS session creation failed");
             close(csock);
@@ -269,73 +335,13 @@
 #endif
     }
 
-    do {
-        crm_trace("Iter: %d", lpc++);
-        if (ssock == remote_tls_fd) {
-#ifdef HAVE_GNUTLS_GNUTLS_H
-            login = crm_recv_remote_msg(session, TRUE);
-#endif
-        } else {
-            login = crm_recv_remote_msg(GINT_TO_POINTER(csock), FALSE);
-        }
-        if (login != NULL) {
-            break;
-        }
-#ifdef HAVE_DECL_NANOSLEEP
-        nanosleep(&sleepfast, NULL);
-#else
-        sleep(1);
-#endif
-        now = time(NULL);
-
-        /* Peers have 3s to connect */
-    } while (login == NULL && (start - now) < 4);
-
-    crm_log_xml_info(login, "Login: ");
-    if (login == NULL) {
-        goto bail;
-    }
-
-    tmp = crm_element_name(login);
-    if (safe_str_neq(tmp, "cib_command")) {
-        crm_err("Wrong tag: %s", tmp);
-        goto bail;
-    }
-
-    tmp = crm_element_value(login, "op");
-    if (safe_str_neq(tmp, "authenticate")) {
-        crm_err("Wrong operation: %s", tmp);
-        goto bail;
-    }
-
-    user = crm_element_value(login, "user");
-    pass = crm_element_value(login, "password");
-
-    /* Non-root daemons can only validate the password of the
-     * user they're running as
-     */
-    if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) {
-        crm_err("User is not a member of the required group");
-        goto bail;
-
-    } else if (authenticate_user(user, pass) == FALSE) {
-        crm_err("PAM auth failed");
-        goto bail;
-    }
-
-    /* send ACK */
     num_clients++;
     new_client = calloc(1, sizeof(cib_client_t));
-    new_client->name = crm_element_value_copy(login, "name");
-
-    CRM_CHECK(new_client->id == NULL, free(new_client->id));
     new_client->id = crm_generate_uuid();
-
-#if ENABLE_ACL
-    new_client->user = strdup(user);
-#endif
-
     new_client->callback_id = NULL;
+    /* clients have a few seconds to perform handshake. */
+    new_client->remote_auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, new_client);
+
     if (ssock == remote_tls_fd) {
 #ifdef HAVE_GNUTLS_GNUTLS_H
         new_client->encrypted = TRUE;
@@ -345,37 +351,19 @@
         new_client->session = GINT_TO_POINTER(csock);
     }
 
-    free_xml(login);
-    login = create_xml_node(NULL, "cib_result");
-    crm_xml_add(login, F_CIB_OPERATION, CRM_OP_REGISTER);
-    crm_xml_add(login, F_CIB_CLIENTID, new_client->id);
-    crm_send_remote_msg(new_client->session, login, new_client->encrypted);
-    free_xml(login);
-
     new_client->remote = mainloop_add_fd(
         "cib-remote-client", G_PRIORITY_DEFAULT, csock, new_client, &remote_client_fd_callbacks);
 
     g_hash_table_insert(client_list, new_client->id, new_client);
 
     return TRUE;
-
-  bail:
-    if (ssock == remote_tls_fd) {
-#ifdef HAVE_GNUTLS_GNUTLS_H
-        gnutls_bye(*session, GNUTLS_SHUT_RDWR);
-        gnutls_deinit(*session);
-        gnutls_free(session);
-#endif
-    }
-    close(csock);
-    free_xml(login);
-    return TRUE;
 }
 
 void
 cib_remote_connection_destroy(gpointer user_data)
 {
     cib_client_t *client = user_data;
+    int csock = 0;
 
     if (client == NULL) {
         return;
@@ -393,10 +381,36 @@
     crm_trace("Destroying %s (%p)", client->name, user_data);
     num_clients--;
     crm_trace("Num unfree'd clients: %d", num_clients);
+    if (client->remote_auth_timeout) {
+        g_source_remove(client->remote_auth_timeout);
+    }
+
+    if (client->encrypted) {
+#ifdef HAVE_GNUTLS_GNUTLS_H
+        if (client->session) {
+            void *sock_ptr = gnutls_transport_get_ptr(*client->session);
+            csock = GPOINTER_TO_INT(sock_ptr);
+            if (client->handshake_complete) {
+                gnutls_bye(*client->session, GNUTLS_SHUT_WR);
+            }
+            gnutls_deinit(*client->session);
+            gnutls_free(client->session);
+        }
+#endif
+    } else {
+        csock = GPOINTER_TO_INT(client->session);
+    }
+    client->session = NULL;
+
+    if (csock > 0) {
+        close(csock);
+    }
+
     free(client->name);
     free(client->callback_id);
     free(client->id);
     free(client->user);
+    free(client->recv_buf);
     free(client);
     crm_trace("Freed the cib client");
 
@@ -406,24 +420,16 @@
     return;
 }
 
-int
-cib_remote_msg(gpointer data)
+/* Process one command parsed from a remote client that has authenticated. */
+static void
+cib_handle_remote_msg(cib_client_t *client, xmlNode *command)
 {
     const char *value = NULL;
-    xmlNode *command = NULL;
-    cib_client_t *client = data;
-
-    crm_trace("%s callback", client->encrypted ? "secure" : "clear-text");
-
-    command = crm_recv_remote_msg(client->session, client->encrypted);
-    if (command == NULL) {
-        return -1;
-    }
 
     value = crm_element_name(command);
     if (safe_str_neq(value, "cib_command")) {
         crm_log_xml_trace(command, "Bad command: ");
-        goto bail;
+        return;
     }
 
     if (client->name == NULL) {
@@ -472,9 +478,99 @@
 
     crm_log_xml_trace(command, "Remote command: ");
     cib_common_callback_worker(0, 0, command, client, TRUE);
-  bail:
-    free_xml(command);
-    command = NULL;
+}
+
+/* mainloop dispatch callback for a remote cib client connection.  TLS
+ * clients complete their handshake first; every client must then pass
+ * cib_remote_auth() before any other command is processed.  Returns -1
+ * when the connection should be closed, 0 otherwise. */
+int
+cib_remote_msg(gpointer data)
+{
+    xmlNode *command = NULL;
+    cib_client_t *client = data;
+    int disconnected = 0;
+    int timeout = client->remote_auth ? -1 : 1000; /* bound reads (ms) until authenticated */
+
+    crm_trace("%s callback", client->encrypted ? "secure" : "clear-text");
+
+#ifdef HAVE_GNUTLS_GNUTLS_H
+    if (client->encrypted && (client->handshake_complete == FALSE)) {
+        int rc = 0;
+
+        /* Multiple calls to handshake will be required, this callback
+         * will be invoked once the client sends more handshake data. */
+        do {
+            rc = gnutls_handshake(*client->session);
+
+            if (rc < 0 && rc != GNUTLS_E_AGAIN && rc != GNUTLS_E_INTERRUPTED) {
+                crm_err("Remote cib tls handshake failed");
+                return -1;
+            }
+        } while (rc == GNUTLS_E_INTERRUPTED);
+
+        if (rc == 0) {
+            crm_debug("Remote cib tls handshake completed");
+            client->handshake_complete = TRUE;
+            if (client->remote_auth_timeout) {
+                g_source_remove(client->remote_auth_timeout);
+            }
+            /* after handshake, clients must send auth in a few seconds */
+            client->remote_auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, client);
+        }
+        return 0;
+    }
+#endif
+
+    crm_recv_remote_msg(client->session, &client->recv_buf, client->encrypted, timeout, &disconnected);
+
+    /* must pass auth before we will process anything else */
+    if (client->remote_auth == FALSE) {
+        xmlNode *reg;
+#if ENABLE_ACL
+        const char *user = NULL;
+#endif
+        command = crm_parse_remote_buffer(&client->recv_buf);
+        if (cib_remote_auth(command) == FALSE) {
+            free_xml(command);
+            return -1;
+        }
+
+        crm_debug("remote connection authenticated successfully");
+        client->remote_auth = TRUE;
+        g_source_remove(client->remote_auth_timeout);
+        client->remote_auth_timeout = 0;
+        client->name = crm_element_value_copy(command, "name");
+
+#if ENABLE_ACL
+        user = crm_element_value(command, "user");
+        if (user) {
+            client->user = strdup(user);
+        }
+#endif
+
+        /* send ACK */
+        reg = create_xml_node(NULL, "cib_result");
+        crm_xml_add(reg, F_CIB_OPERATION, CRM_OP_REGISTER);
+        crm_xml_add(reg, F_CIB_CLIENTID, client->id);
+        crm_send_remote_msg(client->session, reg, client->encrypted);
+        free_xml(reg);
+        free_xml(command);
+    }
+
+    command = crm_parse_remote_buffer(&client->recv_buf);
+    while (command) {
+        crm_trace("command received");
+        cib_handle_remote_msg(client, command);
+        free_xml(command);
+        command = crm_parse_remote_buffer(&client->recv_buf);
+    }
+
+    if (disconnected) {
+        crm_trace("disconnected while receiving remote cib msg.");
+        return -1;
+    }
+
     return 0;
 }
 
diff --git a/include/crm_internal.h b/include/crm_internal.h
index cf6d95d..388af59 100644
--- a/include/crm_internal.h
+++ b/include/crm_internal.h
@@ -199,8 +199,40 @@ xmlNode *create_operation_update(xmlNode * parent, lrmd_event_data_t *event, con
 long long crm_int_helper(const char *text, char **end_text);
 char *crm_concat(const char *prefix, const char *suffix, char join);
 char *generate_hash_key(const char *crm_msg_reference, const char *sys);
-xmlNode *crm_recv_remote_msg(void *session, gboolean encrypted);
-void crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted);
+
+/* Remote tcp/tls helper functions.  Received data accumulates in a caller-owned
+ * *recv_buf; crm_parse_remote_buffer() returns one parsed message from it per call. */
+gboolean crm_recv_remote_msg(void *session, char **recv_buf, gboolean encrypted, int total_timeout_ms, int *disconnected);
+char *crm_recv_remote_raw(void *data, gboolean encrypted, size_t max_recv, size_t *recv_len, int *disconnected);
+int crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted);
+int crm_recv_remote_ready(void *session, gboolean encrypted, int timeout_ms);
+xmlNode *crm_parse_remote_buffer(char **msg_buf);
+int crm_remote_tcp_connect(const char *host, int port);
+
+#ifdef HAVE_GNUTLS_GNUTLS_H
+/*!
+ * \internal
+ * \brief Initiate the client handshake after establishing the tcp socket.
+ * \note This is a blocking function, it will block until the entire handshake
+ *       is complete or until the timeout period is reached.
+ * \retval 0 success
+ * \retval negative failure
+ */
+int crm_initiate_client_tls_handshake(void *session_data, int timeout_ms);
+/*!
+ * \internal
+ * \brief Create client or server session for anon DH encryption credentials
+ * \param sock the socket the session will use for transport
+ * \param type GNUTLS_SERVER or GNUTLS_CLIENT
+ * \param credentials gnutls_anon_server_credentials_t or gnutls_anon_client_credentials_t
+ *
+ * \return a newly allocated gnutls_session * on success
+ * \retval NULL on failure
+ */
+void *crm_create_anon_tls_session(int sock, int type, void *credentials);
+#endif
+
+#define REMOTE_MSG_TERMINATOR "\r\n\r\n"
 
 const char *daemon_option(const char *option);
 void set_daemon_option(const char *option, const char *value);
diff --git a/lib/cib/cib_remote.c b/lib/cib/cib_remote.c
index 91bca96..ca80c0e 100644
--- a/lib/cib/cib_remote.c
+++ b/lib/cib/cib_remote.c
@@ -38,14 +38,15 @@
 #ifdef HAVE_GNUTLS_GNUTLS_H
 #  undef KEYFILE
 #  include <gnutls/gnutls.h>
-extern gnutls_anon_client_credentials anon_cred_c;
-extern gnutls_session *create_tls_session(int csock, int type);
+gnutls_anon_client_credentials anon_cred_c;
+#define DEFAULT_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */
 
 const int kx_prio[] = {
     GNUTLS_KX_ANON_DH,
     0
 };
 
+static gboolean remote_gnutls_credentials_init = FALSE;
 #else
 typedef void gnutls_session;
 #endif
@@ -61,6 +62,7 @@ struct remote_connection_s {
     gnutls_session *session;
     mainloop_io_t *source;
     char *token;
+    char *recv_buf;
 };
 
 typedef struct cib_remote_opaque_s {
@@ -76,7 +78,8 @@ struct remote_connection_s {
 } cib_remote_opaque_t;
 
 void cib_remote_connection_destroy(gpointer user_data);
-int cib_remote_dispatch(gpointer user_data);
+int cib_remote_callback_dispatch(gpointer user_data);
+int cib_remote_command_dispatch(gpointer user_data);
 int cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type);
 int cib_remote_signoff(cib_t * cib);
 int cib_remote_free(cib_t * cib);
@@ -158,117 +161,91 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
 {
     cib_remote_opaque_t *private = cib->variant_opaque;
 
-    shutdown(private->command.socket, SHUT_RDWR);       /* no more receptions */
-    shutdown(private->callback.socket, SHUT_RDWR);      /* no more receptions */
-    close(private->command.socket);
-    close(private->callback.socket);
-
 #ifdef HAVE_GNUTLS_GNUTLS_H
     if (private->command.encrypted) {
-        gnutls_bye(*(private->command.session), GNUTLS_SHUT_RDWR);
-        gnutls_deinit(*(private->command.session));
-        gnutls_free(private->command.session);
-
-        gnutls_bye(*(private->callback.session), GNUTLS_SHUT_RDWR);
-        gnutls_deinit(*(private->callback.session));
-        gnutls_free(private->callback.session);
+        if (private->command.session) {
+            gnutls_bye(*(private->command.session), GNUTLS_SHUT_RDWR);
+            gnutls_deinit(*(private->command.session));
+            gnutls_free(private->command.session);
+        }
 
-        gnutls_anon_free_client_credentials(anon_cred_c);
-        gnutls_global_deinit();
+        if (private->callback.session) {
+            gnutls_bye(*(private->callback.session), GNUTLS_SHUT_RDWR);
+            gnutls_deinit(*(private->callback.session));
+            gnutls_free(private->callback.session);
+        }
+        private->command.session = NULL;
+        private->callback.session = NULL;
+        if (remote_gnutls_credentials_init) {
+            gnutls_anon_free_client_credentials(anon_cred_c);
+            gnutls_global_deinit();
+            remote_gnutls_credentials_init = FALSE;
+        }
     }
 #endif
+
+    if (private->command.socket) {
+        shutdown(private->command.socket, SHUT_RDWR);       /* no more receptions */
+        close(private->command.socket);
+    }
+    if (private->callback.socket) {
+        shutdown(private->callback.socket, SHUT_RDWR);      /* no more receptions */
+        close(private->callback.socket);
+    }
+    private->command.socket = 0;
+    private->callback.socket = 0;
+
+    free(private->command.recv_buf);
+    free(private->callback.recv_buf);
+    private->command.recv_buf = NULL;
+    private->callback.recv_buf = NULL;
+
     return 0;
 }
 
 static int
-cib_tls_signon(cib_t * cib, struct remote_connection_s *connection)
+cib_tls_signon(cib_t * cib, struct remote_connection_s *connection, gboolean event_channel)
 {
     int sock;
     cib_remote_opaque_t *private = cib->variant_opaque;
-    struct sockaddr_in addr;
     int rc = 0;
-    char *server = private->server;
-
-    int ret_ga;
-    struct addrinfo *res;
-    struct addrinfo hints;
+    int disconnected = 0;
 
     xmlNode *answer = NULL;
     xmlNode *login = NULL;
 
-    static struct mainloop_fd_callbacks cib_fd_callbacks = 
-        {
-            .dispatch = cib_remote_dispatch,
-            .destroy = cib_remote_connection_destroy,
-        };
+    static struct mainloop_fd_callbacks cib_fd_callbacks = { 0, };
+
+    cib_fd_callbacks.dispatch = event_channel ? cib_remote_callback_dispatch : cib_remote_command_dispatch;
+    cib_fd_callbacks.destroy = cib_remote_connection_destroy;
 
     connection->socket = 0;
     connection->session = NULL;
 
-    /* create socket */
-    sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
-    if (sock == -1) {
-        crm_perror(LOG_ERR, "Socket creation failed");
-        return -1;
-    }
-
-    /* getaddrinfo */
-    bzero(&hints, sizeof(struct addrinfo));
-    hints.ai_flags = AI_CANONNAME;
-    hints.ai_family = AF_INET;
-    hints.ai_socktype = SOCK_RAW;
-
-    if (hints.ai_family == AF_INET6) {
-        hints.ai_protocol = IPPROTO_ICMPV6;
-    } else {
-        hints.ai_protocol = IPPROTO_ICMP;
-    }
-
-    crm_debug("Looking up %s", server);
-    ret_ga = getaddrinfo(server, NULL, &hints, &res);
-    if (ret_ga) {
-        crm_err("getaddrinfo: %s", gai_strerror(ret_ga));
-        close(sock);
-        return -1;
-    }
-
-    if (res->ai_canonname) {
-        server = res->ai_canonname;
-    }
-
-    crm_debug("Got address %s for %s", server, private->server);
-
-    if (!res->ai_addr) {
-        fprintf(stderr, "getaddrinfo failed");
-        crm_exit(1);
-    }
-#if 1
-    memcpy(&addr, res->ai_addr, res->ai_addrlen);
-#else
-    /* connect to server */
-    memset(&addr, 0, sizeof(addr));
-    addr.sin_family = AF_INET;
-    addr.sin_addr.s_addr = inet_addr(server);
-#endif
-    addr.sin_port = htons(private->port);
-
-    if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
-        crm_perror(LOG_ERR, "Connection to %s:%d failed", server, private->port);
-        close(sock);
-        return -1;
+    sock = crm_remote_tcp_connect(private->server, private->port);
+    if (sock <= 0) {
+        crm_perror(LOG_ERR, "remote tcp connection to %s:%d failed", private->server, private->port);
     }
 
+    connection->socket = sock;
     if (connection->encrypted) {
         /* initialize GnuTls lib */
 #ifdef HAVE_GNUTLS_GNUTLS_H
-        gnutls_global_init();
-        gnutls_anon_allocate_client_credentials(&anon_cred_c);
+        if (remote_gnutls_credentials_init == FALSE) {
+            gnutls_global_init();
+            gnutls_anon_allocate_client_credentials(&anon_cred_c);
+            remote_gnutls_credentials_init = TRUE;
+        }
 
         /* bind the socket to GnuTls lib */
-        connection->session = create_tls_session(sock, GNUTLS_CLIENT);
-        if (connection->session == NULL) {
-            crm_perror(LOG_ERR, "Session creation for %s:%d failed", server, private->port);
-            close(sock);
+        connection->session = crm_create_anon_tls_session(sock, GNUTLS_CLIENT, anon_cred_c);
+
+        if (crm_initiate_client_tls_handshake(connection->session, DEFAULT_CLIENT_HANDSHAKE_TIMEOUT) != 0) {
+            crm_err("Session creation for %s:%d failed", private->server, private->port);
+
+            gnutls_deinit(*connection->session);
+            gnutls_free(connection->session);
+            connection->session = NULL;
             cib_tls_close(cib);
             return -1;
         }
@@ -289,7 +266,14 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
     crm_send_remote_msg(connection->session, login, connection->encrypted);
     free_xml(login);
 
-    answer = crm_recv_remote_msg(connection->session, connection->encrypted);
+    crm_recv_remote_msg(connection->session, &connection->recv_buf, connection->encrypted, -1, &disconnected);
+
+    if (disconnected) {
+        rc = -ENOTCONN;
+    }
+
+    answer = crm_parse_remote_buffer(&connection->recv_buf);
+
     crm_log_xml_trace(answer, "Reply");
     if (answer == NULL) {
         rc = -EPROTO;
@@ -310,12 +294,15 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
             connection->token = strdup(tmp_ticket);
         }
     }
+    free_xml(answer);
+    answer = NULL;
 
     if (rc != 0) {
         cib_tls_close(cib);
+        return rc;
     }
 
-    connection->socket = sock;
+    crm_trace("remote client connection established");
     connection->source = mainloop_add_fd("cib-remote", G_PRIORITY_HIGH, connection->socket, cib, &cib_fd_callbacks);
     return rc;
 }
@@ -331,35 +318,61 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
 }
 
 int
-cib_remote_dispatch(gpointer user_data)
+cib_remote_command_dispatch(gpointer user_data)
+{
+    int disconnected = 0;
+    cib_t *cib = user_data;
+    cib_remote_opaque_t *private = cib->variant_opaque;
+
+    crm_recv_remote_msg(private->command.session, &private->command.recv_buf, private->command.encrypted, -1, &disconnected);
+
+    free(private->command.recv_buf);
+    private->command.recv_buf = NULL;
+    crm_err("received late reply for remote cib connection, discarding");
+
+    if (disconnected) {
+        return -1;
+    }
+    return 0;
+}
+
+int
+cib_remote_callback_dispatch(gpointer user_data)
 {
     cib_t *cib = user_data;
     cib_remote_opaque_t *private = cib->variant_opaque;
 
     xmlNode *msg = NULL;
-    const char *type = NULL;
+    int disconnected = 0;
 
     crm_info("Message on callback channel");
-    msg = crm_recv_remote_msg(private->callback.session, private->callback.encrypted);
 
-    type = crm_element_value(msg, F_TYPE);
-    crm_trace("Activating %s callbacks...", type);
+    crm_recv_remote_msg(private->callback.session, &private->callback.recv_buf, private->callback.encrypted, -1, &disconnected);
 
-    if (safe_str_eq(type, T_CIB)) {
-        cib_native_callback(cib, msg, 0, 0);
+    msg = crm_parse_remote_buffer(&private->callback.recv_buf);
+    while (msg) {
+        const char *type = crm_element_value(msg, F_TYPE);
+        crm_trace("Activating %s callbacks...", type);
 
-    } else if (safe_str_eq(type, T_CIB_NOTIFY)) {
-        g_list_foreach(cib->notify_list, cib_native_notify, msg);
+        if (safe_str_eq(type, T_CIB)) {
+            cib_native_callback(cib, msg, 0, 0);
 
-    } else {
-        crm_err("Unknown message type: %s", type);
-    }
+        } else if (safe_str_eq(type, T_CIB_NOTIFY)) {
+            g_list_foreach(cib->notify_list, cib_native_notify, msg);
+
+        } else {
+            crm_err("Unknown message type: %s", type);
+        }
 
-    if (msg != NULL) {
         free_xml(msg);
-        return 0;
+        msg = crm_parse_remote_buffer(&private->callback.recv_buf);
+    }
+
+    if (disconnected) {
+        return -1;
     }
-    return -1;
+
+    return 0;
 }
 
 int
@@ -394,11 +407,11 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
     }
 
     if (rc == pcmk_ok) {
-        rc = cib_tls_signon(cib, &(private->command));
+        rc = cib_tls_signon(cib, &(private->command), FALSE);
     }
 
     if (rc == pcmk_ok) {
-        rc = cib_tls_signon(cib, &(private->callback));
+        rc = cib_tls_signon(cib, &(private->callback), TRUE);
     }
 
     if (rc == pcmk_ok) {
@@ -463,37 +476,20 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
     return rc;
 }
 
-static gboolean timer_expired = FALSE;
-static struct timer_rec_s *sync_timer = NULL;
-static gboolean
-cib_timeout_handler(gpointer data)
-{
-    struct timer_rec_s *timer = data;
-
-    timer_expired = TRUE;
-    crm_err("Call %d timed out after %ds", timer->call_id, timer->timeout);
-
-    /* Always return TRUE, never remove the handler
-     * We do that after the while-loop in cib_native_perform_op()
-     */
-    return TRUE;
-}
-
 int
 cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section,
                       xmlNode * data, xmlNode ** output_data, int call_options, const char *name)
 {
     int rc = pcmk_ok;
+    int disconnected = 0;
+    int remaining_time = 0;
+    time_t start_time;
 
     xmlNode *op_msg = NULL;
     xmlNode *op_reply = NULL;
 
     cib_remote_opaque_t *private = cib->variant_opaque;
 
-    if (sync_timer == NULL) {
-        sync_timer = calloc(1, sizeof(struct timer_rec_s));
-    }
-
     if (cib->state == cib_disconnected) {
         return -ENOTCONN;
     }
@@ -524,7 +520,11 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
     }
 
     crm_trace("Sending %s message to CIB service", op);
-    crm_send_remote_msg(private->command.session, op_msg, private->command.encrypted);
+    if (!(call_options & cib_sync_call)) {
+        crm_send_remote_msg(private->callback.session, op_msg, private->command.encrypted);
+    } else {
+        crm_send_remote_msg(private->command.session, op_msg, private->command.encrypted);
+    }
     free_xml(op_msg);
 
     if ((call_options & cib_discard_reply)) {
@@ -537,30 +537,21 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
 
     crm_trace("Waiting for a syncronous reply");
 
-    if (cib->call_timeout > 0) {
-        /* We need this, even with msgfromIPC_timeout(), because we might
-         * get other/older replies that don't match the active request
-         */
-        timer_expired = FALSE;
-        sync_timer->call_id = cib->call_id;
-        sync_timer->timeout = cib->call_timeout * 1000;
-        sync_timer->ref = g_timeout_add(sync_timer->timeout, cib_timeout_handler, sync_timer);
-    }
+    start_time = time(NULL);
+    remaining_time = cib->call_timeout ? cib->call_timeout : 60;
 
-    while (timer_expired == FALSE) {
+    while (remaining_time > 0 && !disconnected) {
         int reply_id = -1;
         int msg_id = cib->call_id;
 
-        op_reply = crm_recv_remote_msg(private->command.session, private->command.encrypted);
-        if (op_reply == NULL) {
+        crm_recv_remote_msg(private->command.session, &private->command.recv_buf, private->command.encrypted, remaining_time * 1000, &disconnected);
+        op_reply = crm_parse_remote_buffer(&private->command.recv_buf);
+
+        if (!op_reply) {
             break;
         }
 
         crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id);
-        CRM_CHECK(reply_id > 0, free_xml(op_reply);
-                  if (sync_timer->ref > 0) {
-                  g_source_remove(sync_timer->ref); sync_timer->ref = 0;}
-                  return -ENOMSG) ;
 
         if (reply_id == msg_id) {
             break;
@@ -579,15 +570,9 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
 
         free_xml(op_reply);
         op_reply = NULL;
-    }
-
-    if (sync_timer->ref > 0) {
-        g_source_remove(sync_timer->ref);
-        sync_timer->ref = 0;
-    }
 
-    if (timer_expired) {
-        return -ETIME;
+        /* wasn't the right reply, try and read some more */
+        remaining_time = time(NULL) - start_time;
     }
 
     /* if(IPC_ISRCONN(native->command_channel) == FALSE) { */
@@ -596,7 +581,10 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c
     /*      cib->state = cib_disconnected; */
     /* } */
 
-    if (op_reply == NULL) {
+    if (disconnected) { /* set via crm_recv_remote_msg() on poll error, EOF or read error */
+        crm_err("Disconnected while waiting for reply.");
+        return -ENOTCONN;
+    } else if (op_reply == NULL) { /* loop ended without parsing a matching reply */
         crm_err("No reply message - empty");
         return -ENOMSG;
     }
diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c
index 09cf6e9..1e413b6 100644
--- a/lib/common/mainloop.c
+++ b/lib/common/mainloop.c
@@ -178,6 +178,7 @@ struct trigger_s {
     source->trigger = FALSE;
     if (source->id > 0) {
         g_source_remove(source->id);
+        source->id = 0; /* forget the removed id so the guard above cannot remove it twice */
     }
     return TRUE;
 }
diff --git a/lib/common/remote.c b/lib/common/remote.c
index 7f04097..ae61481 100644
--- a/lib/common/remote.c
+++ b/lib/common/remote.c
@@ -25,8 +25,10 @@
 #include <sys/stat.h>
 #include <unistd.h>
 #include <sys/socket.h>
-
+#include <arpa/inet.h>
 #include <netinet/ip.h>
+#include <netdb.h>
+
 
 #include <stdlib.h>
 #include <errno.h>
@@ -42,7 +44,7 @@
 #endif
 
 #ifdef HAVE_GNUTLS_GNUTLS_H
-const int tls_kx_order[] = {
+const int anon_tls_kx_order[] = { /* 0-terminated kx list handed to gnutls_kx_set_priority() for anon sessions */
     GNUTLS_KX_ANON_DH,
     GNUTLS_KX_DHE_RSA,
     GNUTLS_KX_DHE_DSS,
@@ -50,22 +52,32 @@
     0
 };
 
-gnutls_anon_client_credentials anon_cred_c;
-gnutls_anon_server_credentials anon_cred_s;
-static char *cib_send_tls(gnutls_session * session, xmlNode * msg);
-static char *cib_recv_tls(gnutls_session * session);
-#endif
+int /* 0 on success; negative on handshake failure, poll error (-1) or timeout */
+crm_initiate_client_tls_handshake(void *session_data, int timeout_ms)
+{
+    int rc = 0;
+    int pollrc = 0;
+    time_t start = time(NULL);
+    gnutls_session *session = session_data;
 
-char *cib_recv_plaintext(int sock);
-char *cib_send_plaintext(int sock, xmlNode * msg);
+    do {
+        rc = gnutls_handshake(*session);
+        if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
+            pollrc = crm_recv_remote_ready(session, TRUE, 1000);
+            if (pollrc < 0) {
+                /* poll returned error, there is no hope */
+                rc = -1;
+            }
+        }
+    } while (((time(NULL) - start) * 1000 < timeout_ms) && /* compare in ms so sub-second timeouts still retry */
+            (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
 
-#ifdef HAVE_GNUTLS_GNUTLS_H
-gnutls_session *create_tls_session(int csock, int type);
+    return rc;
+}
 
-gnutls_session *
-create_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ )
+void * /* a gnutls_session * from gnutls_malloc(); no handshake is performed here */
+crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */, void *credentials /* anon server or client credentials, matching type */)
 {
-    int rc = 0;
     gnutls_session *session = gnutls_malloc(sizeof(gnutls_session));
 
     gnutls_init(session, type);
@@ -75,266 +87,619 @@
/*	gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
 #  else
     gnutls_set_default_priority(*session);
-    gnutls_kx_set_priority(*session, tls_kx_order);
+    gnutls_kx_set_priority(*session, anon_tls_kx_order);
 #  endif
     gnutls_transport_set_ptr(*session, (gnutls_transport_ptr) GINT_TO_POINTER(csock));
     switch (type) {
-        case GNUTLS_SERVER:
-            gnutls_credentials_set(*session, GNUTLS_CRD_ANON, anon_cred_s);
-            break;
-        case GNUTLS_CLIENT:
-            gnutls_credentials_set(*session, GNUTLS_CRD_ANON, anon_cred_c);
-            break;
+    case GNUTLS_SERVER:
+        gnutls_credentials_set(*session, GNUTLS_CRD_ANON, (gnutls_anon_server_credentials_t) credentials); /* caller passes server creds */
+        break;
+    case GNUTLS_CLIENT:
+        gnutls_credentials_set(*session, GNUTLS_CRD_ANON, (gnutls_anon_client_credentials_t) credentials); /* caller passes client creds */
+        break;
     }
 
-    do {
-        rc = gnutls_handshake(*session);
-    } while (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN);
-
-    if (rc < 0) {
-        crm_err("Handshake failed: %s", gnutls_strerror(rc));
-        gnutls_deinit(*session);
-        gnutls_free(session);
-        return NULL;
-    }
     return session;
 }
 
-static char *
-cib_send_tls(gnutls_session * session, xmlNode * msg)
+static int /* len on success, a negative gnutls error on failure, -1 if buf is NULL */
+crm_send_tls(gnutls_session * session, const char *buf, size_t len)
 {
-    char *xml_text = NULL;
-
-#  if 0
-    const char *name = crm_element_name(msg);
+    const char *unsent = buf;
+    int rc = 0;
+    int total_send;
 
-    if (safe_str_neq(name, "cib_command")) {
-        xmlNodeSetName(msg, "cib_result");
+    if (buf == NULL) {
+        return -1;
     }
-#  endif
-    xml_text = dump_xml_unformatted(msg);
-    if (xml_text != NULL) {
-        char *unsent = xml_text;
-        int len = strlen(xml_text);
-        int rc = 0;
 
-        len++;                  /* null char */
-        crm_trace("Message size: %d", len);
+    total_send = len;
+    crm_trace("Message size: %d", (int) len);
 
-        while (TRUE) {
-            rc = gnutls_record_send(*session, unsent, len);
-            crm_debug("Sent %d bytes", rc);
+    while (TRUE) {
+        rc = gnutls_record_send(*session, unsent, len);
 
-            if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
-                crm_debug("Retry");
+        if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
+            crm_debug("Retry"); /* NOTE(review): resends immediately, without polling for writability */
 
-            } else if (rc < 0) {
-                crm_debug("Connection terminated");
-                break;
+        } else if (rc < 0) {
+            crm_err("Connection terminated rc = %d", rc);
+            break;
 
-            } else if (rc < len) {
-                crm_debug("Only sent %d of %d bytes", rc, len);
-                len -= rc;
-                unsent += rc;
-            } else {
-                break;
-            }
+        } else if ((size_t) rc < len) {
+            crm_debug("Only sent %d of %d bytes", rc, (int) len);
+            len -= rc;
+            unsent += rc;
+        } else {
+            crm_debug("Sent %d bytes", rc);
+            break;
         }
-
     }
-    free(xml_text);
-    return NULL;
 
+    return rc < 0 ? rc : total_send;
 }
 
+
+/*!
+ * \internal
+ * \brief Read bytes off a non-blocking TLS session.
+ * \param session      - TLS session to read
+ * \param max_size     - max bytes allowed to read for buffer. 0 assumes no limit
+ * \param recv_len     - if non-NULL, set to the number of bytes read
+ * \param disconnected - if non-NULL, set to 1 on EOF, a fatal error, or a NULL session
+ * \note Only use with NON-blocking sockets, after polling the socket. Returns
+ *       once max_size bytes have been read, the socket read buffer is empty,
+ *       or an error is encountered.
+ * \retval '\0'-terminated buffer (caller frees) if any bytes were read
+ * \retval NULL if nothing was read
+ */
 static char *
-cib_recv_tls(gnutls_session * session)
+crm_recv_tls(gnutls_session * session, size_t max_size, size_t *recv_len, int *disconnected)
 {
     char *buf = NULL;
-
     int rc = 0;
-    int len = 0;
-    int chunk_size = 1024;
+    size_t len = 0;
+    size_t chunk_size = max_size ? max_size : 1024;
+    size_t buf_size = 0;
+    size_t read_size = 0;
 
     if (session == NULL) {
-        return NULL;
+        if (disconnected) {
+            *disconnected = 1;
+        }
+        goto done;
     }
 
-    buf = calloc(1, chunk_size);
+    buf = calloc(1, chunk_size + 1);
+    buf_size = chunk_size;
 
     while (TRUE) {
-        errno = 0;
-        rc = gnutls_record_recv(*session, buf + len, chunk_size);
-        crm_trace("Got %d more bytes. errno=%d", rc, errno);
+        read_size = buf_size - len;
 
-        if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
-            crm_trace("Retry");
+        /* automatically grow the buffer when needed if max_size is not set.*/
+        if (!max_size && (read_size < (chunk_size / 2))) {
+            buf_size += chunk_size;
+            crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, (int)buf_size);
+            buf = realloc(buf, buf_size + 1);
+            CRM_ASSERT(buf != NULL);
 
-        } else if (rc == GNUTLS_E_UNEXPECTED_PACKET_LENGTH) {
-            crm_trace("Session disconnected");
-            goto bail;
+            read_size = buf_size - len;
+        }
 
-        } else if (rc < 0) {
-            crm_err("Error receiving message: %s (%d)", gnutls_strerror(rc), rc);
-            goto bail;
+        rc = gnutls_record_recv(*session, buf + len, read_size);
 
-        } else if (rc == chunk_size) {
+        if (rc > 0) {
+            crm_trace("Got %d more bytes.", rc);
             len += rc;
-            chunk_size *= 2;
-            buf = realloc(buf, len + chunk_size);
-            crm_trace("Retry with %d more bytes", (int)chunk_size);
-            CRM_ASSERT(buf != NULL);
-
-        } else if (buf[len + rc - 1] != 0) {
-            crm_trace("Last char is %d '%c'", buf[len + rc - 1], buf[len + rc - 1]);
-            crm_trace("Retry with %d more bytes", (int)chunk_size);
-            len += rc;
-            buf = realloc(buf, len + chunk_size);
-            CRM_ASSERT(buf != NULL);
+            /* always null terminate buffer, the +1 to alloc always allows for this.*/
+            buf[len] = '\0';
+        }
+        if (max_size && (len >= max_size)) { /* stop only once full, so EOF/errors below are still seen */
+            crm_trace("Buffer max read size %d met" , (int)max_size);
+            goto done;
+        }
 
-        } else {
-            crm_trace("Got %d more bytes", (int)rc);
-            return buf;
+        /* process any errors. */
+        if (rc == GNUTLS_E_INTERRUPTED) {
+            crm_trace("EINTR encoutered, retry tls read");
+        } else if (rc == GNUTLS_E_AGAIN) {
+            crm_trace("non-blocking, exiting read on rc = %d", rc);
+            goto done;
+        } else if (rc <= 0) {
+            if (rc == 0) {
+                crm_debug("EOF encoutered during TLS read");
+            } else {
+                crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc);
+            }
+            if (disconnected) {
+                *disconnected = 1;
+            }
+            goto done;
         }
     }
-  bail:
-    free(buf);
-    return NULL;
+
+done:
+    if (recv_len) {
+        *recv_len = len;
+    }
+    if (!len) {
+        free(buf);
+        buf = NULL;
+    }
+    return buf;
 
 }
 #endif
 
-char *
-cib_send_plaintext(int sock, xmlNode * msg)
+static int /* len on success, negative on write failure, -1 if buf is NULL */
+crm_send_plaintext(int sock, const char *buf, size_t len)
 {
-    char *xml_text = dump_xml_unformatted(msg);
 
-    if (xml_text != NULL) {
-        int rc = 0;
-        char *unsent = xml_text;
-        int len = strlen(xml_text);
+    int rc = 0;
+    const char *unsent = buf;
+    int total_send;
 
-        len++;                  /* null char */
-        crm_trace("Message on socket %d: size=%d", sock, len);
-  retry:
-        rc = write(sock, unsent, len);
-        if (rc < 0) {
-            switch (errno) {
-                case EINTR:
-                case EAGAIN:
-                    crm_trace("Retry");
-                    goto retry;
-                default:
-                    crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, len);
-                    break;
-            }
+    if (buf == NULL) {
+        return -1;
+    }
+    total_send = len;
 
-        } else if (rc < len) {
-            crm_trace("Only sent %d of %d remaining bytes", rc, len);
-            len -= rc;
-            unsent += rc;
+    crm_trace("Message on socket %d: size=%d", sock, (int) len);
+  retry:
+    rc = write(sock, unsent, len);
+    if (rc < 0) {
+        switch (errno) {
+        case EINTR:
+        case EAGAIN:
+            crm_trace("Retry"); /* NOTE(review): retries immediately, without polling for writability */
             goto retry;
-
-        } else {
-            crm_trace("Sent %d bytes: %.100s", rc, xml_text);
+        default:
+            crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int) len);
+            break;
         }
+
+    } else if ((size_t) rc < len) {
+        crm_trace("Only sent %d of %d remaining bytes", rc, (int) len);
+        len -= rc;
+        unsent += rc;
+        goto retry;
+
+    } else {
+        crm_trace("Sent %d bytes: %.100s", rc, buf);
     }
-    free(xml_text);
-    return NULL;
+
+    return rc < 0 ? rc : total_send;
 
 }
 
-char *
-cib_recv_plaintext(int sock)
+/*!
+ * \internal
+ * \brief Read bytes off a non-blocking socket.
+ * \param sock         - socket to read
+ * \param max_size     - max bytes allowed to read for buffer. 0 assumes no limit
+ * \param recv_len     - if non-NULL, set to the number of bytes read
+ * \param disconnected - if non-NULL, set to 1 on EOF, a fatal error, or an invalid sock
+ * \note Only use with NON-blocking sockets, after polling the socket. Returns
+ *       once max_size bytes have been read, the socket read buffer is empty,
+ *       or an error is encountered.
+ * \retval '\0'-terminated buffer (caller frees) if any bytes were read
+ * \retval NULL if nothing was read
+ */
+static char *
+crm_recv_plaintext(int sock, size_t max_size, size_t *recv_len, int *disconnected)
 {
     char *buf = NULL;
-
     ssize_t rc = 0;
     ssize_t len = 0;
-    ssize_t chunk_size = 512;
+    ssize_t chunk_size = max_size ? max_size : 1024;
+    size_t buf_size = 0;
+    size_t read_size = 0;
 
-    buf = calloc(1, chunk_size);
+    if (sock <= 0) {
+        if (disconnected) {
+            *disconnected = 1;
+        }
+        goto done;
+    }
 
-    while (1) {
-        errno = 0;
-        rc = read(sock, buf + len, chunk_size);
-        crm_trace("Got %d more bytes. errno=%d", (int)rc, errno);
-
-        if (errno == EINTR || errno == EAGAIN) {
-            crm_trace("Retry: %d", (int)rc);
-            if (rc > 0) {
-                len += rc;
-                buf = realloc(buf, len + chunk_size);
-                CRM_ASSERT(buf != NULL);
-            }
+    buf = calloc(1, chunk_size + 1);
+    buf_size = chunk_size;
 
-        } else if (rc < 0) {
-            crm_perror(LOG_ERR, "Error receiving message: %d", (int)rc);
-            goto bail;
+    while (TRUE) {
+        errno = 0;
+        read_size = buf_size - len;
 
-        } else if (rc == chunk_size) {
-            len += rc;
-            chunk_size *= 2;
-            buf = realloc(buf, len + chunk_size);
-            crm_trace("Retry with %d more bytes", (int)chunk_size);
+        /* automatically grow the buffer when needed if max_size is not set.*/
+        if (!max_size && (read_size < (chunk_size / 2))) {
+            buf_size += chunk_size;
+            crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, (int)buf_size);
+            buf = realloc(buf, buf_size + 1);
             CRM_ASSERT(buf != NULL);
 
-        } else if (buf[len + rc - 1] != 0) {
-            crm_trace("Last char is %d '%c'", buf[len + rc - 1], buf[len + rc - 1]);
-            crm_trace("Retry with %d more bytes", (int)chunk_size);
+            read_size = buf_size - len;
+        }
+
+        rc = read(sock, buf + len, read_size); /* never more than the room left in buf */
+
+        if (rc > 0) {
+            crm_trace("Got %d more bytes. errno=%d", (int)rc, errno);
             len += rc;
-            buf = realloc(buf, len + chunk_size);
-            CRM_ASSERT(buf != NULL);
+            /* always null terminate buffer, the +1 to alloc always allows for this.*/
+            buf[len] = '\0';
+        }
+        if (max_size && ((size_t) len >= max_size)) { /* stop only once full, so EOF/errors below are still seen */
+            crm_trace("Buffer max read size %d met" , (int)max_size);
+            goto done;
+        }
 
-        } else {
-            return buf;
+        if (rc > 0) {
+            continue;
+        } else if (rc == 0) {
+            if (disconnected) {
+                *disconnected = 1;
+            }
+            crm_trace("EOF encoutered during read");
+            goto done;
+        }
+
+        /* process errors */
+        if (errno == EINTR) {
+            crm_trace("EINTER encoutered, retry socket read.");
+        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+            crm_trace("non-blocking, exiting read on rc = %d", (int)rc);
+            goto done;
+        } else { /* any other errno (e.g. ECONNRESET) is fatal */
+            if (disconnected) {
+                *disconnected = 1;
+            }
+            crm_debug("Error receiving message: %d", (int)rc);
+            goto done;
         }
     }
-  bail:
-    free(buf);
-    return NULL;
 
+done:
+    if (recv_len) {
+        *recv_len = len;
+    }
+    if (!len) {
+        free(buf);
+        buf = NULL;
+    }
+    return buf;
 }
 
-void
-crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted)
+static int /* bytes sent on success, negative on failure */
+crm_send_remote_msg_raw(void *session, const char *buf, size_t len, gboolean encrypted)
 {
+    int rc = -1;
     if (encrypted) {
 #ifdef HAVE_GNUTLS_GNUTLS_H
-        cib_send_tls(session, msg);
+        rc = crm_send_tls(session, buf, len);
 #else
         CRM_ASSERT(encrypted == FALSE);
 #endif
     } else {
-        cib_send_plaintext(GPOINTER_TO_INT(session), msg);
+        rc = crm_send_plaintext(GPOINTER_TO_INT(session), buf, len); /* plaintext: session carries the socket fd */
     }
+    return rc;
 }
 
+int
+crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted)
+{
+    int rc = -1;
+    char *xml_text = NULL;
+    int len = 0;
+
+    xml_text = dump_xml_unformatted(msg);
+    if (xml_text) {
+        len = strlen(xml_text);
+    } else {
+        crm_err("Invalid XML, can not send msg");
+        return -1;
+    }
+
+    rc = crm_send_remote_msg_raw(session, xml_text, len, encrypted);
+    if (rc < 0) {
+        goto done;
+    }
+    rc = crm_send_remote_msg_raw(session, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR), encrypted);
+
+done:
+    if (rc < 0) {
+        crm_err("Failed to send remote msg, rc = %d", rc);
+    }
+
+    free(xml_text);
+    return rc;
+}
+
+/*!
+ * \internal
+ * \brief Pop the first parseable msg (terminated by REMOTE_MSG_TERMINATOR) off *msg_buf.
+ * \note Takes ownership of *msg_buf; on return it holds the unconsumed remainder. Unparseable msgs are logged and dropped; returns NULL if no parseable msg is buffered.
+ */
 xmlNode *
-crm_recv_remote_msg(void *session, gboolean encrypted)
+crm_parse_remote_buffer(char **msg_buf)
 {
-    char *reply = NULL;
+    char *buf = NULL;
+    char *start = NULL;
+    char *end = NULL;
     xmlNode *xml = NULL;
 
+    if (*msg_buf == NULL) {
+        return NULL;
+    }
+
+    /* take ownership of the buffer */
+    buf = *msg_buf;
+    *msg_buf = NULL;
+
+    /* MSGS are separated by REMOTE_MSG_TERMINATOR. Split a message off the buffer and return it. */
+    start = buf;
+    end = strstr(start, REMOTE_MSG_TERMINATOR);
+
+    while (!xml && end) {
+
+        /* grab the message */
+        end[0] = '\0';
+        end += strlen(REMOTE_MSG_TERMINATOR);
+
+        xml = string2xml(start);
+        if (xml == NULL) {
+            crm_err("Couldn't parse: '%.120s'", start);
+        }
+        start = end;
+        end = strstr(start, REMOTE_MSG_TERMINATOR);
+    }
+
+    if (start != buf) {
+        /* msgs were consumed (parsed or dropped); keep only the unconsumed remainder */
+        *msg_buf = strdup(start);
+        free(buf);
+    } else {
+        /* no complete msg present, hand the untouched buffer back */
+        *msg_buf = buf;
+    }
+
+    return xml;
+}
+
+/*!
+ * \internal
+ * \brief Determine if a remote session has data to read
+ * \param timeout - poll() timeout in ms; a negative value waits indefinitely
+ * \retval 0, timeout occurred.
+ * \retval positive, data is ready to be read
+ * \retval negative, -ENOTCONN if there is no socket, or -1 if poll() failed
+ */
+int
+crm_recv_remote_ready(void *session, gboolean encrypted, int timeout /* ms */)
+{
+    struct pollfd fds = { 0, };
+    int sock = 0;
+    void *sock_ptr = NULL;
+    int rc = 0;
+    time_t start;
+    const int total_timeout = timeout; /* timeout is recomputed from this after EINTR */
+    if (encrypted) {
+#ifdef HAVE_GNUTLS_GNUTLS_H
+        gnutls_session *tls_session = session;
+        sock_ptr = gnutls_transport_get_ptr(*tls_session);
+#else
+        CRM_ASSERT(encrypted == FALSE);
+#endif
+    } else {
+        sock_ptr = session;
+    }
+
+    sock = GPOINTER_TO_INT(sock_ptr);
+    if (sock <= 0) {
+        return -ENOTCONN;
+    }
+
+    start = time(NULL);
+    errno = 0;
+    do {
+        fds.fd = sock;
+        fds.events = POLLIN;
+
+        /* If we got an EINTR while polling, and we have a
+         * specific timeout we are trying to honor, recompute
+         * the time left from the total, to the closest second. */
+        if (errno == EINTR && (timeout > 0)) {
+            timeout = total_timeout - ((time(NULL) - start) * 1000);
+            if (timeout < 1000) {
+                timeout = 1000;
+            }
+        }
+
+        rc = poll(&fds, 1, timeout);
+    } while (rc < 0 && errno == EINTR);
+
+    return rc;
+}
+
+char * /* bytes read (caller frees), or NULL if nothing was read */
+crm_recv_remote_raw(void *session, gboolean encrypted, size_t max_recv, size_t *recv_len, int *disconnected)
+{
+    char *reply = NULL;
+    if (recv_len) {
+        *recv_len = 0;
+    }
+
+    if (disconnected) { /* reset here: the readers below only ever set it to 1 */
+        *disconnected = 0;
+    }
+
     if (encrypted) {
 #ifdef HAVE_GNUTLS_GNUTLS_H
-        reply = cib_recv_tls(session);
+        reply = crm_recv_tls(session, max_recv, recv_len, disconnected);
 #else
         CRM_ASSERT(encrypted == FALSE);
 #endif
     } else {
-        reply = cib_recv_plaintext(GPOINTER_TO_INT(session));
+        reply = crm_recv_plaintext(GPOINTER_TO_INT(session), max_recv, recv_len, disconnected); /* plaintext: session carries the socket fd */
     }
     if (reply == NULL || strlen(reply) == 0) {
         crm_trace("Empty reply");
+    }
 
-    } else {
-        xml = string2xml(reply);
-        if (xml == NULL) {
-            crm_err("Couldn't parse: '%.120s'", reply);
+    return reply;
+}
+
+/*!
+ * \internal
+ * \brief Read data off the socket until at least one full message is present or timeout occurs.
+ * \note New data is appended to *recv_buf; *disconnected is set to 1 if the session ends.
+ * \retval TRUE message read
+ * \retval FALSE full message not read
+ */
+gboolean
+crm_recv_remote_msg(void *session, char **recv_buf, gboolean encrypted, int total_timeout /*ms */, int *disconnected)
+{
+    int ret;
+    size_t request_len = 0;
+    time_t start = time(NULL);
+    char *raw_request = NULL;
+    int remaining_timeout = 0;
+
+    if (total_timeout == 0) {
+        total_timeout = 10000;
+    } else if (total_timeout < 0) {
+        total_timeout = 60000;
+    }
+    *disconnected = 0;
+
+    remaining_timeout = total_timeout;
+    while ((remaining_timeout > 0) && !(*disconnected)) {
+
+        /* read some more off the tls buffer if we still have time left. */
+        crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d", total_timeout, remaining_timeout);
+        ret = crm_recv_remote_ready(session, encrypted, remaining_timeout);
+        raw_request = NULL;
+
+        if (ret == 0) {
+            crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
+            return FALSE;
+
+        } else if (ret < 0) {
+            if (errno != EINTR) {
+                crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret, errno);
+                *disconnected = 1;
+                return FALSE;
+            }
+            crm_debug("poll EINTR encountered during poll, retrying");
+        } else {
+            raw_request = crm_recv_remote_raw(session, encrypted, 0, &request_len, disconnected);
+        }
+
+        remaining_timeout = total_timeout - ((time(NULL) - start) * 1000); /* elapsed is measured from start, so subtract it from the total */
+
+        if (!raw_request) {
+            crm_debug("Empty msg received after poll");
+            continue;
+        }
+
+        if (*recv_buf) {
+            int old_len = strlen(*recv_buf);
+
+            crm_trace("Expanding recv buffer from %d to %d", old_len, (int) (old_len + request_len));
+            *recv_buf = realloc(*recv_buf, old_len + request_len + 1);
+            CRM_ASSERT(*recv_buf != NULL);
+            memcpy(*recv_buf + old_len, raw_request, request_len);
+            *(*recv_buf+old_len+request_len) = '\0';
+            free(raw_request);
+        } else {
+            *recv_buf = raw_request;
+        }
+
+        if (strstr(*recv_buf, REMOTE_MSG_TERMINATOR)) {
+            return TRUE;
         }
     }
 
-    free(reply);
-    return xml;
+    return FALSE;
 }
+
+/*!
+ * \internal
+ * \brief Open a TCP connection to host:port, trying each resolved address in turn
+ * \retval positive, socket fd (switched to O_NONBLOCK when F_GETFL succeeds).
+ * \retval negative, failed to connect.
+ */
+int
+crm_remote_tcp_connect(const char *host, int port)
+{
+    struct addrinfo *res;
+    struct addrinfo *rp;
+    struct addrinfo hints;
+    const char *server = host;
+    int ret_ga;
+    int sock = -1; /* returned as-is when no address yields a connection */
+
+    /* getaddrinfo */
+    memset(&hints, 0, sizeof(struct addrinfo));
+    hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_flags = AI_CANONNAME;
+
+    crm_debug("Looking up %s", server);
+    ret_ga = getaddrinfo(server, NULL, &hints, &res);
+    if (ret_ga) {
+        crm_err("getaddrinfo: %s", gai_strerror(ret_ga));
+        return -1;
+    }
+
+    if (!res || !res->ai_addr) {
+        crm_err("getaddrinfo failed");
+        return -1;
+    }
+
+    for (rp = res; rp != NULL; rp = rp->ai_next) {
+        struct sockaddr *addr = rp->ai_addr;
+        int flag = 0;
+        if (!addr) {
+            continue;
+        }
+
+        if (rp->ai_canonname) {
+            server = rp->ai_canonname;
+        }
+        crm_debug("Got address %s for %s", server, host);
+
+        /* create socket */
+        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
+        if (sock == -1) {
+            crm_err("Socket creation failed for remote client connection.");
+            continue;
+        }
+        if (addr->sa_family == AF_INET6) {
+            struct sockaddr_in6 *addr_in = (struct sockaddr_in6 *) addr;
+            addr_in->sin6_port = htons(port);
+        } else {
+            struct sockaddr_in *addr_in = (struct sockaddr_in *) addr;
+            addr_in->sin_port = htons(port);
+            crm_info("Attempting to connect to remote server at %s:%d", inet_ntoa(addr_in->sin_addr), port);
+        }
+
+        if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
+            if ((flag = fcntl(sock, F_GETFL)) >= 0) {
+                if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
+                    crm_err( "fcntl() write failed");
+                    close(sock);
+                    sock = -1;
+                    continue;
+                }
+            }
+            break;                  /* Success */
+        }
+
+        close(sock);
+        sock = -1;
+    }
+    freeaddrinfo(res);
+
+    return sock;
+}
+
diff --git a/tools/crm_mon.c b/tools/crm_mon.c
index 5c2e687..fe59264 100644
--- a/tools/crm_mon.c
+++ b/tools/crm_mon.c
@@ -275,7 +275,7 @@
             if (rc == pcmk_ok) {
                 rc = cib->cmds->set_connection_dnotify(cib, mon_cib_connection_destroy);
                 if (rc == -EPROTONOSUPPORT) {
-                    print_as("Notification setup failed, won't be able to reconnect after failure");
+                    print_as("Notification setup not supported, won't be able to reconnect after failure"); /* NOTE(review): presumably mon_cib_connection_destroy was not registered here */
                     if (as_console) {
                         sleep(2);
                     }