Sophie

Sophie

distrib > Mageia > 5 > i586 > media > core-release > by-pkgid > 4d360ba0b1cda7200bbb9f8980f3433d > files > 121

python-eventlet-doc-0.13.0-7.mga5.noarch.rpm

from unittest import main
from tests import LimitedTestCase, silence_warnings
import eventlet
from eventlet import coros
from eventlet import event
from eventlet import greenthread

class IncrActor(coros.Actor):
    def received(self, evt):
        self.value = getattr(self, 'value', 0) + 1
        if evt: evt.send()


class TestActor(LimitedTestCase):
    mode = 'static'

    @silence_warnings
    def setUp(self):
        super(TestActor, self).setUp()
        self.actor = IncrActor()

    def tearDown(self):
        super(TestActor, self).tearDown()
        greenthread.kill(self.actor._killer)

    def test_cast(self):
        evt = event.Event()
        self.actor.cast(evt)
        evt.wait()
        evt.reset()
        self.assertEqual(self.actor.value, 1)
        self.actor.cast(evt)
        evt.wait()
        self.assertEqual(self.actor.value, 2)

    def test_cast_multi_1(self):
        # make sure that both messages make it in there
        evt = event.Event()
        evt1 = event.Event()
        self.actor.cast(evt)
        self.actor.cast(evt1)
        evt.wait()
        evt1.wait()
        self.assertEqual(self.actor.value, 2)

    def test_cast_multi_2(self):
        # the actor goes through a slightly different code path if it
        # is forced to enter its event loop prior to any cast()s
        eventlet.sleep(0)
        self.test_cast_multi_1()

    def test_sleeping_during_received(self):
        # ensure that even if the received method cooperatively
        # yields, eventually all messages are delivered
        msgs = []
        waiters = []
        def received( (message, evt) ):
            eventlet.sleep(0)
            msgs.append(message)
            evt.send()
        self.actor.received = received

        waiters.append(event.Event())
        self.actor.cast( (1, waiters[-1]))
        eventlet.sleep(0)
        waiters.append(event.Event())
        self.actor.cast( (2, waiters[-1]) )
        waiters.append(event.Event())
        self.actor.cast( (3, waiters[-1]) )
        eventlet.sleep(0)
        waiters.append(event.Event())
        self.actor.cast( (4, waiters[-1]) )
        waiters.append(event.Event())
        self.actor.cast( (5, waiters[-1]) )
        for evt in waiters:
            evt.wait()
        self.assertEqual(msgs, [1,2,3,4,5])

    def test_raising_received(self):
        msgs = []
        def received( (message, evt) ):
            evt.send()
            if message == 'fail':
                raise RuntimeError()
            else:
                msgs.append(message)

        self.actor.received = received

        evt = event.Event()
        self.actor.cast( ('fail', evt) )
        evt.wait()
        evt.reset()
        self.actor.cast( ('should_appear', evt) )
        evt.wait()
        self.assertEqual(['should_appear'], msgs)

    @silence_warnings
    def test_multiple(self):
        self.actor = IncrActor(concurrency=2)
        total = [0]
        def received( (func, ev, value) ):
            func()
            total[0] += value
            ev.send()
        self.actor.received = received

        def onemoment():
            eventlet.sleep(0.1)

        evt = event.Event()
        evt1 = event.Event()

        self.actor.cast( (onemoment, evt, 1) )
        self.actor.cast( (lambda: None, evt1, 2) )

        evt1.wait()
        self.assertEqual(total[0], 2)
        eventlet.sleep(0)
        self.assertEqual(self.actor._pool.free(), 1)
        evt.wait()
        self.assertEqual(total[0], 3)
        eventlet.sleep(0)
        self.assertEqual(self.actor._pool.free(), 2)


if __name__ == '__main__':
    main()