Sophie

Sophie

distrib > Mageia > 7 > armv7hl > media > core-updates > by-pkgid > 05cb0e982c3387afdb1d2b5f913d9e82 > files > 88

python-eventlet-doc-0.24.1-1.1.mga7.noarch.rpm

import eventlet
from eventlet import event, hubs, queue
import tests


def do_bail(q):
    eventlet.Timeout(0, RuntimeError())
    try:
        result = q.get()
        return result
    except RuntimeError:
        return 'timed out'


class TestQueue(tests.LimitedTestCase):
    def test_send_first(self):
        q = eventlet.Queue()
        q.put('hi')
        self.assertEqual(q.get(), 'hi')

    def test_send_last(self):
        q = eventlet.Queue()

        def waiter(q):
            self.assertEqual(q.get(), 'hi2')

        gt = eventlet.spawn(eventlet.with_timeout, 0.1, waiter, q)
        eventlet.sleep(0)
        eventlet.sleep(0)
        q.put('hi2')
        gt.wait()

    def test_max_size(self):
        q = eventlet.Queue(2)
        results = []

        def putter(q):
            q.put('a')
            results.append('a')
            q.put('b')
            results.append('b')
            q.put('c')
            results.append('c')

        gt = eventlet.spawn(putter, q)
        eventlet.sleep(0)
        self.assertEqual(results, ['a', 'b'])
        self.assertEqual(q.get(), 'a')
        eventlet.sleep(0)
        self.assertEqual(results, ['a', 'b', 'c'])
        self.assertEqual(q.get(), 'b')
        self.assertEqual(q.get(), 'c')
        gt.wait()

    def test_zero_max_size(self):
        q = eventlet.Queue(0)

        def sender(evt, q):
            q.put('hi')
            evt.send('done')

        def receiver(q):
            x = q.get()
            return x

        evt = event.Event()
        gt = eventlet.spawn(sender, evt, q)
        eventlet.sleep(0)
        assert not evt.ready()
        gt2 = eventlet.spawn(receiver, q)
        self.assertEqual(gt2.wait(), 'hi')
        self.assertEqual(evt.wait(), 'done')
        gt.wait()

    def test_resize_up(self):
        q = eventlet.Queue(0)

        def sender(evt, q):
            q.put('hi')
            evt.send('done')

        evt = event.Event()
        gt = eventlet.spawn(sender, evt, q)
        eventlet.sleep(0)
        assert not evt.ready()
        q.resize(1)
        eventlet.sleep(0)
        assert evt.ready()
        gt.wait()

    def test_resize_down(self):
        q = eventlet.Queue(5)

        for i in range(5):
            q.put(i)

        self.assertEqual(list(q.queue), list(range(5)))
        q.resize(1)
        eventlet.sleep(0)
        self.assertEqual(list(q.queue), list(range(5)))

    def test_resize_to_Unlimited(self):
        q = eventlet.Queue(0)

        def sender(evt, q):
            q.put('hi')
            evt.send('done')

        evt = event.Event()
        gt = eventlet.spawn(sender, evt, q)
        eventlet.sleep()
        self.assertFalse(evt.ready())
        q.resize(None)
        eventlet.sleep()
        self.assertTrue(evt.ready())
        gt.wait()

    def test_multiple_waiters(self):
        # tests that multiple waiters get their results back
        q = eventlet.Queue()

        sendings = ['1', '2', '3', '4']
        gts = [eventlet.spawn(q.get) for x in sendings]

        eventlet.sleep(0.01)  # get 'em all waiting

        q.put(sendings[0])
        q.put(sendings[1])
        q.put(sendings[2])
        q.put(sendings[3])
        results = set()
        for i, gt in enumerate(gts):
            results.add(gt.wait())
        self.assertEqual(results, set(sendings))

    def test_waiters_that_cancel(self):
        q = eventlet.Queue()

        gt = eventlet.spawn(do_bail, q)
        self.assertEqual(gt.wait(), 'timed out')

        q.put('hi')
        self.assertEqual(q.get(), 'hi')

    def test_getting_before_sending(self):
        q = eventlet.Queue()
        gt = eventlet.spawn(q.put, 'sent')
        self.assertEqual(q.get(), 'sent')
        gt.wait()

    def test_two_waiters_one_dies(self):
        def waiter(q):
            return q.get()

        q = eventlet.Queue()
        dying = eventlet.spawn(do_bail, q)
        waiting = eventlet.spawn(waiter, q)
        eventlet.sleep(0)
        q.put('hi')
        self.assertEqual(dying.wait(), 'timed out')
        self.assertEqual(waiting.wait(), 'hi')

    def test_two_bogus_waiters(self):
        q = eventlet.Queue()
        gt1 = eventlet.spawn(do_bail, q)
        gt2 = eventlet.spawn(do_bail, q)
        eventlet.sleep(0)
        q.put('sent')
        self.assertEqual(gt1.wait(), 'timed out')
        self.assertEqual(gt2.wait(), 'timed out')
        self.assertEqual(q.get(), 'sent')

    def test_waiting(self):
        q = eventlet.Queue()
        gt1 = eventlet.spawn(q.get)
        eventlet.sleep(0)
        self.assertEqual(1, q.getting())
        q.put('hi')
        eventlet.sleep(0)
        self.assertEqual(0, q.getting())
        self.assertEqual('hi', gt1.wait())
        self.assertEqual(0, q.getting())

    def test_channel_send(self):
        channel = eventlet.Queue(0)
        events = []

        def another_greenlet():
            events.append(channel.get())
            events.append(channel.get())

        eventlet.spawn(another_greenlet)

        events.append('sending')
        channel.put('hello')
        events.append('sent hello')
        channel.put('world')
        events.append('sent world')

        self.assertEqual(['sending', 'hello', 'sent hello', 'world', 'sent world'], events)

    def test_channel_wait(self):
        channel = eventlet.Queue(0)
        events = []

        def another_greenlet():
            events.append('sending hello')
            channel.put('hello')
            events.append('sending world')
            channel.put('world')
            events.append('sent world')

        eventlet.spawn(another_greenlet)

        events.append('waiting')
        events.append(channel.get())
        events.append(channel.get())

        self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world'], events)
        eventlet.sleep(0)
        self.assertEqual(
            ['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)

    def test_channel_waiters(self):
        c = eventlet.Queue(0)
        w1 = eventlet.spawn(c.get)
        w2 = eventlet.spawn(c.get)
        w3 = eventlet.spawn(c.get)
        eventlet.sleep(0)
        self.assertEqual(c.getting(), 3)
        s1 = eventlet.spawn(c.put, 1)
        s2 = eventlet.spawn(c.put, 2)
        s3 = eventlet.spawn(c.put, 3)

        s1.wait()
        s2.wait()
        s3.wait()
        self.assertEqual(c.getting(), 0)
        # NOTE: we don't guarantee that waiters are served in order
        results = sorted([w1.wait(), w2.wait(), w3.wait()])
        self.assertEqual(results, [1, 2, 3])

    def test_channel_sender_timing_out(self):
        c = eventlet.Queue(0)
        self.assertRaises(queue.Full, c.put, "hi", timeout=0.001)
        self.assertRaises(queue.Empty, c.get_nowait)

    def test_task_done(self):
        channel = eventlet.Queue(0)
        X = object()
        gt = eventlet.spawn(channel.put, X)
        result = channel.get()
        assert result is X, (result, X)
        assert channel.unfinished_tasks == 1, channel.unfinished_tasks
        channel.task_done()
        assert channel.unfinished_tasks == 0, channel.unfinished_tasks
        gt.wait()

    def test_join_doesnt_block_when_queue_is_already_empty(self):
        queue = eventlet.Queue()
        queue.join()

    def test_zero_length_queue_nonblocking_put(self):
        hub = hubs.get_hub()
        queue = eventlet.Queue(0)
        got = []

        def fetch_item():
            got.append(queue.get())

        for _ in range(10):
            good_getter = eventlet.spawn(fetch_item)
            bad_getter = eventlet.spawn(fetch_item)
            hub.schedule_call_global(0, bad_getter.throw, Exception("kaboom"))
        eventlet.sleep(0)

        for i in range(10):
            queue.put(i)

        self.assertEqual(got, list(range(10)))


def store_result(result, func, *args):
    try:
        result.append(func(*args))
    except Exception as exc:
        result.append(exc)


class TestNoWait(tests.LimitedTestCase):
    def test_put_nowait_simple(self):
        hub = hubs.get_hub()
        result = []
        q = eventlet.Queue(1)
        hub.schedule_call_global(0, store_result, result, q.put_nowait, 2)
        hub.schedule_call_global(0, store_result, result, q.put_nowait, 3)
        eventlet.sleep(0)
        eventlet.sleep(0)
        assert len(result) == 2, result
        assert result[0] is None, result
        assert isinstance(result[1], queue.Full), result

    def test_get_nowait_simple(self):
        hub = hubs.get_hub()
        result = []
        q = eventlet.Queue(1)
        q.put(4)
        hub.schedule_call_global(0, store_result, result, q.get_nowait)
        hub.schedule_call_global(0, store_result, result, q.get_nowait)
        eventlet.sleep(0)
        assert len(result) == 2, result
        assert result[0] == 4, result
        assert isinstance(result[1], queue.Empty), result

    # get_nowait must work from the mainloop
    def test_get_nowait_unlock(self):
        hub = hubs.get_hub()
        result = []
        q = eventlet.Queue(0)
        p = eventlet.spawn(q.put, 5)
        assert q.empty(), q
        assert q.full(), q
        eventlet.sleep(0)
        assert q.empty(), q
        assert q.full(), q
        hub.schedule_call_global(0, store_result, result, q.get_nowait)
        eventlet.sleep(0)
        assert q.empty(), q
        assert q.full(), q
        assert result == [5], result
        # TODO add ready to greenthread
        # assert p.ready(), p
        assert p.dead, p
        assert q.empty(), q

    # put_nowait must work from the mainloop
    def test_put_nowait_unlock(self):
        hub = hubs.get_hub()
        result = []
        q = eventlet.Queue(0)
        eventlet.spawn(q.get)
        assert q.empty(), q
        assert q.full(), q
        eventlet.sleep(0)
        assert q.empty(), q
        assert q.full(), q
        hub.schedule_call_global(0, store_result, result, q.put_nowait, 10)
        # TODO ready method on greenthread
        # assert not p.ready(), p
        eventlet.sleep(0)
        assert result == [None], result
        # TODO ready method
        # assert p.ready(), p
        assert q.full(), q
        assert q.empty(), q

    def test_wait_except(self):
        # https://github.com/eventlet/eventlet/issues/407
        q = eventlet.Queue()

        def get():
            q.get()
            raise KeyboardInterrupt

        eventlet.spawn(get)
        eventlet.sleep()

        with tests.assert_raises(KeyboardInterrupt):
            q.put(None)
            eventlet.sleep()