Sophie

Sophie

distrib > Mageia > 4 > x86_64 > by-pkgid > 23755043ecaf0c3e2edc914f9b45b2c6 > files > 133

python-eventlet-doc-0.13.0-3.mga4.noarch.rpm

import os
import sys
import time
import traceback
from tests import skipped, skip_unless, using_pyevent, get_database_auth, LimitedTestCase
import eventlet
from eventlet import event
try:
    from eventlet.green import MySQLdb
except ImportError:
    MySQLdb = False

def mysql_requirement(_f):
    """We want to skip tests if using pyevent, MySQLdb is not installed, or if
    there is no database running on the localhost that the auth file grants
    us access to.
    
    This errs on the side of skipping tests if everything is not right, but
    it's better than a million tests failing when you don't care about mysql
    support."""
    if using_pyevent(_f):
        return False
    if MySQLdb is False:
        print "Skipping mysql tests, MySQLdb not importable"
        return False
    try:
        auth = get_database_auth()['MySQLdb'].copy()
        MySQLdb.connect(**auth)
        return True
    except MySQLdb.OperationalError:
        print "Skipping mysql tests, error when connecting:"
        traceback.print_exc()
        return False

class MySQLdbTester(LimitedTestCase):
    def setUp(self):
        self._auth = get_database_auth()['MySQLdb']
        self.create_db()
        self.connection = None
        self.connection = MySQLdb.connect(**self._auth)
        cursor = self.connection.cursor()
        cursor.execute("""CREATE TABLE gargleblatz
        (
        a INTEGER
        );""")
        self.connection.commit()
        cursor.close()

    def tearDown(self):
        if self.connection:
            self.connection.close()
        self.drop_db()

    @skip_unless(mysql_requirement)    
    def create_db(self):
        auth = self._auth.copy()
        try:
            self.drop_db()
        except Exception:
            pass
        dbname = 'test_%d_%d' % (os.getpid(), int(time.time()*1000))
        db = MySQLdb.connect(**auth).cursor()
        db.execute("create database "+dbname)
        db.close()
        self._auth['db'] = dbname
        del db

    def drop_db(self):
        db = MySQLdb.connect(**self._auth).cursor()
        db.execute("drop database "+self._auth['db'])
        db.close()
        del db

    def set_up_dummy_table(self, connection=None):
        close_connection = False
        if connection is None:
            close_connection = True
            if self.connection is None:
                connection = MySQLdb.connect(**self._auth)
            else:
                connection = self.connection

        cursor = connection.cursor()
        cursor.execute(self.dummy_table_sql)
        connection.commit()
        cursor.close()
        if close_connection:
            connection.close()

    dummy_table_sql = """CREATE TEMPORARY TABLE test_table
        (
        row_id INTEGER PRIMARY KEY AUTO_INCREMENT,
        value_int INTEGER,
        value_float FLOAT,
        value_string VARCHAR(200),
        value_uuid CHAR(36),
        value_binary BLOB,
        value_binary_string VARCHAR(200) BINARY,
        value_enum ENUM('Y','N'),
        created TIMESTAMP
        ) ENGINE=InnoDB;"""

    def assert_cursor_yields(self, curs):
        counter = [0]
        def tick():
            while True:
                counter[0] += 1
                eventlet.sleep()
        gt = eventlet.spawn(tick)
        curs.execute("select 1")
        rows = curs.fetchall()
        self.assertEqual(rows, ((1L,),))
        self.assert_(counter[0] > 0, counter[0])
        gt.kill()

    def assert_cursor_works(self, cursor):
        cursor.execute("select 1")
        rows = cursor.fetchall()
        self.assertEqual(rows, ((1L,),))
        self.assert_cursor_yields(cursor)
        
    def assert_connection_works(self, conn):
        curs = conn.cursor()
        self.assert_cursor_works(curs)

    def test_module_attributes(self):
        import MySQLdb as orig
        for key in dir(orig):
            if key not in ('__author__', '__path__', '__revision__',
                           '__version__', '__loader__'):
                self.assert_(hasattr(MySQLdb, key), "%s %s" % (key, getattr(orig, key)))

    def test_connecting(self):
        self.assert_(self.connection is not None)
        
    def test_connecting_annoyingly(self):
        self.assert_connection_works(MySQLdb.Connect(**self._auth))
        self.assert_connection_works(MySQLdb.Connection(**self._auth))
        self.assert_connection_works(MySQLdb.connections.Connection(**self._auth))

    def test_create_cursor(self):
        cursor = self.connection.cursor()
        cursor.close()

    def test_run_query(self):
        cursor = self.connection.cursor()
        self.assert_cursor_works(cursor)
        cursor.close()

    def test_run_bad_query(self):
        cursor = self.connection.cursor()
        try:
            cursor.execute("garbage blah blah")
            self.assert_(False)
        except AssertionError:
            raise
        except Exception:
            pass
        cursor.close()

    def fill_up_table(self, conn):
        curs = conn.cursor()
        for i in range(1000):
            curs.execute('insert into test_table (value_int) values (%s)' % i)
        conn.commit()

    def test_yields(self):
        conn = self.connection
        self.set_up_dummy_table(conn)
        self.fill_up_table(conn)
        curs = conn.cursor()
        results = []
        SHORT_QUERY = "select * from test_table"
        evt = event.Event()
        def a_query():
            self.assert_cursor_works(curs)
            curs.execute(SHORT_QUERY)
            results.append(2)
            evt.send()
        eventlet.spawn(a_query)
        results.append(1)
        self.assertEqual([1], results)
        evt.wait()
        self.assertEqual([1, 2], results)

    def test_visibility_from_other_connections(self):
        conn = MySQLdb.connect(**self._auth)
        conn2 = MySQLdb.connect(**self._auth)
        curs = conn.cursor()
        try:
            curs2 = conn2.cursor()
            curs2.execute("insert into gargleblatz (a) values (%s)" % (314159))
            self.assertEqual(curs2.rowcount, 1)
            conn2.commit()
            selection_query = "select * from gargleblatz"
            curs2.execute(selection_query)
            self.assertEqual(curs2.rowcount, 1)
            del curs2, conn2
            # create a new connection, it should see the addition
            conn3 = MySQLdb.connect(**self._auth)
            curs3 = conn3.cursor()
            curs3.execute(selection_query)
            self.assertEqual(curs3.rowcount, 1)
            # now, does the already-open connection see it?
            curs.execute(selection_query)
            self.assertEqual(curs.rowcount, 1)
            del curs3, conn3
        finally:
            # clean up my litter
            curs.execute("delete from gargleblatz where a=314159")
            conn.commit()

from tests import patcher_test

class MonkeyPatchTester(patcher_test.ProcessBase):
    @skip_unless(mysql_requirement)    
    def test_monkey_patching(self):
        output, lines = self.run_script("""
from eventlet import patcher
import MySQLdb as m
from eventlet.green import MySQLdb as gm
patcher.monkey_patch(all=True, MySQLdb=True)
print "mysqltest", ",".join(sorted(patcher.already_patched.keys()))
print "connect", m.connect == gm.connect
""")
        self.assertEqual(len(lines), 3)
        self.assertEqual(lines[0].replace("psycopg,", ""),
                         'mysqltest MySQLdb,os,select,socket,thread,time')
        self.assertEqual(lines[1], "connect True")