Sophie

Sophie

distrib > Mageia > 5 > i586 > media > core-release > by-pkgid > 4d360ba0b1cda7200bbb9f8980f3433d > files > 171

python-eventlet-doc-0.13.0-7.mga5.noarch.rpm

from tests import LimitedTestCase, silence_warnings
from unittest import main
import eventlet
from eventlet import coros, spawn, sleep
from eventlet.event import Event


class TestQueue(LimitedTestCase):
    """Exercise send/wait behaviour of queues built with ``coros.queue()``."""

    @staticmethod
    def _receive_or_time_out(q, evt):
        # Shared greenlet body for the "cancelled waiter" tests: arm a
        # zero-second timeout that raises RuntimeError, then try to receive.
        # The outcome is reported through ``evt`` -- either the received
        # item or the string 'timed out'.
        eventlet.Timeout(0, RuntimeError())
        try:
            result = q.wait()
            evt.send(result)
        except RuntimeError:
            evt.send('timed out')

    @silence_warnings
    def test_send_first(self):
        # An item sent before anyone waits is returned by the next wait().
        q = coros.queue()
        q.send('hi')
        self.assertEqual(q.wait(), 'hi')

    @silence_warnings
    def test_send_exception_first(self):
        # An exception sent through the queue is raised from wait().
        q = coros.queue()
        q.send(exc=RuntimeError())
        self.assertRaises(RuntimeError, q.wait)

    @silence_warnings
    def test_send_last(self):
        # A greenlet already blocked in wait() receives an item sent later.
        q = coros.queue()

        def waiter(q):
            timer = eventlet.Timeout(0.1)
            try:
                self.assertEqual(q.wait(), 'hi2')
            finally:
                # Always disarm the timeout, even when the assertion fails.
                timer.cancel()

        gt = spawn(waiter, q)
        sleep(0)
        sleep(0)
        q.send('hi2')
        # Block until the waiter finishes so that a failed assertion (or the
        # timeout) raised inside the greenlet surfaces in this test instead
        # of being lost when the test method returns.
        gt.wait()

    @silence_warnings
    def test_max_size(self):
        # With a maximum size of 2 the third send is expected to block until
        # an item has been consumed.
        q = coros.queue(2)
        results = []

        def putter(q):
            q.send('a')
            results.append('a')
            q.send('b')
            results.append('b')
            q.send('c')
            results.append('c')

        spawn(putter, q)
        sleep(0)
        self.assertEqual(results, ['a', 'b'])
        self.assertEqual(q.wait(), 'a')
        sleep(0)
        self.assertEqual(results, ['a', 'b', 'c'])
        self.assertEqual(q.wait(), 'b')
        self.assertEqual(q.wait(), 'c')

    @silence_warnings
    def test_zero_max_size(self):
        # With a maximum size of 0 the sender must not complete until a
        # receiver has taken the item.
        q = coros.queue(0)

        def sender(evt, q):
            q.send('hi')
            evt.send('done')

        def receiver(evt, q):
            x = q.wait()
            evt.send(x)

        e1 = Event()
        e2 = Event()

        spawn(sender, e1, q)
        sleep(0)
        self.assertFalse(e1.ready())
        spawn(receiver, e2, q)
        self.assertEqual(e2.wait(), 'hi')
        self.assertEqual(e1.wait(), 'done')

    @silence_warnings
    def test_multiple_waiters(self):
        # tests that multiple waiters get their results back
        q = coros.queue()

        sendings = ['1', '2', '3', '4']
        gts = [eventlet.spawn(q.wait) for _ in sendings]

        eventlet.sleep(0.01)  # get 'em all waiting

        for item in sendings:
            q.send(item)

        # Compare as sets: which waiter receives which item is not asserted.
        results = set(gt.wait() for gt in gts)
        self.assertEqual(results, set(sendings))

    @silence_warnings
    def test_waiters_that_cancel(self):
        # A waiter that timed out must not swallow an item sent afterwards.
        q = coros.queue()

        evt = Event()
        spawn(self._receive_or_time_out, q, evt)
        self.assertEqual(evt.wait(), 'timed out')

        q.send('hi')
        self.assertEqual(q.wait(), 'hi')

    @silence_warnings
    def test_senders_that_die(self):
        # An item stays retrievable after the sending greenlet has finished.
        q = coros.queue()

        def do_send(q):
            q.send('sent')

        spawn(do_send, q)
        self.assertEqual(q.wait(), 'sent')

    @silence_warnings
    def test_two_waiters_one_dies(self):
        # One waiter times out, the other one still receives the item.
        def waiter(q, evt):
            evt.send(q.wait())

        q = coros.queue()
        dying_evt = Event()
        waiting_evt = Event()
        spawn(self._receive_or_time_out, q, dying_evt)
        spawn(waiter, q, waiting_evt)
        sleep(0)
        q.send('hi')
        self.assertEqual(dying_evt.wait(), 'timed out')
        self.assertEqual(waiting_evt.wait(), 'hi')

    @silence_warnings
    def test_two_bogus_waiters(self):
        # Both waiters time out; the item must remain in the queue.
        q = coros.queue()
        e1 = Event()
        e2 = Event()
        spawn(self._receive_or_time_out, q, e1)
        spawn(self._receive_or_time_out, q, e2)
        sleep(0)
        q.send('sent')
        self.assertEqual(e1.wait(), 'timed out')
        self.assertEqual(e2.wait(), 'timed out')
        self.assertEqual(q.wait(), 'sent')

    @silence_warnings
    def test_waiting(self):
        # waiting() reports the number of greenlets blocked in wait().
        def do_wait(q, evt):
            result = q.wait()
            evt.send(result)

        q = coros.queue()
        e1 = Event()
        spawn(do_wait, q, e1)
        sleep(0)
        self.assertEqual(1, q.waiting())
        q.send('hi')
        sleep(0)
        self.assertEqual(0, q.waiting())
        self.assertEqual('hi', e1.wait())
        self.assertEqual(0, q.waiting())


class TestChannel(LimitedTestCase):
    """Exercise zero-size queues (``coros.queue(0)``) and ``coros.Channel``."""

    @silence_warnings
    def test_send(self):
        # The expected event order below shows each send() completing only
        # after the other greenlet has received the value.
        # NOTE(review): purpose of the initial 0.1s sleep is not visible
        # here -- confirm before removing it.
        sleep(0.1)
        channel = coros.queue(0)

        events = []

        def another_greenlet():
            events.append(channel.wait())
            events.append(channel.wait())

        spawn(another_greenlet)

        events.append('sending')
        channel.send('hello')
        events.append('sent hello')
        channel.send('world')
        events.append('sent world')

        self.assertEqual(
            ['sending', 'hello', 'sent hello', 'world', 'sent world'],
            events)

    @silence_warnings
    def test_wait(self):
        # Mirror image of test_send: this greenlet receives while another
        # greenlet sends.
        # NOTE(review): purpose of the initial 0.1s sleep is not visible
        # here -- confirm before removing it.
        sleep(0.1)
        channel = coros.queue(0)
        events = []

        def another_greenlet():
            events.append('sending hello')
            channel.send('hello')
            events.append('sending world')
            channel.send('world')
            events.append('sent world')

        spawn(another_greenlet)

        events.append('waiting')
        events.append(channel.wait())
        events.append(channel.wait())

        self.assertEqual(
            ['waiting', 'sending hello', 'hello', 'sending world', 'world'],
            events)
        # Yield once so the sender can run past its final send().
        sleep(0)
        self.assertEqual(
            ['waiting', 'sending hello', 'hello', 'sending world', 'world',
             'sent world'],
            events)

    @silence_warnings
    def test_waiters(self):
        # Three blocked receivers are matched up with three senders.
        c = coros.Channel()
        w1 = eventlet.spawn(c.wait)
        w2 = eventlet.spawn(c.wait)
        w3 = eventlet.spawn(c.wait)
        sleep(0)
        self.assertEqual(c.waiting(), 3)
        s1 = eventlet.spawn(c.send, 1)
        s2 = eventlet.spawn(c.send, 2)
        s3 = eventlet.spawn(c.send, 3)
        sleep(0)  # this gets all the sends into a waiting state
        self.assertEqual(c.waiting(), 0)

        s1.wait()
        s2.wait()
        s3.wait()
        # NOTE: we don't guarantee that waiters are served in order
        results = sorted([w1.wait(), w2.wait(), w3.wait()])
        self.assertEqual(results, [1, 2, 3])

# Run the test suite when this module is executed as a script.
if __name__ == '__main__':
    main()