diff --git a/cib/callbacks.c b/cib/callbacks.c index 86cfd92..aa5f6a8 100644 --- a/cib/callbacks.c +++ b/cib/callbacks.c @@ -347,13 +347,18 @@ struct qb_ipcs_service_handlers ipc_rw_callbacks = int rid = 0; if(sync_reply) { - CRM_LOG_ASSERT(client_obj->request_id); + if (client_obj->ipc) { + CRM_LOG_ASSERT(client_obj->request_id); - rid = client_obj->request_id; - client_obj->request_id = 0; + rid = client_obj->request_id; + client_obj->request_id = 0; - crm_trace("Sending response %d to %s %s", + crm_trace("Sending response %d to %s %s", rid, client_obj->name, from_peer?"(originator of delegated request)":""); + } else { + crm_trace("Sending response to %s %s", + client_obj->name, from_peer?"(originator of delegated request)":""); + } } else { crm_trace("Sending an event to %s %s", diff --git a/cib/callbacks.h b/cib/callbacks.h index 99a5065..b8af997 100644 --- a/cib/callbacks.h +++ b/cib/callbacks.h @@ -41,18 +41,21 @@ char *name; char *callback_id; char *user; + char *recv_buf; int request_id; qb_ipcs_connection_t *ipc; #ifdef HAVE_GNUTLS_GNUTLS_H gnutls_session *session; + gboolean handshake_complete; #else void *session; #endif gboolean encrypted; + gboolean remote_auth; mainloop_io_t *remote; - + unsigned long num_calls; int pre_notify; @@ -60,6 +63,7 @@ int confirmations; int replace; int diffs; + int remote_auth_timeout; GList *delegated_calls; } cib_client_t; diff --git a/cib/notify.c b/cib/notify.c index 9dcb699..1dcda8f 100644 --- a/cib/notify.c +++ b/cib/notify.c @@ -83,7 +83,7 @@ void do_cib_notify(int options, const char *op, xmlNode * update, CRM_CHECK(client != NULL, return TRUE); CRM_CHECK(update_msg != NULL, return TRUE); - if (client->ipc == NULL) { + if (client->ipc == NULL && client->session == NULL) { crm_warn("Skipping client with NULL channel"); return FALSE; } diff --git a/cib/remote.c b/cib/remote.c index 7ad7132..f9b90a0 100644 --- a/cib/remote.c +++ b/cib/remote.c @@ -60,10 +60,6 @@ # endif #endif -#ifdef HAVE_DECL_NANOSLEEP -# include <time.h> -#endif - extern int remote_tls_fd; extern gboolean cib_shutdown_flag; @@ -73,17 +69,16 @@ #ifdef HAVE_GNUTLS_GNUTLS_H # define DH_BITS 1024 gnutls_dh_params dh_params; -extern gnutls_anon_server_credentials anon_cred_s; +gnutls_anon_server_credentials anon_cred_s; static void debug_log(int level, const char *str) { fputs(str, stderr); } - -extern gnutls_session *create_tls_session(int csock, int type); - #endif +#define REMOTE_AUTH_TIMEOUT 10000 + int num_clients; int authenticate_user(const char *user, const char *passwd); int cib_remote_listen(gpointer data); @@ -121,7 +116,7 @@ #else crm_notice("Starting a tls listener on port %d.", port); gnutls_global_init(); -/* gnutls_global_set_log_level (10); */ + /* gnutls_global_set_log_level (10); */ gnutls_global_set_log_function(debug_log); gnutls_dh_params_init(&dh_params); gnutls_dh_params_generate2(dh_params, DH_BITS); @@ -215,37 +210,89 @@ return FALSE; } +static gboolean +cib_remote_auth(xmlNode *login) +{ + const char *user = NULL; + const char *pass = NULL; + const char *tmp = NULL; + + crm_log_xml_info(login, "Login: "); + if (login == NULL) { + return FALSE; + } + + tmp = crm_element_name(login); + if (safe_str_neq(tmp, "cib_command")) { + crm_err("Wrong tag: %s", tmp); + return FALSE; + } + + tmp = crm_element_value(login, "op"); + if (safe_str_neq(tmp, "authenticate")) { + crm_err("Wrong operation: %s", tmp); + return FALSE; + } + + user = crm_element_value(login, "user"); + pass = crm_element_value(login, "password"); + + if (!user || !pass) { + crm_err("missing auth credentials"); + return FALSE; + } + + /* Non-root daemons can only validate the password of the + * user they're running as + */ + if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) { + crm_err("User is not a member of the required group"); + return FALSE; + + } else if (authenticate_user(user, pass) == FALSE) { + crm_err("PAM auth failed"); + return FALSE; + } + + return TRUE; +} + +static gboolean +remote_auth_timeout_cb(gpointer data) +{ + cib_client_t *client = data; + + client->remote_auth_timeout = 0; + + if (client->remote_auth == TRUE) { + return FALSE; + } + + mainloop_del_fd(client->remote); + crm_err("Remote client authentication timed out"); + + return FALSE; +} int cib_remote_listen(gpointer data) { - int lpc = 0; int csock = 0; unsigned laddr; - time_t now = 0; - time_t start = time(NULL); struct sockaddr_in addr; int ssock = *(int *)data; + int flag; #ifdef HAVE_GNUTLS_GNUTLS_H gnutls_session *session = NULL; #endif cib_client_t *new_client = NULL; - xmlNode *login = NULL; - const char *user = NULL; - const char *pass = NULL; - const char *tmp = NULL; - -#ifdef HAVE_DECL_NANOSLEEP - const struct timespec sleepfast = { 0, 10000000 }; /* 10 millisec */ -#endif - static struct mainloop_fd_callbacks remote_client_fd_callbacks = { .dispatch = cib_remote_msg, .destroy = cib_remote_connection_destroy, - }; - + }; + /* accept the connection */ laddr = sizeof(addr); csock = accept(ssock, (struct sockaddr *)&addr, &laddr); @@ -257,10 +304,22 @@ return TRUE; } + if ((flag = fcntl(csock, F_GETFL)) >= 0) { + if (fcntl(csock, F_SETFL, flag | O_NONBLOCK) < 0) { + crm_err( "fcntl() write failed"); + close(csock); + return TRUE; + } + } else { + crm_err( "fcntl() read failed"); + close(csock); + return TRUE; + } + if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H /* create gnutls session for the server socket */ - session = create_tls_session(csock, GNUTLS_SERVER); + session = crm_create_anon_tls_session(csock, GNUTLS_SERVER, anon_cred_s); if (session == NULL) { crm_err("TLS session creation failed"); close(csock); @@ -269,73 +328,13 @@ #endif } - do { - crm_trace("Iter: %d", lpc++); - if (ssock == remote_tls_fd) { -#ifdef HAVE_GNUTLS_GNUTLS_H - login = crm_recv_remote_msg(session, TRUE); -#endif - } else { - login = crm_recv_remote_msg(GINT_TO_POINTER(csock), FALSE); - } - if (login != NULL) { - break; - } -#ifdef HAVE_DECL_NANOSLEEP - nanosleep(&sleepfast, NULL); -#else - sleep(1); -#endif - now = time(NULL); - - /* Peers have 3s to connect */ - } while (login == NULL && (start - now) < 4); - - crm_log_xml_info(login, "Login: "); - if (login == NULL) { - goto bail; - } - - tmp = crm_element_name(login); - if (safe_str_neq(tmp, "cib_command")) { - crm_err("Wrong tag: %s", tmp); - goto bail; - } - - tmp = crm_element_value(login, "op"); - if (safe_str_neq(tmp, "authenticate")) { - crm_err("Wrong operation: %s", tmp); - goto bail; - } - - user = crm_element_value(login, "user"); - pass = crm_element_value(login, "password"); - - /* Non-root daemons can only validate the password of the - * user they're running as - */ - if (check_group_membership(user, CRM_DAEMON_GROUP) == FALSE) { - crm_err("User is not a member of the required group"); - goto bail; - - } else if (authenticate_user(user, pass) == FALSE) { - crm_err("PAM auth failed"); - goto bail; - } - - /* send ACK */ num_clients++; new_client = calloc(1, sizeof(cib_client_t)); - new_client->name = crm_element_value_copy(login, "name"); - - CRM_CHECK(new_client->id == NULL, free(new_client->id)); new_client->id = crm_generate_uuid(); - -#if ENABLE_ACL - new_client->user = strdup(user); -#endif - new_client->callback_id = NULL; + /* clients have a few seconds to perform handshake. */ + new_client->remote_auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, new_client); + if (ssock == remote_tls_fd) { #ifdef HAVE_GNUTLS_GNUTLS_H new_client->encrypted = TRUE; @@ -345,37 +344,19 @@ new_client->session = GINT_TO_POINTER(csock); } - free_xml(login); - login = create_xml_node(NULL, "cib_result"); - crm_xml_add(login, F_CIB_OPERATION, CRM_OP_REGISTER); - crm_xml_add(login, F_CIB_CLIENTID, new_client->id); - crm_send_remote_msg(new_client->session, login, new_client->encrypted); - free_xml(login); - new_client->remote = mainloop_add_fd( "cib-remote-client", G_PRIORITY_DEFAULT, csock, new_client, &remote_client_fd_callbacks); g_hash_table_insert(client_list, new_client->id, new_client); return TRUE; - - bail: - if (ssock == remote_tls_fd) { -#ifdef HAVE_GNUTLS_GNUTLS_H - gnutls_bye(*session, GNUTLS_SHUT_RDWR); - gnutls_deinit(*session); - gnutls_free(session); -#endif - } - close(csock); - free_xml(login); - return TRUE; } void cib_remote_connection_destroy(gpointer user_data) { cib_client_t *client = user_data; + int csock = 0; if (client == NULL) { return; @@ -393,10 +374,36 @@ crm_trace("Destroying %s (%p)", client->name, user_data); num_clients--; crm_trace("Num unfree'd clients: %d", num_clients); + if (client->remote_auth_timeout) { + g_source_remove(client->remote_auth_timeout); + } + + if (client->encrypted) { +#ifdef HAVE_GNUTLS_GNUTLS_H + if (client->session) { + void *sock_ptr = gnutls_transport_get_ptr(*client->session); + csock = GPOINTER_TO_INT(sock_ptr); + if (client->handshake_complete) { + gnutls_bye(*client->session, GNUTLS_SHUT_WR); + } + gnutls_deinit(*client->session); + gnutls_free(client->session); + } +#endif + } else { + csock = GPOINTER_TO_INT(client->session); + } + client->session = NULL; + + if (csock > 0) { + close(csock); + } + free(client->name); free(client->callback_id); free(client->id); free(client->user); + free(client->recv_buf); free(client); crm_trace("Freed the cib client"); @@ -406,24 +413,15 @@ return; } -int -cib_remote_msg(gpointer data) +static void +cib_handle_remote_msg(cib_client_t *client, xmlNode *command) { const char *value = NULL; - xmlNode *command = NULL; - cib_client_t *client = data; - - crm_trace("%s callback", client->encrypted ? "secure" : "clear-text"); - - command = crm_recv_remote_msg(client->session, client->encrypted); - if (command == NULL) { - return -1; - } value = crm_element_name(command); if (safe_str_neq(value, "cib_command")) { crm_log_xml_trace(command, "Bad command: "); - goto bail; + return; } if (client->name == NULL) { @@ -472,9 +470,95 @@ crm_log_xml_trace(command, "Remote command: "); cib_common_callback_worker(0, 0, command, client, TRUE); - bail: - free_xml(command); - command = NULL; +} + +int +cib_remote_msg(gpointer data) +{ + xmlNode *command = NULL; + cib_client_t *client = data; + int disconnected = 0; + int timeout = client->remote_auth ? -1 : 1000; + + crm_trace("%s callback", client->encrypted ? "secure" : "clear-text"); + +#ifdef HAVE_GNUTLS_GNUTLS_H + if (client->encrypted && (client->handshake_complete == FALSE)) { + int rc = 0; + + /* Muliple calls to handshake will be required, this callback + * will be invoked once the client sends more handshake data. */ + do { + rc = gnutls_handshake(*client->session); + + if (rc < 0 && rc != GNUTLS_E_AGAIN) { + crm_err("Remote cib tls handshake failed"); + return -1; + } + } while (rc == GNUTLS_E_INTERRUPTED); + + if (rc == 0) { + crm_debug("Remote cib tls handshake completed"); + client->handshake_complete = TRUE; + if (client->remote_auth_timeout) { + g_source_remove(client->remote_auth_timeout); + } + /* after handshake, clients must send auth in a few seconds */ + client->remote_auth_timeout = g_timeout_add(REMOTE_AUTH_TIMEOUT, remote_auth_timeout_cb, client); + } + return 0; + } +#endif + + crm_recv_remote_msg(client->session, &client->recv_buf, client->encrypted, timeout, &disconnected); + + /* must pass auth before we will process anything else */ + if (client->remote_auth == FALSE) { + xmlNode *reg; +#if ENABLE_ACL + const char *user = NULL; +#endif + command = crm_parse_remote_buffer(&client->recv_buf); + if (cib_remote_auth(command) == FALSE) { + free_xml(command); + return -1; + } + + crm_debug("remote connection authenticated successfully"); + client->remote_auth = TRUE; + g_source_remove(client->remote_auth_timeout); + client->remote_auth_timeout = 0; + client->name = crm_element_value_copy(command, "name"); + +#if ENABLE_ACL + user = crm_element_value(command, "user"); + if (user) { + new_client->user = strdup(user); + } +#endif + + /* send ACK */ + reg = create_xml_node(NULL, "cib_result"); + crm_xml_add(reg, F_CIB_OPERATION, CRM_OP_REGISTER); + crm_xml_add(reg, F_CIB_CLIENTID, client->id); + crm_send_remote_msg(client->session, reg, client->encrypted); + free_xml(reg); + free_xml(command); + } + + command = crm_parse_remote_buffer(&client->recv_buf); + while (command) { + crm_trace("command received"); + cib_handle_remote_msg(client, command); + free_xml(command); + command = crm_parse_remote_buffer(&client->recv_buf); + } + + if (disconnected) { + crm_trace("disconnected while receiving remote cib msg."); + return -1; + } + return 0; } diff --git a/include/crm_internal.h b/include/crm_internal.h index cf6d95d..388af59 100644 --- a/include/crm_internal.h +++ b/include/crm_internal.h @@ -199,8 +199,40 @@ xmlNode *create_operation_update(xmlNode * parent, lrmd_event_data_t *event, con long long crm_int_helper(const char *text, char **end_text); char *crm_concat(const char *prefix, const char *suffix, char join); char *generate_hash_key(const char *crm_msg_reference, const char *sys); -xmlNode *crm_recv_remote_msg(void *session, gboolean encrypted); -void crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted); + + +/*! remote tcp/tls helper functions */ +gboolean crm_recv_remote_msg(void *session, char **recv_buf, gboolean encrypted, int total_timeout_ms, int *disconnected); +char *crm_recv_remote_raw(void *data, gboolean encrypted, size_t max_recv, size_t *recv_len, int *disconnected); +int crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted); +int crm_recv_remote_ready(void *session, gboolean encrypted, int timeout_ms); +xmlNode *crm_parse_remote_buffer(char **msg_buf); +int crm_remote_tcp_connect(const char *host, int port); + +#ifdef HAVE_GNUTLS_GNUTLS_H +/*! + * \internal + * \brief Initiate the client handshake after establishing the tcp socket. + * \note This is a blocking function, it will block until the entire handshake + * is complete or until the timeout period is reached. + * \retval 0 success + * \retval negative, failure + */ +int crm_initiate_client_tls_handshake(void *session_data, int timeout_ms); +/*! + * \internal + * \brief Create client or server session for anon DH encryption credentials + * \param sock, the socket the session will use for transport + * \param type, GNUTLS_SERVER or GNUTLS_CLIENT + * \param credentials, gnutls_anon_server_credentials_t or gnutls_anon_client_credentials_t + * + * \retval gnutls_session * on success + * \retval NULL on failure + */ +void *crm_create_anon_tls_session(int sock, int type, void *credentials); +#endif + +#define REMOTE_MSG_TERMINATOR "\r\n\r\n" const char *daemon_option(const char *option); void set_daemon_option(const char *option, const char *value); diff --git a/lib/cib/cib_remote.c b/lib/cib/cib_remote.c index 91bca96..ca80c0e 100644 --- a/lib/cib/cib_remote.c +++ b/lib/cib/cib_remote.c @@ -38,14 +38,15 @@ #ifdef HAVE_GNUTLS_GNUTLS_H # undef KEYFILE # include <gnutls/gnutls.h> -extern gnutls_anon_client_credentials anon_cred_c; -extern gnutls_session *create_tls_session(int csock, int type); +gnutls_anon_client_credentials anon_cred_c; +#define DEFAULT_CLIENT_HANDSHAKE_TIMEOUT 5000 /* 5 seconds */ const int kx_prio[] = { GNUTLS_KX_ANON_DH, 0 }; +static gboolean remote_gnutls_credentials_init = FALSE; #else typedef void gnutls_session; #endif @@ -61,6 +62,7 @@ struct remote_connection_s { gnutls_session *session; mainloop_io_t *source; char *token; + char *recv_buf; }; typedef struct cib_remote_opaque_s { @@ -76,7 +78,8 @@ struct remote_connection_s { } cib_remote_opaque_t; void cib_remote_connection_destroy(gpointer user_data); -int cib_remote_dispatch(gpointer user_data); +int cib_remote_callback_dispatch(gpointer user_data); +int cib_remote_command_dispatch(gpointer user_data); int cib_remote_signon(cib_t * cib, const char *name, enum cib_conn_type type); int cib_remote_signoff(cib_t * cib); int cib_remote_free(cib_t * cib); @@ -158,117 +161,91 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c { cib_remote_opaque_t *private = cib->variant_opaque; - shutdown(private->command.socket, SHUT_RDWR); /* no more receptions */ - shutdown(private->callback.socket, SHUT_RDWR); /* no more receptions */ - close(private->command.socket); - close(private->callback.socket); - #ifdef HAVE_GNUTLS_GNUTLS_H if (private->command.encrypted) { - gnutls_bye(*(private->command.session), GNUTLS_SHUT_RDWR); - gnutls_deinit(*(private->command.session)); - gnutls_free(private->command.session); - - gnutls_bye(*(private->callback.session), GNUTLS_SHUT_RDWR); - gnutls_deinit(*(private->callback.session)); - gnutls_free(private->callback.session); + if (private->command.session) { + gnutls_bye(*(private->command.session), GNUTLS_SHUT_RDWR); + gnutls_deinit(*(private->command.session)); + gnutls_free(private->command.session); + } - gnutls_anon_free_client_credentials(anon_cred_c); - gnutls_global_deinit(); + if (private->callback.session) { + gnutls_bye(*(private->callback.session), GNUTLS_SHUT_RDWR); + gnutls_deinit(*(private->callback.session)); + gnutls_free(private->callback.session); + } + private->command.session = NULL; + private->callback.session = NULL; + if (remote_gnutls_credentials_init) { + gnutls_anon_free_client_credentials(anon_cred_c); + gnutls_global_deinit(); + remote_gnutls_credentials_init = FALSE; + } } #endif + + if (private->command.socket) { + shutdown(private->command.socket, SHUT_RDWR); /* no more receptions */ + close(private->command.socket); + } + if (private->callback.socket) { + shutdown(private->callback.socket, SHUT_RDWR); /* no more receptions */ + close(private->callback.socket); + } + private->command.socket = 0; + private->callback.socket = 0; + + free(private->command.recv_buf); + free(private->callback.recv_buf); + private->command.recv_buf = NULL; + private->callback.recv_buf = NULL; + return 0; } static int -cib_tls_signon(cib_t * cib, struct remote_connection_s *connection) +cib_tls_signon(cib_t * cib, struct remote_connection_s *connection, gboolean event_channel) { int sock; cib_remote_opaque_t *private = cib->variant_opaque; - struct sockaddr_in addr; int rc = 0; - char *server = private->server; - - int ret_ga; - struct addrinfo *res; - struct addrinfo hints; + int disconnected = 0; xmlNode *answer = NULL; xmlNode *login = NULL; - static struct mainloop_fd_callbacks cib_fd_callbacks = - { - .dispatch = cib_remote_dispatch, - .destroy = cib_remote_connection_destroy, - }; + static struct mainloop_fd_callbacks cib_fd_callbacks = { 0, }; + + cib_fd_callbacks.dispatch = event_channel ? cib_remote_callback_dispatch : cib_remote_command_dispatch; + cib_fd_callbacks.destroy = cib_remote_connection_destroy; connection->socket = 0; connection->session = NULL; - /* create socket */ - sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - if (sock == -1) { - crm_perror(LOG_ERR, "Socket creation failed"); - return -1; - } - - /* getaddrinfo */ - bzero(&hints, sizeof(struct addrinfo)); - hints.ai_flags = AI_CANONNAME; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_RAW; - - if (hints.ai_family == AF_INET6) { - hints.ai_protocol = IPPROTO_ICMPV6; - } else { - hints.ai_protocol = IPPROTO_ICMP; - } - - crm_debug("Looking up %s", server); - ret_ga = getaddrinfo(server, NULL, &hints, &res); - if (ret_ga) { - crm_err("getaddrinfo: %s", gai_strerror(ret_ga)); - close(sock); - return -1; - } - - if (res->ai_canonname) { - server = res->ai_canonname; - } - - crm_debug("Got address %s for %s", server, private->server); - - if (!res->ai_addr) { - fprintf(stderr, "getaddrinfo failed"); - crm_exit(1); - } -#if 1 - memcpy(&addr, res->ai_addr, res->ai_addrlen); -#else - /* connect to server */ - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = inet_addr(server); -#endif - addr.sin_port = htons(private->port); - - if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) { - crm_perror(LOG_ERR, "Connection to %s:%d failed", server, private->port); - close(sock); - return -1; + sock = crm_remote_tcp_connect(private->server, private->port); + if (sock <= 0) { + crm_perror(LOG_ERR, "remote tcp connection to %s:%d failed", private->server, private->port); } + connection->socket = sock; if (connection->encrypted) { /* initialize GnuTls lib */ #ifdef HAVE_GNUTLS_GNUTLS_H - gnutls_global_init(); - gnutls_anon_allocate_client_credentials(&anon_cred_c); + if (remote_gnutls_credentials_init == FALSE) { + gnutls_global_init(); + gnutls_anon_allocate_client_credentials(&anon_cred_c); + remote_gnutls_credentials_init = TRUE; + } /* bind the socket to GnuTls lib */ - connection->session = create_tls_session(sock, GNUTLS_CLIENT); - if (connection->session == NULL) { - crm_perror(LOG_ERR, "Session creation for %s:%d failed", server, private->port); - close(sock); + connection->session = crm_create_anon_tls_session(sock, GNUTLS_CLIENT, anon_cred_c); + + if (crm_initiate_client_tls_handshake(connection->session, DEFAULT_CLIENT_HANDSHAKE_TIMEOUT) != 0) { + crm_err("Session creation for %s:%d failed", private->server, private->port); + + gnutls_deinit(*connection->session); + gnutls_free(connection->session); + connection->session = NULL; cib_tls_close(cib); return -1; } @@ -289,7 +266,14 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c crm_send_remote_msg(connection->session, login, connection->encrypted); free_xml(login); - answer = crm_recv_remote_msg(connection->session, connection->encrypted); + crm_recv_remote_msg(connection->session, &connection->recv_buf, connection->encrypted, -1, &disconnected); + + if (disconnected) { + rc = -ENOTCONN; + } + + answer = crm_parse_remote_buffer(&connection->recv_buf); + crm_log_xml_trace(answer, "Reply"); if (answer == NULL) { rc = -EPROTO; @@ -310,12 +294,15 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c connection->token = strdup(tmp_ticket); } } + free_xml(answer); + answer = NULL; if (rc != 0) { cib_tls_close(cib); + return rc; } - connection->socket = sock; + crm_trace("remote client connection established"); connection->source = mainloop_add_fd("cib-remote", G_PRIORITY_HIGH, connection->socket, cib, &cib_fd_callbacks); return rc; } @@ -331,35 +318,61 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c } int -cib_remote_dispatch(gpointer user_data) +cib_remote_command_dispatch(gpointer user_data) +{ + int disconnected = 0; + cib_t *cib = user_data; + cib_remote_opaque_t *private = cib->variant_opaque; + + crm_recv_remote_msg(private->command.session, &private->command.recv_buf, private->command.encrypted, -1, &disconnected); + + free(private->command.recv_buf); + private->command.recv_buf = NULL; + crm_err("received late reply for remote cib connection, discarding"); + + if (disconnected) { + return -1; + } + return 0; +} + +int +cib_remote_callback_dispatch(gpointer user_data) { cib_t *cib = user_data; cib_remote_opaque_t *private = cib->variant_opaque; xmlNode *msg = NULL; - const char *type = NULL; + int disconnected = 0; crm_info("Message on callback channel"); - msg = crm_recv_remote_msg(private->callback.session, private->callback.encrypted); - type = crm_element_value(msg, F_TYPE); - crm_trace("Activating %s callbacks...", type); + crm_recv_remote_msg(private->callback.session, &private->callback.recv_buf, private->callback.encrypted, -1, &disconnected); - if (safe_str_eq(type, T_CIB)) { - cib_native_callback(cib, msg, 0, 0); + msg = crm_parse_remote_buffer(&private->callback.recv_buf); + while (msg) { + const char *type = crm_element_value(msg, F_TYPE); + crm_trace("Activating %s callbacks...", type); - } else if (safe_str_eq(type, T_CIB_NOTIFY)) { - g_list_foreach(cib->notify_list, cib_native_notify, msg); + if (safe_str_eq(type, T_CIB)) { + cib_native_callback(cib, msg, 0, 0); - } else { - crm_err("Unknown message type: %s", type); - } + } else if (safe_str_eq(type, T_CIB_NOTIFY)) { + g_list_foreach(cib->notify_list, cib_native_notify, msg); + + } else { + crm_err("Unknown message type: %s", type); + } - if (msg != NULL) { free_xml(msg); - return 0; + msg = crm_parse_remote_buffer(&private->callback.recv_buf); + } + + if (disconnected) { + return -1; } - return -1; + + return 0; } int @@ -394,11 +407,11 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c } if (rc == pcmk_ok) { - rc = cib_tls_signon(cib, &(private->command)); + rc = cib_tls_signon(cib, &(private->command), FALSE); } if (rc == pcmk_ok) { - rc = cib_tls_signon(cib, &(private->callback)); + rc = cib_tls_signon(cib, &(private->callback), TRUE); } if (rc == pcmk_ok) { @@ -463,37 +476,20 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c return rc; } -static gboolean timer_expired = FALSE; -static struct timer_rec_s *sync_timer = NULL; -static gboolean -cib_timeout_handler(gpointer data) -{ - struct timer_rec_s *timer = data; - - timer_expired = TRUE; - crm_err("Call %d timed out after %ds", timer->call_id, timer->timeout); - - /* Always return TRUE, never remove the handler - * We do that after the while-loop in cib_native_perform_op() - */ - return TRUE; -} - int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const char *section, xmlNode * data, xmlNode ** output_data, int call_options, const char *name) { int rc = pcmk_ok; + int disconnected = 0; + int remaining_time = 0; + time_t start_time; xmlNode *op_msg = NULL; xmlNode *op_reply = NULL; cib_remote_opaque_t *private = cib->variant_opaque; - if (sync_timer == NULL) { - sync_timer = calloc(1, sizeof(struct timer_rec_s)); - } - if (cib->state == cib_disconnected) { return -ENOTCONN; } @@ -524,7 +520,11 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c } crm_trace("Sending %s message to CIB service", op); - crm_send_remote_msg(private->command.session, op_msg, private->command.encrypted); + if (!(call_options & cib_sync_call)) { + crm_send_remote_msg(private->callback.session, op_msg, private->command.encrypted); + } else { + crm_send_remote_msg(private->command.session, op_msg, private->command.encrypted); + } free_xml(op_msg); if ((call_options & cib_discard_reply)) { @@ -537,30 +537,21 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c crm_trace("Waiting for a syncronous reply"); - if (cib->call_timeout > 0) { - /* We need this, even with msgfromIPC_timeout(), because we might - * get other/older replies that don't match the active request - */ - timer_expired = FALSE; - sync_timer->call_id = cib->call_id; - sync_timer->timeout = cib->call_timeout * 1000; - sync_timer->ref = g_timeout_add(sync_timer->timeout, cib_timeout_handler, sync_timer); - } + start_time = time(NULL); + remaining_time = cib->call_timeout ? cib->call_timeout : 60; - while (timer_expired == FALSE) { + while (remaining_time > 0 && !disconnected) { int reply_id = -1; int msg_id = cib->call_id; - op_reply = crm_recv_remote_msg(private->command.session, private->command.encrypted); - if (op_reply == NULL) { + crm_recv_remote_msg(private->command.session, &private->command.recv_buf, private->command.encrypted, remaining_time * 1000, &disconnected); + op_reply = crm_parse_remote_buffer(&private->command.recv_buf); + + if (!op_reply) { break; } crm_element_value_int(op_reply, F_CIB_CALLID, &reply_id); - CRM_CHECK(reply_id > 0, free_xml(op_reply); - if (sync_timer->ref > 0) { - g_source_remove(sync_timer->ref); sync_timer->ref = 0;} - return -ENOMSG) ; if (reply_id == msg_id) { break; @@ -579,15 +570,9 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c free_xml(op_reply); op_reply = NULL; - } - - if (sync_timer->ref > 0) { - g_source_remove(sync_timer->ref); - sync_timer->ref = 0; - } - if (timer_expired) { - return -ETIME; + /* wasn't the right reply, try and read some more */ + remaining_time = time(NULL) - start_time; } /* if(IPC_ISRCONN(native->command_channel) == FALSE) { */ @@ -596,7 +581,10 @@ int cib_remote_perform_op(cib_t * cib, const char *op, const char *host, const c /* cib->state = cib_disconnected; */ /* } */ - if (op_reply == NULL) { + if (disconnected) { + crm_err("Disconnected while waiting for reply."); + return -ENOTCONN; + } else if (op_reply == NULL) { crm_err("No reply message - empty"); return -ENOMSG; } diff --git a/lib/common/mainloop.c b/lib/common/mainloop.c index 09cf6e9..1e413b6 100644 --- a/lib/common/mainloop.c +++ b/lib/common/mainloop.c @@ -178,6 +178,7 @@ struct trigger_s { source->trigger = FALSE; if (source->id > 0) { g_source_remove(source->id); + source->id = 0; } return TRUE; } diff --git a/lib/common/remote.c b/lib/common/remote.c index 7f04097..ae61481 100644 --- a/lib/common/remote.c +++ b/lib/common/remote.c @@ -25,8 +25,10 @@ #include <sys/stat.h> #include <unistd.h> #include <sys/socket.h> - +#include <arpa/inet.h> #include <netinet/ip.h> +#include <netdb.h> + #include <stdlib.h> #include <errno.h> @@ -42,7 +44,7 @@ #endif #ifdef HAVE_GNUTLS_GNUTLS_H -const int tls_kx_order[] = { +const int anon_tls_kx_order[] = { GNUTLS_KX_ANON_DH, GNUTLS_KX_DHE_RSA, GNUTLS_KX_DHE_DSS, @@ -50,22 +52,32 @@ 0 }; -gnutls_anon_client_credentials anon_cred_c; -gnutls_anon_server_credentials anon_cred_s; -static char *cib_send_tls(gnutls_session * session, xmlNode * msg); -static char *cib_recv_tls(gnutls_session * session); -#endif +int +crm_initiate_client_tls_handshake(void *session_data, int timeout_ms) +{ + int rc = 0; + int pollrc = 0; + time_t start = time(NULL); + gnutls_session *session = session_data; -char *cib_recv_plaintext(int sock); -char *cib_send_plaintext(int sock, xmlNode * msg); + do { + rc = gnutls_handshake(*session); + if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) { + pollrc = crm_recv_remote_ready(session, TRUE, 1000); + if (pollrc < 0) { + /* poll returned error, there is no hope */ + rc = -1; + } + } + } while (((time(NULL) - start) < (timeout_ms/1000)) && + (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN)); -#ifdef HAVE_GNUTLS_GNUTLS_H -gnutls_session *create_tls_session(int csock, int type); + return rc; +} -gnutls_session * -create_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ) +void * +crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */, void *credentials) { - int rc = 0; gnutls_session *session = gnutls_malloc(sizeof(gnutls_session)); gnutls_init(session, type); @@ -75,266 +87,619 @@ /* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */ # else gnutls_set_default_priority(*session); - gnutls_kx_set_priority(*session, tls_kx_order); + gnutls_kx_set_priority(*session, anon_tls_kx_order); # endif gnutls_transport_set_ptr(*session, (gnutls_transport_ptr) GINT_TO_POINTER(csock)); switch (type) { - case GNUTLS_SERVER: - gnutls_credentials_set(*session, GNUTLS_CRD_ANON, anon_cred_s); - break; - case GNUTLS_CLIENT: - gnutls_credentials_set(*session, GNUTLS_CRD_ANON, anon_cred_c); - break; + case GNUTLS_SERVER: + gnutls_credentials_set(*session, GNUTLS_CRD_ANON, (gnutls_anon_server_credentials_t) credentials); + break; + case GNUTLS_CLIENT: + gnutls_credentials_set(*session, GNUTLS_CRD_ANON, (gnutls_anon_client_credentials_t) credentials); + break; } - do { - rc = gnutls_handshake(*session); - } while (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN); - - if (rc < 0) { - crm_err("Handshake failed: %s", gnutls_strerror(rc)); - gnutls_deinit(*session); - gnutls_free(session); - return NULL; - } return session; } -static char * -cib_send_tls(gnutls_session * session, xmlNode * msg) +static int +crm_send_tls(gnutls_session * session, const char *buf, size_t len) { - char *xml_text = NULL; - -# if 0 - const char *name = crm_element_name(msg); + const char *unsent = buf; + int rc = 0; + int total_send; - if (safe_str_neq(name, "cib_command")) { - xmlNodeSetName(msg, "cib_result"); + if (buf == NULL) { + return -1; } -# endif - xml_text = dump_xml_unformatted(msg); - if (xml_text != NULL) { - char *unsent = xml_text; - int len = strlen(xml_text); - int rc = 0; - len++; /* null char */ - crm_trace("Message size: %d", len); + total_send = len; + crm_trace("Message size: %d", len); - while (TRUE) { - rc = gnutls_record_send(*session, unsent, len); - crm_debug("Sent %d bytes", rc); + while (TRUE) { + rc = gnutls_record_send(*session, unsent, len); - if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) { - crm_debug("Retry"); + if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) { + crm_debug("Retry"); - } else if (rc < 0) { - crm_debug("Connection terminated"); - break; + } else if (rc < 0) { + crm_err("Connection terminated rc = %d", rc); + break; - } else if (rc < len) { - crm_debug("Only sent %d of %d bytes", rc, len); - len -= rc; - unsent += rc; - } else { - break; - } + } else if (rc < len) { + crm_debug("Only sent %d of %d bytes", rc, len); + len -= rc; + unsent += rc; + } else { + crm_debug("Sent %d bytes", rc); + break; } - } - free(xml_text); - return NULL; + return rc < 0 ? rc : total_send; } + +/*! + * \internal + * \brief Read bytes off non blocking tls session. + * + * \param session - tls session to read + * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit + * + * \note only use with NON-Blocking sockets. Should only be used after polling socket. + * This function will return once max_size is met, the socket read buffer + * is empty, or an error is encountered. + * + * \retval '\0' terminated buffer on success + */ static char * -cib_recv_tls(gnutls_session * session) +crm_recv_tls(gnutls_session * session, size_t max_size, size_t *recv_len, int *disconnected) { char *buf = NULL; - int rc = 0; - int len = 0; - int chunk_size = 1024; + size_t len = 0; + size_t chunk_size = max_size ? max_size : 1024; + size_t buf_size = 0; + size_t read_size = 0; if (session == NULL) { - return NULL; + if (disconnected) { + *disconnected = 1; + } + goto done; } - buf = calloc(1, chunk_size); + buf = calloc(1, chunk_size + 1); + buf_size = chunk_size; while (TRUE) { - errno = 0; - rc = gnutls_record_recv(*session, buf + len, chunk_size); - crm_trace("Got %d more bytes. errno=%d", rc, errno); + read_size = buf_size - len; - if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) { - crm_trace("Retry"); + /* automatically grow the buffer when needed if max_size is not set.*/ + if (!max_size && (read_size < (chunk_size / 2))) { + buf_size += chunk_size; + crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, buf_size); + buf = realloc(buf, buf_size + 1); + CRM_ASSERT(buf != NULL); - } else if (rc == GNUTLS_E_UNEXPECTED_PACKET_LENGTH) { - crm_trace("Session disconnected"); - goto bail; + read_size = buf_size - len; + } - } else if (rc < 0) { - crm_err("Error receiving message: %s (%d)", gnutls_strerror(rc), rc); - goto bail; + rc = gnutls_record_recv(*session, buf + len, read_size); - } else if (rc == chunk_size) { + if (rc > 0) { + crm_trace("Got %d more bytes.", rc); len += rc; - chunk_size *= 2; - buf = realloc(buf, len + chunk_size); - crm_trace("Retry with %d more bytes", (int)chunk_size); - CRM_ASSERT(buf != NULL); - - } else if (buf[len + rc - 1] != 0) { - crm_trace("Last char is %d '%c'", buf[len + rc - 1], buf[len + rc - 1]); - crm_trace("Retry with %d more bytes", (int)chunk_size); - len += rc; - buf = realloc(buf, len + chunk_size); - CRM_ASSERT(buf != NULL); + /* always null terminate buffer, the +1 to alloc always allows for this.*/ + buf[len] = '\0'; + } + if (max_size && (max_size == read_size)) { + crm_trace("Buffer max read size %d met" , max_size); + goto done; + } - } else { - crm_trace("Got %d more bytes", (int)rc); - return buf; + /* process any errors. */ + if (rc == GNUTLS_E_INTERRUPTED) { + crm_trace("EINTR encoutered, retry tls read"); + } else if (rc == GNUTLS_E_AGAIN) { + crm_trace("non-blocking, exiting read on rc = %d", rc); + goto done; + } else if (rc <= 0) { + if (rc == 0) { + crm_debug("EOF encoutered during TLS read"); + } else { + crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc); + } + if (disconnected) { + *disconnected = 1; + } + goto done; } } - bail: - free(buf); - return NULL; + +done: + if (recv_len) { + *recv_len = len; + } + if (!len) { + free(buf); + buf = NULL; + } + return buf; } #endif -char * -cib_send_plaintext(int sock, xmlNode * msg) +static int +crm_send_plaintext(int sock, const char *buf, size_t len) { - char *xml_text = dump_xml_unformatted(msg); - if (xml_text != NULL) { - int rc = 0; - char *unsent = xml_text; - int len = strlen(xml_text); + int rc = 0; + const char *unsent = buf; + int total_send; - len++; /* null char */ - crm_trace("Message on socket %d: size=%d", sock, len); - retry: - rc = write(sock, unsent, len); - if (rc < 0) { - switch (errno) { - case EINTR: - case EAGAIN: - crm_trace("Retry"); - goto retry; - default: - crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, len); - break; - } + if (buf == NULL) { + return -1; + } + total_send = len; - } else if (rc < len) { - crm_trace("Only sent %d of %d remaining bytes", rc, len); - len -= rc; - unsent += rc; + crm_trace("Message on socket %d: size=%d", sock, len); + retry: + rc = write(sock, unsent, len); + if (rc < 0) { + switch (errno) { + case EINTR: + case EAGAIN: + crm_trace("Retry"); goto retry; - - } else { - crm_trace("Sent %d bytes: %.100s", rc, xml_text); + default: + crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int) len); + break; } + + } else if (rc < len) { + crm_trace("Only sent %d of %d remaining bytes", rc, len); + len -= rc; + unsent += rc; + goto retry; + + } else { + crm_trace("Sent %d bytes: %.100s", rc, buf); } - free(xml_text); - return NULL; + + return rc < 0 ? rc : total_send; } -char * -cib_recv_plaintext(int sock) +/*! + * \internal + * \brief Read bytes off non blocking socket. + * + * \param session - tls session to read + * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit + * + * \note only use with NON-Blocking sockets. Should only be used after polling socket. + * This function will return once max_size is met, the socket read buffer + * is empty, or an error is encountered. + * + * \retval '\0' terminated buffer on success + */ +static char * +crm_recv_plaintext(int sock, size_t max_size, size_t *recv_len, int *disconnected) { char *buf = NULL; - ssize_t rc = 0; ssize_t len = 0; - ssize_t chunk_size = 512; + ssize_t chunk_size = max_size ? max_size : 1024; + size_t buf_size = 0; + size_t read_size = 0; - buf = calloc(1, chunk_size); + if (sock <= 0) { + if (disconnected) { + *disconnected = 1; + } + goto done; + } - while (1) { - errno = 0; - rc = read(sock, buf + len, chunk_size); - crm_trace("Got %d more bytes. errno=%d", (int)rc, errno); - - if (errno == EINTR || errno == EAGAIN) { - crm_trace("Retry: %d", (int)rc); - if (rc > 0) { - len += rc; - buf = realloc(buf, len + chunk_size); - CRM_ASSERT(buf != NULL); - } + buf = calloc(1, chunk_size + 1); + buf_size = chunk_size; - } else if (rc < 0) { - crm_perror(LOG_ERR, "Error receiving message: %d", (int)rc); - goto bail; + while (TRUE) { + errno = 0; + read_size = buf_size - len; - } else if (rc == chunk_size) { - len += rc; - chunk_size *= 2; - buf = realloc(buf, len + chunk_size); - crm_trace("Retry with %d more bytes", (int)chunk_size); + /* automatically grow the buffer when needed if max_size is not set.*/ + if (!max_size && (read_size < (chunk_size / 2))) { + buf_size += chunk_size; + crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, buf_size); + buf = realloc(buf, buf_size + 1); CRM_ASSERT(buf != NULL); - } else if (buf[len + rc - 1] != 0) { - crm_trace("Last char is %d '%c'", buf[len + rc - 1], buf[len + rc - 1]); - crm_trace("Retry with %d more bytes", (int)chunk_size); + read_size = buf_size - len; + } + + rc = read(sock, buf + len, chunk_size); + + if (rc > 0) { + crm_trace("Got %d more bytes. errno=%d", (int)rc, errno); len += rc; - buf = realloc(buf, len + chunk_size); - CRM_ASSERT(buf != NULL); + /* always null terminate buffer, the +1 to alloc always allows for this.*/ + buf[len] = '\0'; + } + if (max_size && (max_size == read_size)) { + crm_trace("Buffer max read size %d met" , max_size); + goto done; + } - } else { - return buf; + if (rc > 0) { + continue; + } else if (rc == 0) { + if (disconnected) { + *disconnected = 1; + } + crm_trace("EOF encoutered during read"); + goto done; + } + + /* process errors */ + if (errno == EINTR) { + crm_trace("EINTER encoutered, retry socket read."); + } else if (errno == EAGAIN) { + crm_trace("non-blocking, exiting read on rc = %d", rc); + goto done; + } else if (errno <= 0) { + if (disconnected) { + *disconnected = 1; + } + crm_debug("Error receiving message: %d", (int)rc); + goto done; } } - bail: - free(buf); - return NULL; +done: + if (recv_len) { + *recv_len = len; + } + if (!len) { + free(buf); + buf = NULL; + } + return buf; } -void -crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted) +static int +crm_send_remote_msg_raw(void *session, const char *buf, size_t len, gboolean encrypted) { + int rc = -1; if (encrypted) { #ifdef HAVE_GNUTLS_GNUTLS_H - cib_send_tls(session, msg); + rc = crm_send_tls(session, buf, len); #else CRM_ASSERT(encrypted == FALSE); #endif } else { - cib_send_plaintext(GPOINTER_TO_INT(session), msg); + rc = crm_send_plaintext(GPOINTER_TO_INT(session), buf, len); } + return rc; } +int +crm_send_remote_msg(void *session, xmlNode * msg, gboolean encrypted) +{ + int rc = -1; + char *xml_text = NULL; + int len = 0; + + xml_text = dump_xml_unformatted(msg); + if (xml_text) { + len = strlen(xml_text); + } else { + crm_err("Invalid XML, can not send msg"); + return -1; + } + + rc = crm_send_remote_msg_raw(session, xml_text, len, encrypted); + if (rc < 0) { + goto done; + } + rc = crm_send_remote_msg_raw(session, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR), encrypted); + +done: + if (rc < 0) { + crm_err("Failed to send remote msg, rc = %d", rc); + } + + free(xml_text); + return rc; +} + +/*! + * \internal + * \brief handles the recv buffer and parsing out msgs. + * \note new_data is owned by this function once it is passed in. + */ xmlNode * -crm_recv_remote_msg(void *session, gboolean encrypted) +crm_parse_remote_buffer(char **msg_buf) { - char *reply = NULL; + char *buf = NULL; + char *start = NULL; + char *end = NULL; xmlNode *xml = NULL; + if (*msg_buf == NULL) { + return NULL; + } + + /* take ownership of the buffer */ + buf = *msg_buf; + *msg_buf = NULL; + + /* MSGS are separated by a '\r\n\r\n'. Split a message off the buffer and return it. */ + start = buf; + end = strstr(start, REMOTE_MSG_TERMINATOR); + + while (!xml && end) { + + /* grab the message */ + end[0] = '\0'; + end += strlen(REMOTE_MSG_TERMINATOR); + + xml = string2xml(start); + if (xml == NULL) { + crm_err("Couldn't parse: '%.120s'", start); + } + start = end; + end = strstr(start, REMOTE_MSG_TERMINATOR); + } + + if (xml && start) { + /* we have msgs left over, save it until next time */ + *msg_buf = strdup(start); + free(buf); + } else if (!xml) { + /* no msg present */ + *msg_buf = buf; + } + + return xml; +} + +/*! + * \internal + * \brief Determine if a remote session has data to read + * + * \retval 0, timeout occured. + * \retval positive, data is ready to be read + * \retval negative, session has ended + */ +int +crm_recv_remote_ready(void *session, gboolean encrypted, int timeout /* ms */) +{ + struct pollfd fds = { 0, }; + int sock = 0; + void *sock_ptr = NULL; + int rc = 0; + time_t start; + + if (encrypted) { +#ifdef HAVE_GNUTLS_GNUTLS_H + gnutls_session *tls_session = session; + sock_ptr = gnutls_transport_get_ptr(*tls_session); +#else + CRM_ASSERT(encrypted == FALSE); +#endif + } else { + sock_ptr = session; + } + + sock = GPOINTER_TO_INT(sock_ptr); + if (sock <= 0) { + return -ENOTCONN; + } + + start = time(NULL); + errno = 0; + do { + fds.fd = sock; + fds.events = POLLIN; + + /* If we got an EINTR while polling, and we have a + * specific timeout we are trying to honor, attempt + * to adjust the timeout to the closest second. */ + if (errno == EINTR && (timeout > 0)) { + timeout = timeout - ((time(NULL) - start) * 1000); + if (timeout < 1000) { + timeout = 1000; + } + } + + rc = poll(&fds, 1, timeout); + } while (rc < 0 && errno == EINTR); + + return rc; +} + +char * +crm_recv_remote_raw(void *session, gboolean encrypted, size_t max_recv, size_t *recv_len, int *disconnected) +{ + char *reply = NULL; + if (recv_len) { + *recv_len = 0; + } + + if (disconnected) { + *disconnected = 0; + } + if (encrypted) { #ifdef HAVE_GNUTLS_GNUTLS_H - reply = cib_recv_tls(session); + reply = crm_recv_tls(session, max_recv, recv_len, disconnected); #else CRM_ASSERT(encrypted == FALSE); #endif } else { - reply = cib_recv_plaintext(GPOINTER_TO_INT(session)); + reply = crm_recv_plaintext(GPOINTER_TO_INT(session), max_recv, recv_len, disconnected); } if (reply == NULL || strlen(reply) == 0) { crm_trace("Empty reply"); + } - } else { - xml = string2xml(reply); - if (xml == NULL) { - crm_err("Couldn't parse: '%.120s'", reply); + return reply; +} + +/*! + * \internal + * \brief Read data off the socket until at least one full message is present or timeout occures. + * \retval TRUE message read + * \retval FALSE full message not read + */ + +gboolean +crm_recv_remote_msg(void *session, char **recv_buf, gboolean encrypted, int total_timeout /*ms */, int *disconnected) +{ + int ret; + size_t request_len = 0; + time_t start = time(NULL); + char *raw_request = NULL; + int remaining_timeout = 0; + + if (total_timeout == 0) { + total_timeout = 10000; + } else if (total_timeout < 0) { + total_timeout = 60000; + } + *disconnected = 0; + + remaining_timeout = total_timeout; + while ((remaining_timeout > 0) && !(*disconnected)) { + + /* read some more off the tls buffer if we still have time left. */ + crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d", total_timeout, remaining_timeout); + ret = crm_recv_remote_ready(session, encrypted, remaining_timeout); + raw_request = NULL; + + if (ret == 0) { + crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout); + return FALSE; + + } else if (ret < 0) { + if (errno != EINTR) { + crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret, errno); + *disconnected = 1; + return FALSE; + } + crm_debug("poll EINTR encountered during poll, retrying"); + } else { + raw_request = crm_recv_remote_raw(session, encrypted, 0, &request_len, disconnected); + } + + remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000); + + if (!raw_request) { + crm_debug("Empty msg received after poll"); + continue; + } + + if (*recv_buf) { + int old_len = strlen(*recv_buf); + + crm_trace("Expanding recv buffer from %d to %d", old_len, old_len+request_len); + + *recv_buf = realloc(*recv_buf, old_len + request_len + 1); + memcpy(*recv_buf + old_len, raw_request, request_len); + *(*recv_buf+old_len+request_len) = '\0'; + free(raw_request); + } else { + *recv_buf = raw_request; + } + + if (strstr(*recv_buf, REMOTE_MSG_TERMINATOR)) { + return TRUE; } } - free(reply); - return xml; + return FALSE; } + +/*! + * \internal + * \brief tcp connection to server at specified port + * \retval positive, socket fd. + * \retval negative, failed to connect. + */ +int +crm_remote_tcp_connect(const char *host, int port) +{ + struct addrinfo *res; + struct addrinfo *rp; + struct addrinfo hints; + const char *server = host; + int ret_ga; + int sock; + + /* getaddrinfo */ + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + + crm_debug("Looking up %s", server); + ret_ga = getaddrinfo(server, NULL, &hints, &res); + if (ret_ga) { + crm_err("getaddrinfo: %s", gai_strerror(ret_ga)); + return -1; + } + + if (!res || !res->ai_addr) { + crm_err("getaddrinfo failed"); + return -1; + } + + for (rp = res; rp != NULL; rp = rp->ai_next) { + struct sockaddr *addr = rp->ai_addr; + int flag = 0; + if (!addr) { + continue; + } + + if (rp->ai_canonname) { + server = res->ai_canonname; + } + crm_debug("Got address %s for %s", server, host); + + /* create socket */ + sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP); + if (sock == -1) { + crm_err("Socket creation failed for remote client connection."); + continue; + } + if (addr->sa_family == AF_INET6) { + struct sockaddr_in6 *addr_in = (struct sockaddr_in6 *) addr; + addr_in->sin6_port = htons(port); + } else { + struct sockaddr_in *addr_in = (struct sockaddr_in *) addr; + addr_in->sin_port = htons(port); + crm_info("Attempting to connect to remote server at %s:%d", inet_ntoa(addr_in->sin_addr), port); + } + + if (connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) { + if ((flag = fcntl(sock, F_GETFL)) >= 0) { + if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) { + crm_err( "fcntl() write failed"); + close(sock); + sock = -1; + continue; + } + } + break; /* Success */ + } + + close(sock); + sock = -1; + } + freeaddrinfo(res); + + return sock; +} + diff --git a/tools/crm_mon.c b/tools/crm_mon.c index 5c2e687..fe59264 100644 --- a/tools/crm_mon.c +++ b/tools/crm_mon.c @@ -275,7 +275,7 @@ if (rc == pcmk_ok) { rc = cib->cmds->set_connection_dnotify(cib, mon_cib_connection_destroy); if (rc == -EPROTONOSUPPORT) { - print_as("Notification setup failed, won't be able to reconnect after failure"); + print_as("Notification setup not supported, won't be able to reconnect after failure"); if (as_console) { sleep(2); }