#!/usr/bin/python
# encoding: utf-8
# +------------------------------------------------------------------+
# |             ____ _               _        __  __ _  __           |
# |            / ___| |__   ___  ___| | __   |  \/  | |/ /           |
# |           | |   | '_ \ / _ \/ __| |/ /   | |\/| | ' /            |
# |           | |___| | | |  __/ (__|   <    | |  | | . \            |
# |            \____|_| |_|\___|\___|_|\_\___|_|  |_|_|\_\           |
# |                                                                  |
# | Copyright Mathias Kettner 2012             mk@mathias-kettner.de |
# +------------------------------------------------------------------+
#
# This file is part of Check_MK.
# The official homepage is at http://mathias-kettner.de/check_mk.
#
# check_mk is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation in version 2. check_mk is distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
# out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more de-
# tails. You should have received a copy of the GNU General Public
# License along with GNU Make; see the file COPYING. If not, write
# to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301 USA.

import errno
import getopt
import os
import pickle
import pprint
import re
import select
import signal
import socket
import stat
import subprocess
import sys
import thread
import time
import uuid
from grp import getgrnam
from pwd import getpwnam

VERSION="1.2.2b1"

# .--Helper functions----------------------------------------------------.
# |                   _   _      _                                       |
# |                  | | | | ___| |_ __   ___ _ __ ___                   |
# |                  | |_| |/ _ \ | '_ \ / _ \ '__/ __|                  |
# |                  |  _  |  __/ | |_) |  __/ |  \__ \                  |
# |                  |_| |_|\___|_| .__/ \___|_|  |___/                  |
# |                               |_|                                    |
# +----------------------------------------------------------------------+
# | Various helper functions                                             |
# '----------------------------------------------------------------------'

def format_exception():
    """Return the traceback of the exception currently being handled.

    Intended to be called from inside an ``except`` block. The result is
    the same multi-line text the interpreter would print for an uncaught
    exception.
    """
    import traceback
    return traceback.format_exc()


def bail_out(reason):
    """Log a fatal error together with the current traceback and exit.

    reason -- message or exception object describing the failure.

    Never returns; terminates the process with exit code 1.
    """
    # Wrap in a tuple so that a tuple-valued reason cannot break formatting
    log("FATAL ERROR: %s" % (reason,))
    log("%s" % format_exception())
    sys.exit(1)


def make_parentdirs(file_path):
    """Create the directory that is supposed to contain file_path.

    Does nothing if the path has no directory component or the directory
    already exists. Raises OSError if the directory cannot be created.
    """
    dir_path = os.path.dirname(file_path)
    if not dir_path:
        return # plain file name relative to the current directory

    try:
        os.makedirs(dir_path)
    except OSError as e:
        # Another process may have created the directory in the meantime
        if e.errno != errno.EEXIST or not os.path.isdir(dir_path):
            raise


def process_exists(pid):
    """Return True if a process with the given PID exists, else False.

    Probes the process with signal 0, which performs only the existence
    and permission check without delivering a signal.
    """
    try:
        os.kill(pid, 0)
    except OSError as e:
        # EPERM: the process exists but belongs to another user.
        # Everything else (notably ESRCH) means there is no such process.
        return e.errno == errno.EPERM
    except (TypeError, ValueError, OverflowError):
        return False # not a usable PID at all
    return True


def open_logfile():
    """Open the logfile at g_logfile_path for appending as g_logfile."""
    global g_logfile
    g_logfile = open(g_logfile_path, "a")


def log(text):
    """Write one timestamped line to the logfile.

    Unicode text is encoded as UTF-8 first. If the logfile cannot be
    written, the text goes to stderr instead. That includes the case where
    open_logfile() has not been called and g_logfile does not exist yet.
    """
    if isinstance(text, unicode):
        text = text.encode("utf-8")
    try:
        g_logfile.write('[%.6f] %s\n' % (time.time(), text))
        g_logfile.flush()
    except Exception:
        sys.stderr.write("%s\n" % text)


def verbose(text, level = 1):
    """Log text only if the verbosity (opt_verbose) is at least level."""
    if opt_verbose >= level:
        log(text)


# .--Spoolfiles Handler--------------------------------------------------.
# |             ____                    _  __ _ _                        |
# |            / ___| _ __   ___   ___ | |/ _(_) | ___  ___              |
# |            \___ \| '_ \ / _ \ / _ \| | |_| | |/ _ \/ __|             |
# |             ___) | |_) | (_) | (_) | |  _| | |  __/\__ \             |
# |            |____/| .__/ \___/ \___/|_|_| |_|_|\___||___/             |
# |                  |_|                                                 |
# |                 _   _                 _ _                            |
# |                | | | | __ _ _ __   __| | | ___ _ __                  |
# |                | |_| |/ _` | '_ \ / _` | |/ _ \ '__|                 |
# |                |  _  | (_| | | | | (_| | |  __/ |                    |
# |                |_| |_|\__,_|_| |_|\__,_|_|\___|_|                    |
# |                                                                      |
# +----------------------------------------------------------------------+
# | Processes the spoolfiles in the spool and deferred directories       |
# | by processing them with 'cmk --notify spoolfile {name}' or           |
# | forwarding them to another mknotifyd instance if applicable          |
# '----------------------------------------------------------------------'

class SpoolfilesHandler:
    """Worker that periodically processes the spool and deferred directories.

    A spoolfile holds the text representation of a dictionary with at
    least the key "context". If it also has the key "forward" (a string
    "host:port"), the notification is sent to the mknotifyd at that
    address. Otherwise the file is handed to 'cmk --notify spoolfile'.
    Files whose processing asks for a retry are moved to the deferred
    directory; all other processed files are deleted.
    """

    def __init__(self):
        self._should_terminate = False # set by the main thread to stop run()
        self._is_running = False       # True while run() is executing

    def run(self):
        """Thread entry point: process both directories every 3 seconds.

        Runs until _should_terminate is set or an exception occurs. In
        both cases _is_running is reset so the main thread can notice.
        """
        log("Starting SpoolfilesHandler")
        self._is_running = True
        try:
            while not self._should_terminate:
                self.process_directory(g_spool_dir)
                self.process_directory(g_deferred_dir, g_config["notification_deferred_retention_time"])
                time.sleep(3)
        except Exception:
            log("Error processing spoolfile %s" % format_exception())
        finally:
            log("Stopping SpoolfilesHandler")
            self._is_running = False

    def process_directory(self, dir_path, files_older_than = None):
        """Process all spoolfiles found below dir_path.

        dir_path         -- directory that is walked recursively
        files_older_than -- if not None, skip files whose modification
                            time is less than this many seconds ago
        """
        now = time.time()
        for root, dirs, files in os.walk(dir_path):
            for spoolfile in files:
                spoolfile_path = "%s/%s" % (root, spoolfile)

                if files_older_than is not None:
                    file_age = now - os.stat(spoolfile_path).st_mtime
                    if file_age < files_older_than:
                        verbose("File age %d" % file_age, 2)
                        continue

                try:
                    content = self._read_spoolfile(spoolfile_path)
                except Exception as e:
                    log("Invalid spoolfile %s\n%s" % (spoolfile_path, e))
                    # Refresh the timestamp, so that an invalid file in the
                    # deferred directory waits for the next retention period
                    touched = time.time()
                    os.utime(spoolfile_path, (touched, touched))
                    continue

                # Spoolfiles with the key forward are sent to a remote
                # mknotifyd, all others are handled locally
                if content.get("forward"):
                    spoolfile_process_result = self._forward_notification(content)
                else:
                    spoolfile_process_result = self._notify_locally(spoolfile_path)

                verbose("process result <%d> of file %s " % (spoolfile_process_result, spoolfile_path), 2)
                if spoolfile_process_result == 1:
                    # Moving spoolfile to deferred and retry later
                    deferredfile_path = "%s/%s" % (g_deferred_dir, spoolfile)
                    os.rename(spoolfile_path, deferredfile_path)
                    touched = time.time()
                    os.utime(deferredfile_path, (touched, touched))
                else:
                    os.remove(spoolfile_path)

    def _read_spoolfile(self, spoolfile_path):
        """Parse a spoolfile; raise an exception if it is not usable."""
        with open(spoolfile_path) as f:
            # NOTE(security): eval() executes whatever the file contains.
            # Files in the spool directory must only be writable by trusted
            # users. Consider ast.literal_eval() once it is confirmed that
            # spoolfiles contain literals only.
            content = eval(f.read())
        if not content.get("context"):
            raise Exception("Unable to find key context")
        return content

    def _forward_notification(self, content):
        """Send content to the mknotifyd named in content["forward"].

        Returns 0 if the remote side answered with "OK" and 1 (retry
        later) in all other cases, including connection problems.
        """
        response_text = ""
        sock = None
        try:
            host, port = content.get("forward").split(':', 1)
            del content["forward"]
            verbose("Forward notification to %s %s" % (host, port), 2)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(10)
            sock.connect((host, int(port)))
            # sendall(): plain send() may transmit only a part of the data
            sock.sendall(pickle.dumps(content))

            # Wait for OK response: read until the remote side closes the
            # connection or reading fails (e.g. timeout)
            while True:
                try:
                    chunk = sock.recv(8192)
                except socket.error:
                    break # Error while reading
                if not chunk:
                    break
                response_text += chunk
        except Exception:
            # Connection problems
            verbose(format_exception(), 2)
        finally:
            if sock is not None:
                sock.close()

        if response_text == "OK":
            return 0
        return 1

    def _notify_locally(self, spoolfile_path):
        """Run 'cmk --notify spoolfile' on the file and return its exit code.

        The command is started without a shell, so the file name needs no
        quoting, and the real exit code is returned. os.system() would
        return a wait-encoded status in which exit code 1 appears as 256.
        If cmk cannot be started at all, 1 is returned so that the
        notification is retried later instead of being dropped.
        """
        try:
            return subprocess.call(["cmk", "--notify", "spoolfile", spoolfile_path])
        except OSError:
            log("Cannot execute cmk for spoolfile %s\n%s" % (spoolfile_path, format_exception()))
            return 1


# .--TCP-Server----------------------------------------------------------.
# |         _____ ____ ____      ____                                    |
# |        |_   _/ ___|  _ \    / ___|  ___ _ ____   _____ _ __          |
# |          | || |   | |_) |___\___ \ / _ \ '__\ \ / / _ \ '__|         |
# |          | || |___|  __/_____|__) |  __/ |   \ V /  __/ |            |
# |          |_| \____|_|       |____/ \___|_|    \_/ \___|_|            |
# |                                                                      |
# +----------------------------------------------------------------------+
# | Receives TCP Messages from foreign mknotifyd instances and creates   |
# | spoolfiles out of it                                                 |
# '----------------------------------------------------------------------'

class TcpServer:
    """Worker that accepts notifications from remote mknotifyd instances.

    Each connection delivers one pickled dictionary. If it contains a
    context with a usable CONTACTNAME, it is written as a spoolfile below
    g_spool_dir/<CONTACTNAME> and answered with "OK", otherwise with
    "ERROR".
    """

    # Seconds to wait for data from a connected client. Without a timeout
    # a client that connects but never sends would block the server thread.
    _client_timeout = 10

    def __init__(self):
        self._tcp_socket = None
        self._should_terminate = False # set by the main thread to stop run()
        self._is_running = False       # True while run() is executing
        self._reopen_sockets = False   # set by the main thread after a config change

    def open_sockets(self):
        """Open the listening socket on the configured port.

        On failure the error is logged and _should_terminate is set, so
        that run() ends.
        """
        listen_port = g_config["notification_daemon_listen_port"]
        log("Listen for remote notifications at port %d" % listen_port)
        try:
            self._tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._tcp_socket.bind(("0.0.0.0", listen_port))
            self._tcp_socket.listen(200)
        except Exception:
            log("Error opening socket.\n%s" % format_exception())
            self._should_terminate = True

    def close_sockets(self):
        """Close the listening socket if it is open."""
        if self._tcp_socket:
            self._tcp_socket.close()
            self._tcp_socket = None
            log("No longer listen for remote notificiations")

    def run(self):
        """Thread entry point: serve connections until asked to terminate.

        The socket is closed and _is_running is reset even if an
        unexpected exception ends the loop. Otherwise the main thread
        could never detect that this thread has died.
        """
        self._should_terminate = False
        self._is_running = True
        log("Starting TcpServer")
        try:
            self.open_sockets()
            handled_connections = 0 # Debug info
            while not self._should_terminate:
                readable = select.select([self._tcp_socket], [], [], 0.5)[0]
                for s in readable:
                    client_socket, addr_info = s.accept()
                    self._handle_client(client_socket, addr_info, handled_connections)
                    handled_connections += 1

                if self._reopen_sockets:
                    log("Reopen tcp socket")
                    self.close_sockets()
                    self.open_sockets()
                    self._reopen_sockets = False
        except Exception:
            log("Error in TcpServer\n%s" % format_exception())
        finally:
            log("Stopping TcpServer")
            self.close_sockets()
            self._is_running = False

    def _handle_client(self, client_socket, addr_info, connection_nr):
        """Receive one notification, spool it and answer the client.

        The client socket is always closed before returning.
        """
        try:
            client_socket.settimeout(self._client_timeout)
            # NOTE: a single recv() is used because the sender keeps the
            # connection open while waiting for the answer. Notifications
            # larger than 8192 bytes are therefore truncated and rejected.
            client_data = client_socket.recv(8192)
            if opt_verbose > 1:
                verbose("Received notification %d from %s" % (
                         connection_nr, pprint.pformat(addr_info)), 2)

            # NOTE(security): pickle.loads() on data from the network can
            # execute arbitrary code. The listen port must only be
            # reachable by trusted mknotifyd instances.
            content = pickle.loads(client_data)
            context = content.get("context") or {}
            contact_name = context.get("CONTACTNAME")
            if not self._is_valid_contact_name(contact_name):
                log("Error: Unable to process data from %s %d" % (addr_info[0], addr_info[1]))
                client_socket.send("ERROR")
                return

            contact_dir = "%s/%s" % (g_spool_dir, contact_name)
            if not os.path.exists(contact_dir):
                os.makedirs(contact_dir)
            spoolfile = "%s/%0.2f_%s" % (contact_dir, time.time(), uuid.uuid1())
            with open(spoolfile, "w") as f:
                f.write(pprint.pformat(content))
            verbose("client data processed - sending OK", 2)
            client_socket.send("OK")
        except Exception:
            log("Error processing data from %s" % pprint.pformat(addr_info))
            verbose(format_exception(), 2)
            try:
                client_socket.send("ERROR")
            except Exception:
                pass # best effort, the client might already be gone
        finally:
            try:
                client_socket.close()
            except Exception:
                pass

    def _is_valid_contact_name(self, contact_name):
        """Return True if contact_name can safely name a spool subdirectory.

        The name comes from the network and becomes a path component, so
        empty names and anything that could leave the spool directory are
        rejected.
        """
        if not contact_name:
            return False
        return "/" not in contact_name and contact_name not in (".", "..")


# .--Daemonize-----------------------------------------------------------.
# |           ____                                   _                   |
# |          |  _ \  __ _  ___ _ __ ___   ___  _ __ (_)_______           |
# |          | | | |/ _` |/ _ \ '_ ` _ \ / _ \| '_ \| |_  / _ \          |
# |          | |_| | (_| |  __/ | | | | | (_) | | | | |/ /  __/          |
# |          |____/ \__,_|\___|_| |_| |_|\___/|_| |_|_/___\___|          |
# |                                                                      |
# +----------------------------------------------------------------------+
# | Code for daemonizing                                                 |
# '----------------------------------------------------------------------'

def daemonize(user=0, group=0):
    """Detach the process from the terminal and run it in the background.

    user  -- name of the user to switch to, or a false value to keep it
    group -- name of the group to switch to, or a false value to keep it

    Both parent processes of the double fork leave via sys.exit(); only
    the final daemon process returns from this function.
    """
    # do the UNIX double-fork magic, see Stevens' "Advanced
    # Programming in the UNIX Environment" for details (ISBN 0201563177)
    try:
        pid = os.fork()
        if pid > 0:
            # exit first parent
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("Fork failed (#1): %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # decouple from parent environment
    # chdir -> don't prevent unmounting...
    os.chdir("/")

    # Create new process group with the process as leader
    os.setsid()

    # Set user/group depending on params (group first, while we still
    # have the permission to change it)
    if group:
        gid = getgrnam(group)[2]
        os.setregid(gid, gid)
    if user:
        uid = getpwnam(user)[2]
        os.setreuid(uid, uid)

    # do second fork
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("Fork failed (#2): %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Replace stdin, stdout and stderr by /dev/null
    sys.stdout.flush()
    sys.stderr.flush()

    si = os.open("/dev/null", os.O_RDONLY)
    so = os.open("/dev/null", os.O_WRONLY)
    os.dup2(si, 0)
    os.dup2(so, 1)
    os.dup2(so, 2)
    os.close(si)
    os.close(so)

    log("Daemonized with PID %d." % os.getpid())


def load_configuration():
    """(Re-)read global.mk into g_config and flag changes.

    The file is executed into a copy of the current configuration, so a
    missing or broken file leaves g_config untouched and worker threads
    never see a half-updated configuration. If the result differs from
    the current configuration, g_config_changed is set to True.
    """
    global g_config, g_config_changed
    new_config = g_config.copy()
    config_file = "%s/mknotifyd.d/wato/global.mk" % g_config_dir
    try:
        execfile(config_file, {}, new_config)
    except MKSignalException:
        raise # a signal arrived while reading: do not lose the shutdown request
    except Exception:
        return # keep the current configuration

    if new_config != g_config:
        g_config = new_config
        log("Configuration has changed")
        g_config_changed = True


def usage():
    """Print the command line help to stdout."""
    sys.stdout.write("""Usage: mknotifyd [OPTIONS]

   -v, --verbose        Enable verbose output
   -g, --foreground     Do not daemonize, run in foreground
   -s, --single         Single shot, exit after one cycle

""")

    if os.getenv("OMD_ROOT"):
        sys.stdout.write("""You are running OMD, which is generally a good idea.
The following defaults are set:

  Config dir:          %(g_config_dir)s
  Var dir:             %(g_var_dir)s
  PID file:            %(g_pid_file)s
  Log file:            %(g_logfile_path)s

""" % globals())


def run_thread(run_function, args=()):
    """Start run_function(*args) in a new thread and return its identifier."""
    return thread.start_new_thread(run_function, args)


def run_notifyd():
    """Main loop: start the worker threads and supervise them.

    Once per cycle the configuration is re-read and the TCP server is
    started, reconfigured or stopped accordingly. Worker threads that are
    no longer running get restarted. The loop ends when an
    MKSignalException arrives (signal or single cycle mode). Both workers
    are then asked to terminate and get up to two seconds to do so.
    """
    global g_spool_dir, g_deferred_dir
    global g_config_changed

    g_spool_dir    = "%s/notify/spool" % g_var_dir
    g_deferred_dir = "%s/notify/deferred" % g_var_dir
    for path in (g_spool_dir, g_deferred_dir):
        if not os.path.exists(path):
            os.makedirs(path)

    # Start worker threads
    if g_config["notification_daemon_listen_port"]:
        run_thread(g_tcp_server.run)
    run_thread(g_spoolfiles_handler.run)

    while True:
        try:
            time.sleep(1)

            # Read configuration again and check for changes
            # Our tcp server might need a restart after its config has changed
            load_configuration()
            if g_config_changed:
                if g_config["notification_daemon_listen_port"]:
                    if not g_tcp_server._is_running:
                        run_thread(g_tcp_server.run)
                    else:
                        g_tcp_server._reopen_sockets = True
                else:
                    g_tcp_server._should_terminate = True
                g_config_changed = False
                time.sleep(0.2)

            # Check if worker threads are still running
            if g_config["notification_daemon_listen_port"] and not g_tcp_server._is_running:
                log("TcpServer thread crashed. Restarting...")
                run_thread(g_tcp_server.run)
            if not g_spoolfiles_handler._is_running:
                log("SpoolfilesHandler thread crashed. Restarting...")
                run_thread(g_spoolfiles_handler.run)

            if opt_single_cycle:
                raise MKSignalException(1)

        except MKSignalException:
            # Initiate shutdown
            g_tcp_server._should_terminate = True
            g_spoolfiles_handler._should_terminate = True

            shutdown_started = time.time()
            while (g_tcp_server._is_running or g_spoolfiles_handler._is_running) \
                  and time.time() - shutdown_started < 2:
                time.sleep(0.1)
            if g_tcp_server._is_running:
                log("Error: Couldn't stop TcpServer thread")
            if g_spoolfiles_handler._is_running:
                log("Error: Couldn't stop SpoolfilesHandler thread")
            break


class MKSignalException(Exception):
    """Raised in the main thread to request a shutdown of the daemon."""

    def __init__(self, signum):
        Exception.__init__(self, "Got signal %d" % signum)
        self._signum = signum


def signal_handler(signum, stack_frame):
    """Signal handler: log the signal and turn it into an MKSignalException."""
    log("Got signal %d" % signum)
    raise MKSignalException(signum)


#.
# .--Main----------------------------------------------------------------.
# |                         __  __       _                               |
# |                        |  \/  | __ _(_)_ __                          |
# |                        | |\/| |/ _` | | '_ \                         |
# |                        | |  | | (_| | | | | |                        |
# |                        |_|  |_|\__,_|_|_| |_|                        |
# |                                                                      |
# +----------------------------------------------------------------------+
# | Main entry and option parsing                                        |
# '----------------------------------------------------------------------'

os.unsetenv("LANG")

opt_verbose      = 0 # counter: each -v increases the verbosity by one
opt_foreground   = False
opt_single_cycle = False

# Set default values for options
omd_root = os.getenv("OMD_ROOT")
if omd_root:
    g_config_dir   = omd_root + "/etc/check_mk"
    g_var_dir      = omd_root + "/var/check_mk"
    g_pid_file     = omd_root + "/tmp/run/mknotifyd/pid"
    g_logfile_path = omd_root + "/var/log/mknotifyd.log"
else:
    g_config_dir   = "/etc/check_mk"
    g_var_dir      = "/var/check_mk"
    g_pid_file     = "/var/run/mknotifyd.pid"
    g_logfile_path = "/var/log/mknotifyd.log"

g_config = {
    "notification_daemon_listen_port"      : None,
    "notification_deferred_retention_time" : 180,
    "notification_forward_to"              : "",
}

short_options = "hVvgs"
long_options  = [ "help", "version", "verbose", "foreground", "single" ]

try:
    opts, args = getopt.getopt(sys.argv[1:], short_options, long_options)

    # first parse modifers
    for o, a in opts:
        if o in [ '-v', '--verbose' ]:
            opt_verbose += 1
        elif o in [ '-g', '--foreground' ]:
            opt_foreground = True
        elif o in [ '-s', '--single' ]:
            opt_single_cycle = True

    # now handle action options
    for o, a in opts:
        if o in [ '-h', '--help' ]:
            usage()
            sys.exit(0)
        elif o in [ '-V', '--version' ]:
            sys.stdout.write("mknotifyd version %s\n" % VERSION)
            sys.exit(0)

    # Make sure paths exist. This must happen before the logfile is
    # opened, otherwise opening fails on a fresh installation.
    make_parentdirs(g_logfile_path)
    make_parentdirs(g_pid_file)

    # Prepare logging if running in daemon mode
    if not opt_foreground:
        open_logfile()
    log("-" * 65)
    log("mknotifyd version %s starting" % VERSION)
    load_configuration()
    g_config_changed = False # Of course its changed on startup...

    if os.path.exists(g_pid_file):
        with open(g_pid_file) as pid_file:
            old_pid = int(pid_file.read())
        if process_exists(old_pid):
            bail_out("Old PID file %s still existing and mknotifyd still running with PID %d." %
                (g_pid_file, old_pid))
        os.remove(g_pid_file)
        log("Removed orphaned PID file %s (process %d not running anymore)." % (g_pid_file, old_pid))

    # Create worker classes
    g_tcp_server         = TcpServer()
    g_spoolfiles_handler = SpoolfilesHandler()

    # Daemonize
    if not opt_foreground:
        daemonize()

    # Create PID file
    with open(g_pid_file, "w") as pid_file:
        pid_file.write("%d\n" % os.getpid())

    try:
        # Install signal handler
        signal.signal(signal.SIGHUP,  signal_handler)
        signal.signal(signal.SIGINT,  signal_handler)
        signal.signal(signal.SIGQUIT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Now let's go...
        run_notifyd()
    finally:
        # We reach this point, if the server has been killed by
        # a signal or hitting Ctrl-C (in foreground mode), or if an
        # unexpected error occurred. Never leave a stale PID file.
        if os.path.exists(g_pid_file):
            os.remove(g_pid_file)

    log("Successfully shut down.")
    sys.exit(0)

except Exception as e:
    bail_out(e)