Sophie

Sophie

distrib > Fedora > 18 > i386 > by-pkgid > b746669f85fc995c0c36b41b2fe494a0 > files > 42

python3-dirq-1.4-1.fc18.noarch.rpm

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Test program for testing dirq.QueueSet module.
"""

import os
import re
import shutil
import sys
import tempfile
import time
from optparse import OptionParser

# Make the parent of the current directory importable so that the local
# "dirq" package is found when the script is run from its own directory.
# Raw string: '\w' in a plain literal is an invalid escape sequence.
sys.path.insert(1, re.sub(r'/\w*$', '', os.getcwd()))
import dirq
from dirq import queue

# Parsed options object; replaced by init() with the optparse result, or by
# main_complex() with a hand-built options instance.
OS = ''
# Name of the test to run; set by init() from the first positional argument.
TEST = ''


def init():
    """Parse the command line into the OS and TEST globals.

    Prints an error and exits with status 1 when no queue path was given.
    Prints the usage and exits when no TEST argument is present.
    """
    global OS, TEST
    parser = OptionParser(version=("%%prog %s" % dirq.VERSION),
                          usage="%prog [OPTIONS] [--] TEST")
    # (flags, settings) pairs, kept in the order they are shown by --help.
    option_table = (
        (('-d', '--debug'),
         {'dest': 'debug', 'action': "store_true", 'default': False,
          'help': "show debugging information"}),
        (('-p', '--path'),
         {'dest': 'path', 'type': 'string', 'default': '',
          'help': "set the queue path"}),
        (('-c', '--count'),
         {'dest': 'count', 'type': 'int', 'default': 0,
          'help': "set the elements count"}),
        (("-s", "--size"),
         {'dest': "size", 'type': 'int', 'default': 0,
          'help': "set the body size for added elements"}),
        (("-r", "--random"),
         {'dest': "random", 'action': "store_true", 'default': False,
          'help': "randomize the body size"}),
        (("--header",),
         {'dest': "header", 'action': "store_true", 'default': False,
          'help': "set header for added elements"}),
        (("--maxelts",),
         {'dest': "maxelts", 'type': 'int', 'default': 0,
          'help': "set the maximum number of elements per directory"}),
    )
    for flags, settings in option_table:
        parser.add_option(*flags, **settings)

    OS, positional = parser.parse_args()
    if not OS.path:
        print("*** mandatory option not set: path")
        sys.exit(1)
    if not positional:
        parser.print_help()
        sys.exit()
    TEST = positional[0]


def debug(format, *arguments):
    """Report a debugging message on stderr when debugging is enabled.

    Does nothing unless ``OS.debug`` is true.  Otherwise ``format`` is
    interpolated with ``arguments`` (printf-style), any run of trailing
    whitespace is replaced by a single ``.`` and the result is written to
    stderr, prefixed with a timestamp, the program name and the process id.

    Arguments:
        format    - printf-style format string
        arguments - values interpolated into ``format``
    """
    if not OS.debug:
        return
    message = format % arguments
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    message = re.sub(r'\s+$', '.', message)
    sys.stderr.write("# %s %s[%d]: %s\n" %
                     (time.strftime("%Y/%m/%d-%H:%M:%S",
                                    time.localtime(time.time())),
                      os.path.basename(sys.argv[0]),
                      os.getpid(), message))


def new_dirq(path, _schema):
    """Build a dirq Queue rooted at ``path``.

    A true ``_schema`` selects a schema with a single string ``body``
    field; otherwise an empty schema is passed.  The maximum number of
    elements per directory is taken from ``OS.maxelts``.
    """
    schema = {'body': 'string'} if _schema else {}
    return queue.Queue(path, schema=schema, maxelts=OS.maxelts)


def new_dirqs():
    """Create a QueueSet from the comma-separated paths in ``OS.path``.

    Each path is first wrapped in a schema-less Queue through new_dirq(),
    so the set is built from Queue objects, the same way test_complex()
    builds its set, instead of from bare path strings.

    NOTE(review): QueueSet is believed to accept only queue objects (or a
    list/tuple of them) -- confirm against the dirq.QueueSet documentation.

    Returns the new QueueSet; the time taken is reported through debug().
    """
    time1 = time.time()
    queues = [new_dirq(path, 0) for path in OS.path.split(',')]
    qs = queue.QueueSet(queues)
    debug("created queue set in %.4f seconds", time.time() - time1)
    return qs


def test_count():
    """Count the elements in the queue set and report how long it took.
    """
    queue_set = new_dirqs()
    started = time.time()
    total = queue_set.count()
    elapsed = time.time() - started
    debug("queue set has %d elements", total)
    debug("done in %.4f seconds", elapsed)


def test_add():
    """Add elements to the queue.

    NOTE(review): placeholder -- the function has no body besides this
    docstring, so selecting the "add" test currently does nothing.
    """


def test_complex():
    """Exercise building, growing, shrinking and counting a QueueSet.

    Creates the working directory ``OS.path`` and six queues (``q0`` to
    ``q5``) below it, each filled with ``OS.count`` elements (1000 when the
    count is 0).  Then:

    * builds a QueueSet from the first three queues and counts it,
    * adds the remaining queues and counts again,
    * removes the first three queues and counts what is left.

    The queue directories are deleted at the end, also when one of the
    steps fails; the working directory itself is left in place.

    Raises:
        OSError - ``OS.path`` cannot be created (e.g. it already exists)
    """
    wd = OS.path
    # Done outside the try block: if the directory already exists nothing
    # below it was created here, so nothing must be deleted.
    os.mkdir(wd)
    qn = 6
    paths = [wd + '/q%i' % i for i in range(qn)]
    count = OS.count or 1000
    try:
        debug("creating %i initial queues. adding %i elements into each.",
              qn, count)
        queues = []
        t1 = time.time()
        # Queues are created from the last path to the first, so queues[0]
        # is q5 and queues[-1] is q0.
        for path in reversed(paths):
            q = new_dirq(path, 1)
            debug("adding %d elements to the queue...", count)
            time1 = time.time()
            for done in range(1, count + 1):
                q.add({'body': 'Element %i \u263A\n' % done})
            debug("done in %.4f seconds", time.time() - time1)
            queues.append(q)
        debug("total done in %.4f seconds", time.time() - t1)

        initial = 3
        time1 = time.time()
        qs = queue.QueueSet(queues[:initial])
        debug("created queue set in %.4f seconds", time.time() - time1)
        debug("elements in %i queues: %i", initial, qs.count())

        debug("adding remaining queues to the set.")
        t1 = time.time()
        qs.add(queues[initial:])
        debug("done in %.4f sec.", time.time() - t1)
        debug("total element with added queues: %i", qs.count())

        debug("removing %i first queues.", initial)
        t1 = time.time()
        for q in queues[:initial]:
            qs.remove(q)
        debug("done in %.4f sec.", time.time() - t1)

        debug("number of elements left: %i", qs.count())
    finally:
        # Always clean up so a failed run does not leave queues on disk.
        debug("deleting queues from disk...")
        for path in paths:
            shutil.rmtree(path, ignore_errors=True)
        debug("done.")


def test_iterate():
    """Walk once through the set of queues, locking and unlocking elements.

    Elements that cannot be locked are skipped and not counted.

    NOTE(review): the loop relies on qs.first()/qs.next() returning a falsy
    value once the set is exhausted and on the set offering lock()/unlock()
    -- confirm against the dirq.QueueSet API.
    """
    debug("iterating all elements in the set of queues (one pass)...")
    qs = new_dirqs()
    locked = 0
    started = time.time()
    name = qs.first()
    while name:
        if qs.lock(name):
            qs.unlock(name)
            locked += 1
        name = qs.next()
    elapsed = time.time() - started
    debug("done in %.4f seconds (%d elements)", elapsed, locked)


def main_complex():
    """Run test_complex() from a library, without command-line parsing.

    Installs a fixed set of options in the ``OS`` global (100 elements per
    queue, debugging enabled) with the queue path placed inside a fresh
    temporary directory, runs test_complex() and removes the temporary
    directory afterwards, whether or not the test raised.

    Any exception raised by test_complex() propagates to the caller.
    """
    global OS
    # Keep the mkdtemp() result so the whole temporary tree can be removed,
    # not only the "dirqset" directory created below it.
    tmp_root = tempfile.mkdtemp()

    class options(object):
        """ options class. """
        path = tmp_root + '/dirqset'
        count = 100
        random = False
        size = False
        header = False
        debug = True
        maxelts = 0
    OS = options()
    try:
        test_complex()
    finally:
        shutil.rmtree(tmp_root, ignore_errors=True)

if __name__ == "__main__":
    init()
    # Map each supported TEST name to the function implementing it.
    handlers = {
        "count": test_count,
        "add": test_add,
        "complex": test_complex,
        "iterate": test_iterate,
    }
    handler = handlers.get(TEST)
    if handler is None:
        print("unsupported test:", TEST)
    else:
        handler()