import warnings warnings.simplefilter('ignore', DeprecationWarning) from eventlet import saranwrap warnings.simplefilter('default', DeprecationWarning) from eventlet import greenpool, sleep import os import eventlet import sys import tempfile import time from tests import LimitedTestCase, main, skip_on_windows, skip_with_pyevent import re import StringIO # random test stuff def list_maker(): return [0,1,2] one = 1 two = 2 three = 3 class CoroutineCallingClass(object): def __init__(self): self._my_dict = {} def run_coroutine(self): eventlet.spawn_n(self._add_random_key) def _add_random_key(self): self._my_dict['random'] = 'yes, random' def get_dict(self): return self._my_dict class TestSaranwrap(LimitedTestCase): TEST_TIMEOUT=8 def assert_server_exists(self, prox): self.assert_(saranwrap.status(prox)) prox.foo = 0 self.assertEqual(0, prox.foo) @skip_on_windows @skip_with_pyevent def test_wrap_tuple(self): my_tuple = (1, 2) prox = saranwrap.wrap(my_tuple) self.assertEqual(prox[0], 1) self.assertEqual(prox[1], 2) self.assertEqual(len(my_tuple), 2) @skip_on_windows @skip_with_pyevent def test_wrap_string(self): my_object = "whatever" prox = saranwrap.wrap(my_object) self.assertEqual(str(my_object), str(prox)) self.assertEqual(len(my_object), len(prox)) self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b'])) @skip_on_windows @skip_with_pyevent def test_wrap_uniterable(self): # here we're treating the exception as just a normal class prox = saranwrap.wrap(FloatingPointError()) def index(): prox[0] def key(): prox['a'] self.assertRaises(IndexError, index) self.assertRaises(TypeError, key) @skip_on_windows @skip_with_pyevent def test_wrap_dict(self): my_object = {'a':1} prox = saranwrap.wrap(my_object) self.assertEqual('a', prox.keys()[0]) self.assertEqual(1, prox['a']) self.assertEqual(str(my_object), str(prox)) self.assertEqual('saran:' + repr(my_object), repr(prox)) @skip_on_windows @skip_with_pyevent def test_wrap_module_class(self): prox = saranwrap.wrap(re) self.assertEqual(saranwrap.Proxy, type(prox)) exp = prox.compile('.') self.assertEqual(exp.flags, 0) self.assert_(repr(prox.compile)) @skip_on_windows @skip_with_pyevent def test_wrap_eq(self): prox = saranwrap.wrap(re) exp1 = prox.compile('.') exp2 = prox.compile(exp1.pattern) self.assertEqual(exp1, exp2) exp3 = prox.compile('/') self.assert_(exp1 != exp3) @skip_on_windows @skip_with_pyevent def test_wrap_nonzero(self): prox = saranwrap.wrap(re) exp1 = prox.compile('.') self.assert_(bool(exp1)) prox2 = saranwrap.Proxy([1, 2, 3]) self.assert_(bool(prox2)) @skip_on_windows @skip_with_pyevent def test_multiple_wraps(self): prox1 = saranwrap.wrap(re) prox2 = saranwrap.wrap(re) x1 = prox1.compile('.') x2 = prox1.compile('.') del x2 x3 = prox2.compile('.') @skip_on_windows @skip_with_pyevent def test_dict_passthru(self): prox = saranwrap.wrap(StringIO) x = prox.StringIO('a') self.assertEqual(type(x.__dict__), saranwrap.ObjectProxy) # try it all on one line just for the sake of it self.assertEqual(type(saranwrap.wrap(StringIO).StringIO('a').__dict__), saranwrap.ObjectProxy) @skip_on_windows @skip_with_pyevent def test_is_value(self): server = saranwrap.Server(None, None, None) self.assert_(server.is_value(None)) @skip_on_windows @skip_with_pyevent def test_wrap_getitem(self): prox = saranwrap.wrap([0,1,2]) self.assertEqual(prox[0], 0) @skip_on_windows @skip_with_pyevent def test_wrap_setitem(self): prox = saranwrap.wrap([0,1,2]) prox[1] = 2 self.assertEqual(prox[1], 2) @skip_on_windows @skip_with_pyevent def test_raising_exceptions(self): prox = saranwrap.wrap(re) def nofunc(): prox.never_name_a_function_like_this() self.assertRaises(AttributeError, nofunc) @skip_on_windows @skip_with_pyevent def test_unpicklable_server_exception(self): prox = saranwrap.wrap(saranwrap) def unpickle(): prox.raise_an_unpicklable_error() self.assertRaises(saranwrap.UnrecoverableError, unpickle) # It's basically dead #self.assert_server_exists(prox) @skip_on_windows @skip_with_pyevent def test_pickleable_server_exception(self): prox = saranwrap.wrap(saranwrap) def fperror(): prox.raise_standard_error() self.assertRaises(FloatingPointError, fperror) self.assert_server_exists(prox) @skip_on_windows @skip_with_pyevent def test_print_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.print_string('hello') self.assert_server_exists(prox) @skip_on_windows @skip_with_pyevent def test_stderr_does_not_break_wrapper(self): prox = saranwrap.wrap(saranwrap) prox.err_string('goodbye') self.assert_server_exists(prox) @skip_on_windows @skip_with_pyevent def test_status(self): prox = saranwrap.wrap(time) a = prox.gmtime(0) status = saranwrap.status(prox) self.assertEqual(status['object_count'], 1) self.assertEqual(status['next_id'], 2) self.assert_(status['pid']) # can't guess what it will be # status of an object should be the same as the module self.assertEqual(saranwrap.status(a), status) # create a new one then immediately delete it prox.gmtime(1) is_id = prox.ctime(1) # sync up deletes status = saranwrap.status(prox) self.assertEqual(status['object_count'], 1) self.assertEqual(status['next_id'], 3) prox2 = saranwrap.wrap(re) self.assert_(status['pid'] != saranwrap.status(prox2)['pid']) @skip_on_windows @skip_with_pyevent def test_del(self): prox = saranwrap.wrap(time) delme = prox.gmtime(0) status_before = saranwrap.status(prox) #print status_before['objects'] del delme # need to do an access that doesn't create an object # in order to sync up the deleted objects prox.ctime(1) status_after = saranwrap.status(prox) #print status_after['objects'] self.assertLessThan(status_after['object_count'], status_before['object_count']) @skip_on_windows @skip_with_pyevent def test_contains(self): prox = saranwrap.wrap({'a':'b'}) self.assert_('a' in prox) self.assert_('x' not in prox) @skip_on_windows @skip_with_pyevent def test_variable_and_keyword_arguments_with_function_calls(self): import optparse prox = saranwrap.wrap(optparse) parser = prox.OptionParser() z = parser.add_option('-n', action='store', type='string', dest='n') opts,args = parser.parse_args(["-nfoo"]) self.assertEqual(opts.n, 'foo') @skip_on_windows @skip_with_pyevent def test_original_proxy_going_out_of_scope(self): def make_re(): prox = saranwrap.wrap(re) # after this function returns, prox should fall out of scope return prox.compile('.') tid = make_re() self.assertEqual(tid.flags, 0) def make_list(): from tests import saranwrap_test prox = saranwrap.wrap(saranwrap_test.list_maker) # after this function returns, prox should fall out of scope return prox() proxl = make_list() self.assertEqual(proxl[2], 2) def test_status_of_none(self): try: saranwrap.status(None) self.assert_(False) except AttributeError, e: pass @skip_on_windows @skip_with_pyevent def test_not_inheriting_pythonpath(self): # construct a fake module in the temp directory temp_dir = tempfile.mkdtemp("saranwrap_test") fp = open(os.path.join(temp_dir, "tempmod.py"), "w") fp.write("""import os, sys pypath = os.environ['PYTHONPATH'] sys_path = sys.path""") fp.close() # this should fail because we haven't stuck the temp_dir in our path yet prox = saranwrap.wrap_module('tempmod') try: prox.pypath self.fail() except ImportError: pass # now try to saranwrap it sys.path.append(temp_dir) try: import tempmod prox = saranwrap.wrap(tempmod) self.assert_(prox.pypath.count(temp_dir)) self.assert_(prox.sys_path.count(temp_dir)) finally: import shutil shutil.rmtree(temp_dir) sys.path.remove(temp_dir) @skip_on_windows @skip_with_pyevent def test_contention(self): from tests import saranwrap_test prox = saranwrap.wrap(saranwrap_test) pool = greenpool.GreenPool(4) pool.spawn_n(lambda: self.assertEquals(prox.one, 1)) pool.spawn_n(lambda: self.assertEquals(prox.two, 2)) pool.spawn_n(lambda: self.assertEquals(prox.three, 3)) pool.waitall() @skip_on_windows @skip_with_pyevent def test_copy(self): import copy compound_object = {'a':[1,2,3]} prox = saranwrap.wrap(compound_object) def make_assertions(copied): self.assert_(isinstance(copied, dict)) self.assert_(isinstance(copied['a'], list)) self.assertEquals(copied, compound_object) self.assertNotEqual(id(compound_object), id(copied)) make_assertions(copy.copy(prox)) make_assertions(copy.deepcopy(prox)) @skip_on_windows @skip_with_pyevent def test_list_of_functions(self): return # this test is known to fail, we can implement it sometime in the future if we wish from tests import saranwrap_test prox = saranwrap.wrap([saranwrap_test.list_maker]) self.assertEquals(list_maker(), prox[0]()) @skip_on_windows @skip_with_pyevent def test_under_the_hood_coroutines(self): # so, we want to write a class which uses a coroutine to call # a function. Then we want to saranwrap that class, have # the object call the coroutine and verify that it ran from tests import saranwrap_test mod_proxy = saranwrap.wrap(saranwrap_test) obj_proxy = mod_proxy.CoroutineCallingClass() obj_proxy.run_coroutine() # sleep for a bit to make sure out coroutine ran by the time # we check the assert below sleep(0.1) self.assert_( 'random' in obj_proxy.get_dict(), 'Coroutine in saranwrapped object did not run') @skip_on_windows @skip_with_pyevent def test_child_process_death(self): prox = saranwrap.wrap({}) pid = saranwrap.getpid(prox) self.assertEqual(os.kill(pid, 0), None) # assert that the process is running del prox # removing all references to the proxy should kill the child process sleep(0.1) # need to let the signal handler run self.assertRaises(OSError, os.kill, pid, 0) # raises OSError if pid doesn't exist @skip_on_windows @skip_with_pyevent def test_detection_of_server_crash(self): # make the server crash here pass @skip_on_windows @skip_with_pyevent def test_equality_with_local_object(self): # we'll implement this if there's a use case for it pass @skip_on_windows @skip_with_pyevent def test_non_blocking(self): # here we test whether it's nonblocking pass if __name__ == '__main__': main()