Sophie

Sophie

distrib > Mageia > 5 > i586 > media > core-release > by-pkgid > 4d360ba0b1cda7200bbb9f8980f3433d > files > 118

python-eventlet-doc-0.13.0-7.mga5.noarch.rpm

import os
import os.path
import socket
from unittest import TestCase, main
import warnings

import eventlet
warnings.simplefilter('ignore', DeprecationWarning)
from eventlet import api
warnings.simplefilter('default', DeprecationWarning)
from eventlet import greenio, util, hubs, greenthread, spawn

from tests import skip_if_no_ssl


def check_hub():
    # Clear through the descriptor queue
    api.sleep(0)
    api.sleep(0)
    hub = hubs.get_hub()
    for nm in 'get_readers', 'get_writers':
        dct = getattr(hub, nm)()
        assert not dct, "hub.%s not empty: %s" % (nm, dct)
    # Stop the runloop (unless it's twistedhub which does not support that)
    if not getattr(hub, 'uses_twisted_reactor', None):
        hub.abort(True)
        assert not hub.running


class TestApi(TestCase):

    certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
    private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

    def test_tcp_listener(self):
        socket = eventlet.listen(('0.0.0.0', 0))
        assert socket.getsockname()[0] == '0.0.0.0'
        socket.close()

        check_hub()

    def test_connect_tcp(self):
        def accept_once(listenfd):
            try:
                conn, addr = listenfd.accept()
                fd = conn.makefile(mode='w')
                conn.close()
                fd.write('hello\n')
                fd.close()
            finally:
                listenfd.close()

        server = eventlet.listen(('0.0.0.0', 0))
        api.spawn(accept_once, server)

        client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
        fd = client.makefile()
        client.close()
        assert fd.readline() == 'hello\n'

        assert fd.read() == ''
        fd.close()

        check_hub()

    @skip_if_no_ssl
    def test_connect_ssl(self):
        def accept_once(listenfd):
            try:
                conn, addr = listenfd.accept()
                conn.write('hello\r\n')
                greenio.shutdown_safe(conn)
                conn.close()
            finally:
                greenio.shutdown_safe(listenfd)
                listenfd.close()

        server = api.ssl_listener(('0.0.0.0', 0),
                                  self.certificate_file,
                                  self.private_key_file)
        api.spawn(accept_once, server)

        raw_client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
        client = util.wrap_ssl(raw_client)
        fd = socket._fileobject(client, 'rb', 8192)

        assert fd.readline() == 'hello\r\n'
        try:
            self.assertEquals('', fd.read(10))
        except greenio.SSL.ZeroReturnError:
            # if it's a GreenSSL object it'll do this
            pass
        greenio.shutdown_safe(client)
        client.close()
        
        check_hub()

    def test_001_trampoline_timeout(self):
        from eventlet import coros
        server_sock = eventlet.listen(('127.0.0.1', 0))
        bound_port = server_sock.getsockname()[1]
        def server(sock):
            client, addr = sock.accept()
            api.sleep(0.1)
        server_evt = spawn(server, server_sock)
        api.sleep(0)
        try:
            desc = eventlet.connect(('127.0.0.1', bound_port))
            api.trampoline(desc, read=True, write=False, timeout=0.001)
        except api.TimeoutError:
            pass # test passed
        else:
            assert False, "Didn't timeout"

        server_evt.wait()
        check_hub()

    def test_timeout_cancel(self):
        server = eventlet.listen(('0.0.0.0', 0))
        bound_port = server.getsockname()[1]

        done = [False]
        def client_closer(sock):
            while True:
                (conn, addr) = sock.accept()
                conn.close()

        def go():
            desc = eventlet.connect(('127.0.0.1', bound_port))
            try:
                api.trampoline(desc, read=True, timeout=0.1)
            except api.TimeoutError:
                assert False, "Timed out"

            server.close()
            desc.close()
            done[0] = True

        greenthread.spawn_after_local(0, go)

        server_coro = api.spawn(client_closer, server)
        while not done[0]:
            api.sleep(0)
        api.kill(server_coro)

        check_hub()

    def test_named(self):
        named_foo = api.named('tests.api_test.Foo')
        self.assertEquals(
            named_foo.__name__,
            "Foo")

    def test_naming_missing_class(self):
        self.assertRaises(
            ImportError, api.named, 'this_name_should_hopefully_not_exist.Foo')
        
        
    def test_killing_dormant(self):
        DELAY = 0.1
        state = []
        def test():
            try:
                state.append('start')
                api.sleep(DELAY)
            except:
                state.append('except')
                # catching GreenletExit
                pass
            # when switching to hub, hub makes itself the parent of this greenlet,
            # thus after the function's done, the control will go to the parent
            api.sleep(0)
            state.append('finished')
        g = api.spawn(test)
        api.sleep(DELAY/2)
        self.assertEquals(state, ['start'])
        api.kill(g)
        # will not get there, unless switching is explicitly scheduled by kill
        self.assertEquals(state,['start', 'except'])
        api.sleep(DELAY)
        self.assertEquals(state, ['start', 'except', 'finished'])

    def test_nested_with_timeout(self):
        def func():
            return api.with_timeout(0.2, api.sleep, 2, timeout_value=1)
        self.assertRaises(api.TimeoutError, api.with_timeout, 0.1, func)


class Foo(object):
    pass


if __name__ == '__main__':
    main()