#!/usr/bin/python # -*- encoding: utf-8; py-indent-offset: 4 -*- # +------------------------------------------------------------------+ # | ____ _ _ __ __ _ __ | # | / ___| |__ ___ ___| | __ | \/ | |/ / | # | | | | '_ \ / _ \/ __| |/ / | |\/| | ' / | # | | |___| | | | __/ (__| < | | | | . \ | # | \____|_| |_|\___|\___|_|\_\___|_| |_|_|\_\ | # | | # | Copyright Mathias Kettner 2012 mk@mathias-kettner.de | # +------------------------------------------------------------------+ # # This file is part of Check_MK. # The official homepage is at http://mathias-kettner.de/check_mk. # # check_mk is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation in version 2. check_mk is distributed # in the hope that it will be useful, but WITHOUT ANY WARRANTY; with- # out even the implied warranty of MERCHANTABILITY or FITNESS FOR A # PARTICULAR PURPOSE. See the GNU General Public License for more de- # ails. You should have received a copy of the GNU General Public # License along with GNU Make; see the file COPYING. If not, write # to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, # Boston, MA 02110-1301 USA. import imaplib, sys, email, email.header, re, getopt class MKImapError(Exception): def __init__(self, reason): self.reason = reason def __str__(self): return "Error accessing IMAP folder: " + str(self.reason) + "\n" state_names = { 0 : "OK", 1 : "WARN", 2 : "CRIT", 3 : "UNKNOWN", } def result(code, text): sys.stderr.write(state_names[code] + " - " + text + "\n") sys.exit(code) def bail_out(reason): sys.stderr.write(reason + "\n") sys.exit(3) def imap_login(): if use_ssl: imap = imaplib.IMAP4_SSL(server) else: imap = imaplib.IMAP4(server) imap.login(user, password) return imap # FLAGS (\Seen \Recent) # FLAGS () def parse_flags(flagsraw): flags = flagsraw.split(None, 1)[1].strip('(').strip(')').replace('\\','').split() return flags def decode_header_to_unicode(value): result = email.header.decode_header(value) parts = [] for header in result: if header[1] == None: parts.append(header[0]) # no encoding else: binstring, encoding = header parts.append(binstring.decode(encoding).strip()) return " ".join(parts) def compile_patterns(plist): for nr, p in enumerate(plist): plist[nr] = re.compile(p, ignore_case and re.I or 0) def find_in_patterns(subject, plist): for p in plist: if p.search(subject): return True return False def usage(): sys.stdout.write(""" Usage: check_imap_folder -H SERVER -u USER -p PASSWORD [OPTIONS] Logs into an IMAP account and retrieves the headers of all messages. The you can specify patterns to search in the subjects of the messages for. Options: -h, --help Output this help and exit -H, --host Hostname or IP-Address of IMAP server -s, --ssl Use SSL for the connection -u, --user USER Name of IMAP user -p, --password PASSWORD Password of this user -f, --folder FOLDER Name of the folder (default is INBOX) -a, --all scan all mails, not only the unread ones -i, --nocase ignore case when applying patterns -w, --warning RE regular expression, warning if found in subject -c, --critical RE regular expression, critical if found in subject """) sys.exit(3) server = None user = None password = None folder = "INBOX" use_ssl = False ignore_seen = True ignore_case = False critical_patterns = [ ] warning_patterns = [ ] short_options = "H:su:p:f:aiw:c:h" long_options = [ "host=", "ssl", "user=", "password=", "folder=", "all", "nocase", "warning=", "critical=", "help" ] try: opts, args = getopt.getopt(sys.argv[1:], short_options, long_options) for o, a in opts: if o in [ '-h', '--help' ]: usage() elif o in [ '-H', '--host' ]: server = a elif o in [ '-s', '--ssl' ]: use_ssl = True elif o in [ '-u', '--user' ]: user = a elif o in [ '-p', '--password' ]: password = a elif o in [ '-f', '--folder' ]: folder = a elif o in [ '-a', '--all' ]: ignore_seen = False elif o in [ '-i', '--nocase' ]: ignore_case = True elif o in [ '-w', '--warning' ]: warning_patterns.append(a) elif o in [ '-c', '--critical' ]: critical_patterns.append(a) except Exception, e: bail_out(str(e)) if server == None: bail_out("Please specify a host with -H.") if user == None: bail_out("Please specify a user name with -u.") if password == None: bail_out("Please specify a password with -p.") compile_patterns(warning_patterns) compile_patterns(critical_patterns) imap = imap_login() status, message = imap.select(folder, readonly=True) if status != "OK": result(3, "Cannot select IMAP folder " + folder) # status, message = imap.response('FLAGS') # print status, message status, message = imap.search(None, 'ALL') if status != "OK": result(3, "Cannot get list of mails in folder") mail_ids = message[0].split() num_total = 0 num_critical = 0 num_warning = 0 for mail_id in mail_ids: status, fields = imap.fetch(mail_id, "(BODY[HEADER] FLAGS)") header = fields[0][1] flags = parse_flags(fields[1]) if "Seen" in flags and ignore_seen: continue message = email.message_from_string(header) mail = dict([ (key, decode_header_to_unicode(val)) for (key, val) in message.items()]) subject = mail["Subject"].replace("\r\n", "") num_total += 1 if find_in_patterns(subject, critical_patterns): num_critical += 1 elif find_in_patterns(subject, warning_patterns): num_warning += 1 infotext = "%d%s messages" % (num_total, ignore_seen and " unread" or "") status = 0 if num_warning: status = 1 infotext += ", %d warning(!)" % num_critical if num_critical: status = 2 infotext += ", %d critical(!!)" % num_critical result(status, infotext)