Sophie

Sophie

distrib > Mageia > 5 > i586 > media > core-release > by-pkgid > 4d360ba0b1cda7200bbb9f8980f3433d > files > 188

python-eventlet-doc-0.13.0-7.mga5.noarch.rpm

import cgi
from eventlet import greenthread
import eventlet
import errno
import os
import socket
import sys
from tests import skipped, LimitedTestCase, skip_with_pyevent, skip_if_no_ssl
from unittest import main

from eventlet import greenio
from eventlet import event
from eventlet.green import socket as greensocket
from eventlet import wsgi
from eventlet.support import get_errno

from tests import find_command

httplib = eventlet.import_patched('httplib')

certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO


def hello_world(env, start_response):
    if env['PATH_INFO'] == 'notexist':
        start_response('404 Not Found', [('Content-type', 'text/plain')])
        return ["not found"]

    start_response('200 OK', [('Content-type', 'text/plain')])
    return ["hello world"]


def chunked_app(env, start_response):
    start_response('200 OK', [('Content-type', 'text/plain')])
    yield "this"
    yield "is"
    yield "chunked"


def chunked_fail_app(environ, start_response):
    """http://rhodesmill.org/brandon/2013/chunked-wsgi/
    """
    headers = [('Content-Type', 'text/plain')]
    start_response('200 OK', headers)

    # We start streaming data just fine.
    yield "The dwarves of yore made mighty spells,"
    yield "While hammers fell like ringing bells"

    # Then the back-end fails!
    try:
        1 / 0
    except Exception:
        start_response('500 Error', headers, sys.exc_info())
        return

    # So rest of the response data is not available.
    yield "In places deep, where dark things sleep,"
    yield "In hollow halls beneath the fells."


def big_chunks(env, start_response):
    start_response('200 OK', [('Content-type', 'text/plain')])
    line = 'a' * 8192
    for x in range(10):
        yield line


def use_write(env, start_response):
    if env['PATH_INFO'] == '/a':
        write = start_response('200 OK', [('Content-type', 'text/plain'),
                                          ('Content-Length', '5')])
        write('abcde')
    if env['PATH_INFO'] == '/b':
        write = start_response('200 OK', [('Content-type', 'text/plain')])
        write('abcde')
    return []


def chunked_post(env, start_response):
    start_response('200 OK', [('Content-type', 'text/plain')])
    if env['PATH_INFO'] == '/a':
        return [env['wsgi.input'].read()]
    elif env['PATH_INFO'] == '/b':
        return [x for x in iter(lambda: env['wsgi.input'].read(4096), '')]
    elif env['PATH_INFO'] == '/c':
        return [x for x in iter(lambda: env['wsgi.input'].read(1), '')]


def already_handled(env, start_response):
    start_response('200 OK', [('Content-type', 'text/plain')])
    return wsgi.ALREADY_HANDLED


class Site(object):
    def __init__(self):
        self.application = hello_world

    def __call__(self, env, start_response):
        return self.application(env, start_response)


class IterableApp(object):

    def __init__(self, send_start_response=False, return_val=wsgi.ALREADY_HANDLED):
        self.send_start_response = send_start_response
        self.return_val = return_val
        self.env = {}

    def __call__(self, env, start_response):
        self.env = env
        if self.send_start_response:
            start_response('200 OK', [('Content-type', 'text/plain')])
        return self.return_val


class IterableSite(Site):
    def __call__(self, env, start_response):
        it = self.application(env, start_response)
        for i in it:
            yield i


CONTENT_LENGTH = 'content-length'


"""
HTTP/1.1 200 OK
Date: foo
Content-length: 11

hello world
"""


class ConnectionClosed(Exception):
    pass


def read_http(sock):
    fd = sock.makefile()
    try:
        response_line = fd.readline()
    except socket.error, exc:
        if get_errno(exc) == 10053:
            raise ConnectionClosed
        raise
    if not response_line:
        raise ConnectionClosed(response_line)

    header_lines = []
    while True:
        line = fd.readline()
        if line == '\r\n':
            break
        else:
            header_lines.append(line)
    headers = dict()
    for x in header_lines:
        x = x.strip()
        if not x:
            continue
        key, value = x.split(': ', 1)
        assert key.lower() not in headers, "%s header duplicated" % key
        headers[key.lower()] = value

    if CONTENT_LENGTH in headers:
        num = int(headers[CONTENT_LENGTH])
        body = fd.read(num)
    else:
        # read until EOF
        body = fd.read()

    return response_line, headers, body


class _TestBase(LimitedTestCase):
    def setUp(self):
        super(_TestBase, self).setUp()
        self.logfile = StringIO()
        self.site = Site()
        self.killer = None
        self.set_site()
        self.spawn_server()

    def tearDown(self):
        greenthread.kill(self.killer)
        eventlet.sleep(0)
        super(_TestBase, self).tearDown()

    def spawn_server(self, **kwargs):
        """Spawns a new wsgi server with the given arguments.
        Sets self.port to the port of the server, and self.killer is the greenlet
        running it.

        Kills any previously-running server."""
        eventlet.sleep(0) # give previous server a chance to start
        if self.killer:
            greenthread.kill(self.killer)

        new_kwargs = dict(max_size=128,
                          log=self.logfile,
                          site=self.site)
        new_kwargs.update(kwargs)

        if 'sock' not in new_kwargs:
            new_kwargs['sock'] = eventlet.listen(('localhost', 0))

        self.port = new_kwargs['sock'].getsockname()[1]
        self.killer = eventlet.spawn_n(
            wsgi.server,
            **new_kwargs)

    def set_site(self):
        raise NotImplementedError


class TestHttpd(_TestBase):
    def set_site(self):
        self.site = Site()

    def test_001_server(self):
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
        fd.flush()
        result = fd.read()
        fd.close()
        ## The server responds with the maximum version it supports
        self.assert_(result.startswith('HTTP'), result)
        self.assert_(result.endswith('hello world'))

    def test_002_keepalive(self):
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('w')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        read_http(sock)
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        read_http(sock)
        fd.close()

    def test_003_passing_non_int_to_read(self):
        # This should go in greenio_test
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        cancel = eventlet.Timeout(1, RuntimeError)
        self.assertRaises(TypeError, fd.read, "This shouldn't work")
        cancel.cancel()
        fd.close()

    def test_004_close_keepalive(self):
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('w')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        read_http(sock)
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        read_http(sock)
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        self.assertRaises(ConnectionClosed, read_http, sock)
        fd.close()

    @skipped
    def test_005_run_apachebench(self):
        url = 'http://localhost:12346/'
        # ab is apachebench
        from eventlet.green import subprocess
        subprocess.call([find_command('ab'),
                         '-c','64','-n','1024', '-k', url],
                        stdout=subprocess.PIPE)

    def test_006_reject_long_urls(self):
        sock = eventlet.connect(
            ('localhost', self.port))
        path_parts = []
        for ii in range(3000):
            path_parts.append('path')
        path = '/'.join(path_parts)
        request = 'GET /%s HTTP/1.0\r\nHost: localhost\r\n\r\n' % path
        fd = sock.makefile('rw')
        fd.write(request)
        fd.flush()
        result = fd.readline()
        if result:
            # windows closes the socket before the data is flushed,
            # so we never get anything back
            status = result.split(' ')[1]
            self.assertEqual(status, '414')
        fd.close()

    def test_007_get_arg(self):
        # define a new handler that does a get_arg as well as a read_body
        def new_app(env, start_response):
            body = env['wsgi.input'].read()
            a = cgi.parse_qs(body).get('a', [1])[0]
            start_response('200 OK', [('Content-type', 'text/plain')])
            return ['a is %s, body is %s' % (a, body)]

        self.site.application = new_app
        sock = eventlet.connect(
            ('localhost', self.port))
        request = '\r\n'.join((
            'POST / HTTP/1.0',
            'Host: localhost',
            'Content-Length: 3',
            '',
            'a=a'))
        fd = sock.makefile('w')
        fd.write(request)
        fd.flush()

        # send some junk after the actual request
        fd.write('01234567890123456789')
        reqline, headers, body = read_http(sock)
        self.assertEqual(body, 'a is a, body is a=a')
        fd.close()

    def test_008_correctresponse(self):
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('w')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        response_line_200,_,_ = read_http(sock)
        fd.write('GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        response_line_404,_,_ = read_http(sock)
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        response_line_test,_,_ = read_http(sock)
        self.assertEqual(response_line_200,response_line_test)
        fd.close()

    def test_009_chunked_response(self):
        self.site.application = chunked_app
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        self.assert_('Transfer-Encoding: chunked' in fd.read())

    def test_010_no_chunked_http_1_0(self):
        self.site.application = chunked_app
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        self.assert_('Transfer-Encoding: chunked' not in fd.read())

    def test_011_multiple_chunks(self):
        self.site.application = big_chunks
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        headers = ''
        while True:
            line = fd.readline()
            if line == '\r\n':
                break
            else:
                headers += line
        self.assert_('Transfer-Encoding: chunked' in headers)
        chunks = 0
        chunklen = int(fd.readline(), 16)
        while chunklen:
            chunks += 1
            chunk = fd.read(chunklen)
            fd.readline()  # CRLF
            chunklen = int(fd.readline(), 16)
        self.assert_(chunks > 1)
        response = fd.read()
        # Require a CRLF to close the message body
        self.assertEqual(response, '\r\n')

    @skip_if_no_ssl
    def test_012_ssl_server(self):
        def wsgi_app(environ, start_response):
            start_response('200 OK', {})
            return [environ['wsgi.input'].read()]

        certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
        private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

        server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                                        certfile=certificate_file,
                                        keyfile=private_key_file,
                                        server_side=True)
        self.spawn_server(sock=server_sock, site=wsgi_app)

        sock = eventlet.connect(('localhost', self.port))
        sock = eventlet.wrap_ssl(sock)
        sock.write('POST /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nContent-length:3\r\n\r\nabc')
        result = sock.read(8192)
        self.assertEquals(result[-3:], 'abc')

    @skip_if_no_ssl
    def test_013_empty_return(self):
        def wsgi_app(environ, start_response):
            start_response("200 OK", [])
            return [""]

        certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
        private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
        server_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                                        certfile=certificate_file,
                                        keyfile=private_key_file,
                                        server_side=True)
        self.spawn_server(sock=server_sock, site=wsgi_app)

        sock = eventlet.connect(('localhost', server_sock.getsockname()[1]))
        sock = eventlet.wrap_ssl(sock)
        sock.write('GET /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        result = sock.read(8192)
        self.assertEquals(result[-4:], '\r\n\r\n')

    def test_014_chunked_post(self):
        self.site.application = chunked_post
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('PUT /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
                 'Transfer-Encoding: chunked\r\n\r\n'
                 '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
        fd.flush()
        while True:
            if fd.readline() == '\r\n':
                break
        response = fd.read()
        self.assert_(response == 'oh hai', 'invalid response %s' % response)

        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('PUT /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
                 'Transfer-Encoding: chunked\r\n\r\n'
                 '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
        fd.flush()
        while True:
            if fd.readline() == '\r\n':
                break
        response = fd.read()
        self.assert_(response == 'oh hai', 'invalid response %s' % response)

        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('PUT /c HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
                 'Transfer-Encoding: chunked\r\n\r\n'
                 '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
        fd.flush()
        while True:
            if fd.readline() == '\r\n':
                break
        response = fd.read(8192)
        self.assert_(response == 'oh hai', 'invalid response %s' % response)

    def test_015_write(self):
        self.site.application = use_write
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('w')
        fd.write('GET /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_('content-length' in headers)

        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('w')
        fd.write('GET /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_('transfer-encoding' in headers)
        self.assert_(headers['transfer-encoding'] == 'chunked')

    def test_016_repeated_content_length(self):
        """
        content-length header was being doubled up if it was set in
        start_response and could also be inferred from the iterator
        """
        def wsgi_app(environ, start_response):
            start_response('200 OK', [('Content-Length', '7')])
            return ['testing']
        self.site.application = wsgi_app
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        header_lines = []
        while True:
            line = fd.readline()
            if line == '\r\n':
                break
            else:
                header_lines.append(line)
        self.assertEquals(1, len([l for l in header_lines
                if l.lower().startswith('content-length')]))

    @skip_if_no_ssl
    def test_017_ssl_zeroreturnerror(self):

        def server(sock, site, log):
            try:
                serv = wsgi.Server(sock, sock.getsockname(), site, log)
                client_socket = sock.accept()
                serv.process_request(client_socket)
                return True
            except:
                import traceback
                traceback.print_exc()
                return False

        def wsgi_app(environ, start_response):
            start_response('200 OK', [])
            return [environ['wsgi.input'].read()]

        certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
        private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')

        sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                                        certfile=certificate_file,
                                        keyfile=private_key_file,
                                        server_side=True)
        server_coro = eventlet.spawn(server, sock, wsgi_app, self.logfile)

        client = eventlet.connect(('localhost', sock.getsockname()[1]))
        client = eventlet.wrap_ssl(client)
        client.write('X') # non-empty payload so that SSL handshake occurs
        greenio.shutdown_safe(client)
        client.close()

        success = server_coro.wait()
        self.assert_(success)

    def test_018_http_10_keepalive(self):
        # verify that if an http/1.0 client sends connection: keep-alive
        # that we don't close the connection
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('w')
        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
        fd.flush()

        response_line, headers, body = read_http(sock)
        self.assert_('connection' in headers)
        self.assertEqual('keep-alive', headers['connection'])
        # repeat request to verify connection is actually still open
        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_('connection' in headers)
        self.assertEqual('keep-alive', headers['connection'])

    def test_019_fieldstorage_compat(self):
        def use_fieldstorage(environ, start_response):
            import cgi
            fs = cgi.FieldStorage(fp=environ['wsgi.input'],
                                  environ=environ)
            start_response('200 OK', [('Content-type', 'text/plain')])
            return ['hello!']

        self.site.application = use_fieldstorage
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('rw')
        fd.write('POST / HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 'Transfer-Encoding: chunked\r\n\r\n'
                 '2\r\noh\r\n'
                 '4\r\n hai\r\n0\r\n\r\n')
        fd.flush()
        self.assert_('hello!' in fd.read())

    def test_020_x_forwarded_for(self):
        sock = eventlet.connect(('localhost', self.port))
        sock.sendall('GET / HTTP/1.1\r\nHost: localhost\r\nX-Forwarded-For: 1.2.3.4, 5.6.7.8\r\n\r\n')
        sock.recv(1024)
        sock.close()
        self.assert_('1.2.3.4,5.6.7.8,127.0.0.1' in self.logfile.getvalue())

        # turning off the option should work too
        self.logfile = StringIO()
        self.spawn_server(log_x_forwarded_for=False)

        sock = eventlet.connect(('localhost', self.port))
        sock.sendall('GET / HTTP/1.1\r\nHost: localhost\r\nX-Forwarded-For: 1.2.3.4, 5.6.7.8\r\n\r\n')
        sock.recv(1024)
        sock.close()
        self.assert_('1.2.3.4' not in self.logfile.getvalue())
        self.assert_('5.6.7.8' not in self.logfile.getvalue())
        self.assert_('127.0.0.1' in self.logfile.getvalue())

    def test_socket_remains_open(self):
        greenthread.kill(self.killer)
        server_sock = eventlet.listen(('localhost', 0))
        server_sock_2 = server_sock.dup()
        self.spawn_server(sock=server_sock_2)
        # do a single req/response to verify it's up
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
        fd.flush()
        result = fd.read(1024)
        fd.close()
        self.assert_(result.startswith('HTTP'), result)
        self.assert_(result.endswith('hello world'))

        # shut down the server and verify the server_socket fd is still open,
        # but the actual socketobject passed in to wsgi.server is closed
        greenthread.kill(self.killer)
        eventlet.sleep(0) # make the kill go through
        try:
            server_sock_2.accept()
            # shouldn't be able to use this one anymore
        except socket.error, exc:
            self.assertEqual(get_errno(exc), errno.EBADF)
        self.spawn_server(sock=server_sock)
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
        fd.flush()
        result = fd.read(1024)
        fd.close()
        self.assert_(result.startswith('HTTP'), result)
        self.assert_(result.endswith('hello world'))

    def test_021_environ_clobbering(self):
        def clobberin_time(environ, start_response):
            for environ_var in ['wsgi.version', 'wsgi.url_scheme',
                'wsgi.input', 'wsgi.errors', 'wsgi.multithread',
                'wsgi.multiprocess', 'wsgi.run_once', 'REQUEST_METHOD',
                'SCRIPT_NAME', 'RAW_PATH_INFO', 'PATH_INFO', 'QUERY_STRING',
                'CONTENT_TYPE', 'CONTENT_LENGTH', 'SERVER_NAME', 'SERVER_PORT',
                'SERVER_PROTOCOL']:
                environ[environ_var] = None
            start_response('200 OK', [('Content-type', 'text/plain')])
            return []
        self.site.application = clobberin_time
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.1\r\n'
                 'Host: localhost\r\n'
                 'Connection: close\r\n'
                 '\r\n\r\n')
        fd.flush()
        self.assert_('200 OK' in fd.read())

    def test_022_custom_pool(self):
        # just test that it accepts the parameter for now
        # TODO: test that it uses the pool and that you can waitall() to
        # ensure that all clients finished
        from eventlet import greenpool
        p = greenpool.GreenPool(5)
        self.spawn_server(custom_pool=p)

        # this stuff is copied from test_001_server, could be better factored
        sock = eventlet.connect(
            ('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
        fd.flush()
        result = fd.read()
        fd.close()
        self.assert_(result.startswith('HTTP'), result)
        self.assert_(result.endswith('hello world'))

    def test_023_bad_content_length(self):
        sock = eventlet.connect(
            ('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nContent-length: argh\r\n\r\n')
        fd.flush()
        result = fd.read()
        fd.close()
        self.assert_(result.startswith('HTTP'), result)
        self.assert_('400 Bad Request' in result)
        self.assert_('500' not in result)

    def test_024_expect_100_continue(self):
        def wsgi_app(environ, start_response):
            if int(environ['CONTENT_LENGTH']) > 1024:
                start_response('417 Expectation Failed', [('Content-Length', '7')])
                return ['failure']
            else:
                text = environ['wsgi.input'].read()
                start_response('200 OK', [('Content-Length', str(len(text)))])
                return [text]
        self.site.application = wsgi_app
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\nContent-length: 1025\r\nExpect: 100-continue\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_(response_line.startswith('HTTP/1.1 417 Expectation Failed'))
        self.assertEquals(body, 'failure')
        fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\nContent-length: 7\r\nExpect: 100-continue\r\n\r\ntesting')
        fd.flush()
        header_lines = []
        while True:
            line = fd.readline()
            if line == '\r\n':
                break
            else:
                header_lines.append(line)
        self.assert_(header_lines[0].startswith('HTTP/1.1 100 Continue'))
        header_lines = []
        while True:
            line = fd.readline()
            if line == '\r\n':
                break
            else:
                header_lines.append(line)
        self.assert_(header_lines[0].startswith('HTTP/1.1 200 OK'))
        self.assertEquals(fd.read(7), 'testing')
        fd.close()

    def test_025_accept_errors(self):
        from eventlet import debug
        debug.hub_exceptions(True)
        listener = greensocket.socket()
        listener.bind(('localhost', 0))
        # NOT calling listen, to trigger the error
        self.logfile = StringIO()
        self.spawn_server(sock=listener)
        old_stderr = sys.stderr
        try:
            sys.stderr = self.logfile
            eventlet.sleep(0) # need to enter server loop
            try:
                eventlet.connect(('localhost', self.port))
                self.fail("Didn't expect to connect")
            except socket.error, exc:
                self.assertEquals(get_errno(exc), errno.ECONNREFUSED)

            self.assert_('Invalid argument' in self.logfile.getvalue(),
                self.logfile.getvalue())
        finally:
            sys.stderr = old_stderr
        debug.hub_exceptions(False)

    def test_026_log_format(self):
        self.spawn_server(log_format="HI %(request_line)s HI")
        sock = eventlet.connect(('localhost', self.port))
        sock.sendall('GET /yo! HTTP/1.1\r\nHost: localhost\r\n\r\n')
        sock.recv(1024)
        sock.close()
        self.assert_('\nHI GET /yo! HTTP/1.1 HI\n' in self.logfile.getvalue(), self.logfile.getvalue())

    def test_close_chunked_with_1_0_client(self):
        # verify that if we return a generator from our app
        # and we're not speaking with a 1.1 client, that we
        # close the connection
        self.site.application = chunked_app
        sock = eventlet.connect(('localhost', self.port))

        sock.sendall('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')

        response_line, headers, body = read_http(sock)
        self.assertEqual(headers['connection'], 'close')
        self.assertNotEqual(headers.get('transfer-encoding'), 'chunked')
        self.assertEquals(body, "thisischunked")

    def test_minimum_chunk_size_parameter_leaves_httpprotocol_class_member_intact(self):
        start_size = wsgi.HttpProtocol.minimum_chunk_size

        self.spawn_server(minimum_chunk_size=start_size * 2)
        sock = eventlet.connect(('localhost', self.port))
        sock.sendall('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        read_http(sock)

        self.assertEqual(wsgi.HttpProtocol.minimum_chunk_size, start_size)

    def test_error_in_chunked_closes_connection(self):
        # From http://rhodesmill.org/brandon/2013/chunked-wsgi/
        self.spawn_server(minimum_chunk_size=1)

        self.site.application = chunked_fail_app
        sock = eventlet.connect(('localhost', self.port))

        sock.sendall('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')

        response_line, headers, body = read_http(sock)
        self.assertEqual(response_line, 'HTTP/1.1 200 OK\r\n')
        self.assertEqual(headers.get('transfer-encoding'), 'chunked')
        self.assertEqual(body, '27\r\nThe dwarves of yore made mighty spells,\r\n25\r\nWhile hammers fell like ringing bells\r\n')

        # verify that socket is closed by server
        self.assertEqual(sock.recv(1), '')

    def test_026_http_10_nokeepalive(self):
        # verify that if an http/1.0 client sends connection: keep-alive
        # and the server doesn't accept keep-alives, we close the connection
        self.spawn_server(keepalive=False)
        sock = eventlet.connect(
            ('localhost', self.port))

        sock.sendall('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
        response_line, headers, body = read_http(sock)
        self.assertEqual(headers['connection'], 'close')

    def test_027_keepalive_chunked(self):
        self.site.application = chunked_post
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('w')
        fd.write('PUT /a HTTP/1.1\r\nHost: localhost\r\nTransfer-Encoding: chunked\r\n\r\n10\r\n0123456789abcdef\r\n0\r\n\r\n')
        fd.flush()
        read_http(sock)
        fd.write('PUT /b HTTP/1.1\r\nHost: localhost\r\nTransfer-Encoding: chunked\r\n\r\n10\r\n0123456789abcdef\r\n0\r\n\r\n')
        fd.flush()
        read_http(sock)
        fd.write('PUT /c HTTP/1.1\r\nHost: localhost\r\nTransfer-Encoding: chunked\r\n\r\n10\r\n0123456789abcdef\r\n0\r\n\r\n')
        fd.flush()
        read_http(sock)
        fd.write('PUT /a HTTP/1.1\r\nHost: localhost\r\nTransfer-Encoding: chunked\r\n\r\n10\r\n0123456789abcdef\r\n0\r\n\r\n')
        fd.flush()
        read_http(sock)

    @skip_if_no_ssl
    def test_028_ssl_handshake_errors(self):
        errored = [False]
        def server(sock):
            try:
                wsgi.server(sock=sock, site=hello_world, log=self.logfile)
                errored[0] = 'SSL handshake error caused wsgi.server to exit.'
            except greenthread.greenlet.GreenletExit:
                pass
            except Exception, e:
                errored[0] = 'SSL handshake error raised exception %s.' % e
        for data in ('', 'GET /non-ssl-request HTTP/1.0\r\n\r\n'):
            srv_sock = eventlet.wrap_ssl(eventlet.listen(('localhost', 0)),
                                        certfile=certificate_file,
                                        keyfile=private_key_file,
                                        server_side=True)
            port = srv_sock.getsockname()[1]
            g = eventlet.spawn_n(server, srv_sock)
            client = eventlet.connect(('localhost', port))
            if data: # send non-ssl request
                client.sendall(data)
            else: # close sock prematurely
                client.close()
            eventlet.sleep(0) # let context switch back to server
            self.assert_(not errored[0], errored[0])
            # make another request to ensure the server's still alive
            try:
                from eventlet.green import ssl
                client = ssl.wrap_socket(eventlet.connect(('localhost', port)))
                client.write('GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
                result = client.read()
                self.assert_(result.startswith('HTTP'), result)
                self.assert_(result.endswith('hello world'))
            except ImportError:
                pass # TODO: should test with OpenSSL
            greenthread.kill(g)

    def test_029_posthooks(self):
        posthook1_count = [0]
        posthook2_count = [0]
        def posthook1(env, value, multiplier=1):
            self.assertEquals(env['local.test'], 'test_029_posthooks')
            posthook1_count[0] += value * multiplier
        def posthook2(env, value, divisor=1):
            self.assertEquals(env['local.test'], 'test_029_posthooks')
            posthook2_count[0] += value / divisor

        def one_posthook_app(env, start_response):
            env['local.test'] = 'test_029_posthooks'
            if 'eventlet.posthooks' not in env:
                start_response('500 eventlet.posthooks not supported',
                               [('Content-Type', 'text/plain')])
            else:
                env['eventlet.posthooks'].append(
                    (posthook1, (2,), {'multiplier': 3}))
                start_response('200 OK', [('Content-Type', 'text/plain')])
            yield ''
        self.site.application = one_posthook_app
        sock = eventlet.connect(('localhost', self.port))
        fp = sock.makefile('rw')
        fp.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fp.flush()
        self.assertEquals(fp.readline(), 'HTTP/1.1 200 OK\r\n')
        fp.close()
        sock.close()
        self.assertEquals(posthook1_count[0], 6)
        self.assertEquals(posthook2_count[0], 0)

        def two_posthook_app(env, start_response):
            env['local.test'] = 'test_029_posthooks'
            if 'eventlet.posthooks' not in env:
                start_response('500 eventlet.posthooks not supported',
                               [('Content-Type', 'text/plain')])
            else:
                env['eventlet.posthooks'].append(
                    (posthook1, (4,), {'multiplier': 5}))
                env['eventlet.posthooks'].append(
                    (posthook2, (100,), {'divisor': 4}))
                start_response('200 OK', [('Content-Type', 'text/plain')])
            yield ''
        self.site.application = two_posthook_app
        sock = eventlet.connect(('localhost', self.port))
        fp = sock.makefile('rw')
        fp.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fp.flush()
        self.assertEquals(fp.readline(), 'HTTP/1.1 200 OK\r\n')
        fp.close()
        sock.close()
        self.assertEquals(posthook1_count[0], 26)
        self.assertEquals(posthook2_count[0], 25)

    def test_030_reject_long_header_lines(self):
        sock = eventlet.connect(('localhost', self.port))
        request = 'GET / HTTP/1.0\r\nHost: localhost\r\nLong: %s\r\n\r\n' % \
            ('a' * 10000)
        fd = sock.makefile('rw')
        fd.write(request)
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assertEquals(response_line,
            'HTTP/1.0 400 Header Line Too Long\r\n')
        fd.close()

    def test_031_reject_large_headers(self):
        sock = eventlet.connect(('localhost', self.port))
        headers = 'Name: Value\r\n' * 5050
        request = 'GET / HTTP/1.0\r\nHost: localhost\r\n%s\r\n\r\n' % headers
        fd = sock.makefile('rw')
        fd.write(request)
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assertEquals(response_line,
            'HTTP/1.0 400 Headers Too Large\r\n')
        fd.close()

    def test_zero_length_chunked_response(self):
        def zero_chunked_app(env, start_response):
            start_response('200 OK', [('Content-type', 'text/plain')])
            yield ""

        self.site.application = zero_chunked_app
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        response = fd.read().split('\r\n')
        headers = []
        while True:
            h = response.pop(0)
            headers.append(h)
            if h == '':
                break
        self.assert_('Transfer-Encoding: chunked' in ''.join(headers))
        # should only be one chunk of zero size with two blank lines
        # (one terminates the chunk, one terminates the body)
        self.assertEqual(response, ['0', '', ''])

    def test_configurable_url_length_limit(self):
        self.spawn_server(url_length_limit=20000)
        sock = eventlet.connect(
            ('localhost', self.port))
        path = 'x' * 15000
        request = 'GET /%s HTTP/1.0\r\nHost: localhost\r\n\r\n' % path
        fd = sock.makefile('rw')
        fd.write(request)
        fd.flush()
        result = fd.readline()
        if result:
            # windows closes the socket before the data is flushed,
            # so we never get anything back
            status = result.split(' ')[1]
            self.assertEqual(status, '200')
        fd.close()

    def test_aborted_chunked_post(self):
        read_content = event.Event()
        blew_up = [False]
        def chunk_reader(env, start_response):
            try:
                content = env['wsgi.input'].read(1024)
            except IOError:
                blew_up[0] = True
                content = 'ok'
            read_content.send(content)
            start_response('200 OK', [('Content-Type', 'text/plain')])
            return [content]
        self.site.application = chunk_reader
        expected_body = 'a bunch of stuff'
        data = "\r\n".join(['PUT /somefile HTTP/1.0',
                            'Transfer-Encoding: chunked',
                            '',
                            'def',
                            expected_body])
        # start PUT-ing some chunked data but close prematurely
        sock = eventlet.connect(('127.0.0.1', self.port))
        sock.sendall(data)
        sock.close()
        # the test passes if we successfully get here, and read all the data
        # in spite of the early close
        self.assertEqual(read_content.wait(), 'ok')
        self.assert_(blew_up[0])

    def test_exceptions_close_connection(self):
        def wsgi_app(environ, start_response):
            raise RuntimeError("intentional error")
        self.site.application = wsgi_app
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_(response_line.startswith('HTTP/1.1 500 Internal Server Error'))
        self.assertEqual(headers['connection'], 'close')
        self.assert_('transfer-encoding' not in headers)

    def test_unicode_raises_error(self):
        def wsgi_app(environ, start_response):
            start_response("200 OK", [])
            yield u"oh hai"
            yield u"non-encodable unicode: \u0230"
        self.site.application = wsgi_app
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_(response_line.startswith('HTTP/1.1 500 Internal Server Error'))
        self.assertEqual(headers['connection'], 'close')
        self.assert_('unicode' in body)

    def test_path_info_decoding(self):
        def wsgi_app(environ, start_response):
            start_response("200 OK", [])
            yield "decoded: %s" % environ['PATH_INFO']
            yield "raw: %s" % environ['RAW_PATH_INFO']
        self.site.application = wsgi_app
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('rw')
        fd.write('GET /a*b@%40%233 HTTP/1.1\r\nHost: localhost\r\nConnection: '\
                'close\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_(response_line.startswith('HTTP/1.1 200'))
        self.assert_('decoded: /a*b@@#3' in body)
        self.assert_('raw: /a*b@%40%233' in body)

    def test_ipv6(self):
        try:
            sock = eventlet.listen(('::1', 0), family=socket.AF_INET6)
        except (socket.gaierror, socket.error):  # probably no ipv6
            return
        log = StringIO()
        # first thing the server does is try to log the IP it's bound to
        def run_server():
            try:
                server = wsgi.server(sock=sock, log=log, site=Site())
            except ValueError:
                log.write('broked')
        eventlet.spawn_n(run_server)
        logval = log.getvalue()
        while not logval:
            eventlet.sleep(0.0)
            logval = log.getvalue()
        if 'broked' in logval:
            self.fail('WSGI server raised exception with ipv6 socket')

    def test_debug(self):
        self.spawn_server(debug=False)
        def crasher(env, start_response):
            raise RuntimeError("intentional crash")
        self.site.application = crasher

        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('w')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_(response_line.startswith('HTTP/1.1 500 Internal Server Error'))
        self.assertEqual(body, '')
        self.assertEqual(headers['connection'], 'close')
        self.assert_('transfer-encoding' not in headers)

        # verify traceback when debugging enabled
        self.spawn_server(debug=True)
        self.site.application = crasher
        sock = eventlet.connect(('localhost', self.port))
        fd = sock.makefile('w')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assert_(response_line.startswith('HTTP/1.1 500 Internal Server Error'))
        self.assert_('intentional crash' in body)
        self.assert_('RuntimeError' in body)
        self.assert_('Traceback' in body)
        self.assertEqual(headers['connection'], 'close')
        self.assert_('transfer-encoding' not in headers)

    def test_client_disconnect(self):
        """Issue #95 Server must handle disconnect from client in the middle of response
        """
        def long_response(environ, start_response):
            start_response('200 OK', [('Content-Length', '9876')])
            yield 'a' * 9876

        server_sock = eventlet.listen(('localhost', 0))
        self.port = server_sock.getsockname()[1]
        server = wsgi.Server(server_sock, server_sock.getsockname(), long_response,
                             log=self.logfile)

        def make_request():
            sock = eventlet.connect(server_sock.getsockname())
            sock.send('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
            sock.close()

        request_thread = eventlet.spawn(make_request)
        server_conn = server_sock.accept()
        # Next line must not raise IOError -32 Broken pipe
        server.process_request(server_conn)
        request_thread.wait()
        server_sock.close()


def read_headers(sock):
    fd = sock.makefile()
    try:
        response_line = fd.readline()
    except socket.error, exc:
        if get_errno(exc) == 10053:
            raise ConnectionClosed
        raise
    if not response_line:
        raise ConnectionClosed

    header_lines = []
    while True:
        line = fd.readline()
        if line == '\r\n':
            break
        else:
            header_lines.append(line)
    headers = dict()
    for x in header_lines:
        x = x.strip()
        if not x:
            continue
        key, value = x.split(': ', 1)
        assert key.lower() not in headers, "%s header duplicated" % key
        headers[key.lower()] = value
    return response_line, headers

class IterableAlreadyHandledTest(_TestBase):
    def set_site(self):
        self.site = IterableSite()

    def get_app(self):
        return IterableApp(True)

    def test_iterable_app_keeps_socket_open_unless_connection_close_sent(self):
        self.site.application = self.get_app()
        sock = eventlet.connect(
            ('localhost', self.port))

        fd = sock.makefile('rw')
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')

        fd.flush()
        response_line, headers = read_headers(sock)
        self.assertEqual(response_line, 'HTTP/1.1 200 OK\r\n')
        self.assert_('connection' not in headers)
        fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
        fd.flush()
        response_line, headers, body = read_http(sock)
        self.assertEqual(response_line, 'HTTP/1.1 200 OK\r\n')
        self.assertEqual(headers.get('transfer-encoding'), 'chunked')
        self.assertEqual(body, '0\r\n\r\n') # Still coming back chunked

class ProxiedIterableAlreadyHandledTest(IterableAlreadyHandledTest):
    # same thing as the previous test but ensuring that it works with tpooled
    # results as well as regular ones
    @skip_with_pyevent
    def get_app(self):
        from eventlet import tpool
        return tpool.Proxy(super(ProxiedIterableAlreadyHandledTest, self).get_app())

    def tearDown(self):
        from eventlet import tpool
        tpool.killall()
        super(ProxiedIterableAlreadyHandledTest, self).tearDown()

class TestChunkedInput(_TestBase):
    dirt = ""
    validator = None
    def application(self, env, start_response):
        input = env['wsgi.input']
        response = []

        pi = env["PATH_INFO"]

        if pi=="/short-read":
            d=input.read(10)
            response = [d]
        elif pi=="/lines":
            for x in input:
                response.append(x)
        elif pi=="/ping":
            input.read()
            response.append("pong")
        else:
            raise RuntimeError("bad path")

        start_response('200 OK', [('Content-Type', 'text/plain')])
        return response

    def connect(self):
        return eventlet.connect(('localhost', self.port))

    def set_site(self):
        self.site = Site()
        self.site.application = self.application

    def chunk_encode(self, chunks, dirt=None):
        if dirt is None:
            dirt = self.dirt

        b = ""
        for c in chunks:
            b += "%x%s\r\n%s\r\n" % (len(c), dirt, c)
        return b

    def body(self, dirt=None):
        return self.chunk_encode(["this", " is ", "chunked", "\nline", " 2", "\n", "line3", ""], dirt=dirt)

    def ping(self, fd):
        fd.sendall("GET /ping HTTP/1.1\r\n\r\n")
        self.assertEquals(read_http(fd)[-1], "pong")

    def test_short_read_with_content_length(self):
        body = self.body()
        req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\nContent-Length:1000\r\n\r\n" + body

        fd = self.connect()
        fd.sendall(req)
        self.assertEquals(read_http(fd)[-1], "this is ch")

        self.ping(fd)

    def test_short_read_with_zero_content_length(self):
        body = self.body()
        req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\nContent-Length:0\r\n\r\n" + body
        fd = self.connect()
        fd.sendall(req)
        self.assertEquals(read_http(fd)[-1], "this is ch")

        self.ping(fd)

    def test_short_read(self):
        body = self.body()
        req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body

        fd = self.connect()
        fd.sendall(req)
        self.assertEquals(read_http(fd)[-1], "this is ch")

        self.ping(fd)

    def test_dirt(self):
        body = self.body(dirt="; here is dirt\0bla")
        req = "POST /ping HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body

        fd = self.connect()
        fd.sendall(req)
        self.assertEquals(read_http(fd)[-1], "pong")

        self.ping(fd)

    def test_chunked_readline(self):
        body = self.body()
        req = "POST /lines HTTP/1.1\r\nContent-Length: %s\r\ntransfer-encoding: Chunked\r\n\r\n%s" % (len(body), body)

        fd = self.connect()
        fd.sendall(req)
        self.assertEquals(read_http(fd)[-1], 'this is chunked\nline 2\nline3')

    def test_close_before_finished(self):
        import signal

        got_signal = []
        def handler(*args):
            got_signal.append(1)
            raise KeyboardInterrupt()

        signal.signal(signal.SIGALRM, handler)
        signal.alarm(1)

        try:
            body = '4\r\nthi'
            req = "POST /short-read HTTP/1.1\r\ntransfer-encoding: Chunked\r\n\r\n" + body

            fd = self.connect()
            fd.sendall(req)
            fd.close()
            eventlet.sleep(0.0)
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, signal.SIG_DFL)

        assert not got_signal, "caught alarm signal. infinite loop detected."


if __name__ == '__main__':
    main()