| 1 | import httplib
|
|---|
| 2 | import StringIO
|
|---|
| 3 | import sys
|
|---|
| 4 |
|
|---|
| 5 | from unittest import TestCase
|
|---|
| 6 |
|
|---|
| 7 | from test import test_support
|
|---|
| 8 |
|
|---|
class FakeSocket:
    """Socket stand-in that serves canned text instead of doing network I/O.

    ``text`` is the content handed back by ``makefile()``, wrapped in
    ``fileclass``.  ``sendall()`` simply records its argument on the
    instance as ``data``.
    """

    def __init__(self, text, fileclass=StringIO.StringIO):
        self.text, self.fileclass = text, fileclass

    def sendall(self, data):
        # Only the most recent payload is kept; earlier ones are overwritten.
        self.data = data

    def makefile(self, mode, bufsize=None):
        # Only the read modes are supported; bufsize is accepted and ignored.
        if mode not in ('r', 'rb'):
            raise httplib.UnimplementedFileMode()
        return self.fileclass(self.text)
|
|---|
| 21 |
|
|---|
class NoEOFStringIO(StringIO.StringIO):
    """StringIO variant that treats hitting EOF as a failure.

    Any read()/readline() call that comes back empty raises
    AssertionError.  The tests below use this to verify that httplib
    never reads more from the underlying file than it should.
    """

    def _reject_eof(self, chunk):
        # An empty string means the caller asked for data past the end.
        if chunk == '':
            raise AssertionError('caller tried to read past EOF')
        return chunk

    def read(self, n=-1):
        return self._reject_eof(StringIO.StringIO.read(self, n))

    def readline(self, length=None):
        return self._reject_eof(StringIO.StringIO.readline(self, length))
|
|---|
| 39 |
|
|---|
| 40 |
|
|---|
class HeaderTests(TestCase):
    """Checks on the header lines that HTTPConnection.request() buffers."""

    def test_auto_headers(self):
        # Some headers are added automatically by .request(); when the
        # caller sets one of them explicitly it must still appear only once.

        import httplib

        class HeaderCountingBuffer(list):
            """List that also tallies appended header lines by lowercased key."""

            def __init__(self):
                self.count = {}

            def append(self, item):
                # Anything containing a colon is taken as 'Key: Value'.
                if ':' in item:
                    name = item.split(':', 1)[0].lower()
                    self.count[name] = self.count.get(name, 0) + 1
                list.append(self, item)

        auto_headers = ('Content-length', 'Host', 'Accept-encoding')
        for explicit_header in (True, False):
            for header in auto_headers:
                conn = httplib.HTTPConnection('example.com')
                # Pre-seed the socket and swap in the counting buffer so the
                # emitted header lines can be inspected afterwards.
                conn.sock = FakeSocket('blahblahblah')
                conn._buffer = HeaderCountingBuffer()

                body = 'spamspamspam'
                headers = {}
                if explicit_header:
                    headers[header] = str(len(body))
                conn.request('POST', '/', body, headers)
                self.assertEqual(conn._buffer.count[header.lower()], 1)
|
|---|
| 72 |
|
|---|
| 73 | # Collect output to a buffer so that we don't have to cope with line-ending
|
|---|
| 74 | # issues across platforms. Specifically, the headers will have \r\n pairs
|
|---|
| 75 | # and some platforms will strip them from the output file.
|
|---|
| 76 |
|
|---|
def test():
    """Run _test() with stdout captured, then re-print the captured text.

    Each captured line is printed with surrounding whitespace stripped.
    """
    captured = StringIO.StringIO()
    saved_stdout = sys.stdout
    sys.stdout = captured
    try:
        _test()
    finally:
        # Always restore the real stdout, even if _test() raised.
        sys.stdout = saved_stdout

    # print individual lines with endings stripped
    for line in captured.getvalue().split("\n"):
        print(line.strip())
|
|---|
| 90 |
|
|---|
def _test():
    """Exercise httplib response parsing and host:port parsing on canned data.

    Progress messages are printed to stdout; a parsing mismatch raises
    AssertionError.
    """
    # Test HTTP status lines

    body = "HTTP/1.1 200 Ok\r\n\r\nText"
    sock = FakeSocket(body)
    resp = httplib.HTTPResponse(sock, 1)
    resp.begin()
    print(resp.read())
    resp.close()

    body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
    sock = FakeSocket(body)
    resp = httplib.HTTPResponse(sock, 1)
    try:
        resp.begin()
    except httplib.BadStatusLine:
        print("BadStatusLine raised as expected")
    else:
        print("Expect BadStatusLine")

    # Check invalid host_port

    for hp in ("www.python.org:abc", "www.python.org:"):
        try:
            httplib.HTTP(hp)
        except httplib.InvalidURL:
            print("InvalidURL raised as expected")
        else:
            print("Expect InvalidURL")

    for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
                     ("www.python.org:80", "www.python.org", 80),
                     ("www.python.org", "www.python.org", 80),
                     ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
        try:
            http = httplib.HTTP(hp)
        except httplib.InvalidURL:
            print("InvalidURL raised erroneously")
        else:
            # Only inspect the connection when construction succeeded;
            # otherwise `http` would be unbound or left over from the
            # previous iteration.
            c = http._conn
            if h != c.host:
                raise AssertionError("Host incorrectly parsed", h, c.host)
            if p != c.port:
                # Report the parsed port (not the host) on a port mismatch.
                raise AssertionError("Port incorrectly parsed", p, c.port)

    # test response with multiple message headers with the same field name.
    text = ('HTTP/1.1 200 OK\r\n'
            'Set-Cookie: Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"\r\n'
            'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
            ' Path="/acme"\r\n'
            '\r\n'
            'No body\r\n')
    hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
           ', '
           'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
    s = FakeSocket(text)
    r = httplib.HTTPResponse(s, 1)
    r.begin()
    cookies = r.getheader("Set-Cookie")
    if cookies != hdr:
        raise AssertionError("multiple headers not combined properly")

    # Test that the library doesn't attempt to read any data
    # from a HEAD request.  (Tickles SF bug #622042.)
    # NoEOFStringIO raises if anything is read past the end of the headers.
    sock = FakeSocket(
        'HTTP/1.1 200 OK\r\n'
        'Content-Length: 14432\r\n'
        '\r\n',
        NoEOFStringIO)
    resp = httplib.HTTPResponse(sock, 1, method="HEAD")
    resp.begin()
    if resp.read() != "":
        raise AssertionError("Did not expect response from HEAD request")
    resp.close()
|
|---|
| 162 |
|
|---|
| 163 |
|
|---|
class OfflineTest(TestCase):
    """Checks that only consult httplib's module-level tables."""

    def test_responses(self):
        # httplib.responses should map the NOT_FOUND status constant to its
        # standard reason phrase.  Uses assertEqual (as HeaderTests does)
        # rather than the deprecated assertEquals alias.
        self.assertEqual(httplib.responses[httplib.NOT_FOUND], "Not Found")
|
|---|
| 167 |
|
|---|
def test_main(verbose=None):
    """Run this module's unittest classes through test_support.

    ``verbose`` is accepted but not used here.
    """
    test_support.run_unittest(HeaderTests, OfflineTest)
|
|---|
| 171 |
|
|---|
# Run the print-based checks whenever this module is executed or imported.
test()
|
|---|