source: trunk/essentials/dev-lang/python/Lib/test/test_wsgiref.py@ 3226

Last change on this file since 3226 was 3225, checked in by bird, 19 years ago

Python 2.5

File size: 17.5 KB
Line 
1from __future__ import nested_scopes # Backward compat for 2.1
2from unittest import TestSuite, TestCase, makeSuite
3from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
10from StringIO import StringIO
11from SocketServer import BaseServer
12import re, sys
13
14
class MockServer(WSGIServer):
    """WSGI server stand-in that is driven without a real socket."""

    def __init__(self, server_address, RequestHandlerClass):
        # Initialise via BaseServer directly instead of going through the
        # WSGIServer constructor chain, then do the "bind" step ourselves.
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Copy host and port out of server_address, then call setup_environ()."""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
27
28
class MockHandler(WSGIRequestHandler):
    """Request handler fed by a pair of file-like objects instead of a socket."""

    def setup(self):
        """Use the request object as the connection and split it into rfile/wfile."""
        self.connection = self.request
        # The "connection" is expected to unpack into (input stream, output stream).
        reader, writer = self.connection
        self.rfile = reader
        self.wfile = writer

    def finish(self):
        """Deliberately do nothing when the request is finished."""
        return None
37
38
39
40
41
def hello_app(environ, start_response):
    """Minimal WSGI application: a fixed "200 OK" response with a constant body.

    'environ' is accepted but not consulted.  Returns a one-element list
    holding the body string.
    """
    response_headers = [
        ('Content-Type', 'text/plain'),
        # A constant Date value, so the headers passed on are identical on every call.
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    body = "Hello, world!"
    return [body]
48
def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
    """Push one raw request through 'app' using the mock server and handler.

    'data' is the raw request text handed to the handler as its input
    stream.  Returns a 2-tuple: everything written to the output stream,
    and everything written to sys.stderr while the request was handled.
    """
    server = make_server("", 80, app, MockServer, MockHandler)
    request_in = StringIO(data)
    response_out = StringIO()
    captured_err = StringIO()

    # Swap stderr for the duration of the request only; always put it back.
    saved_stderr = sys.stderr
    sys.stderr = captured_err
    try:
        server.finish_request((request_in, response_out), ("127.0.0.1", 8888))
    finally:
        sys.stderr = saved_stderr

    return response_out.getvalue(), captured_err.getvalue()
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def compare_generic_iter(make_it, match):
    """Utility to compare a generic 2.1/2.2+ iterator with an iterable

    'make_it' must be a function returning a fresh iterator to be tested
    (it is called once per protocol exercised, since this may test the
    iterator twice).  'match' is the iterable of expected items, in order.

    The iterator is always checked through __getitem__, indexing from 0.
    If running under Python 2.2+, it is also checked using iter()/next().

    Raises AssertionError on the first item that differs from 'match', if
    iter(it) is not the iterator itself, or if items remain once 'match'
    is exhausted.  Mismatch errors name the protocol, the position and
    both values so that a failure can be diagnosed from the message alone.
    An iterator that runs out early is not caught here: the IndexError or
    StopIteration raised by the underlying call propagates unchanged.
    """
    it = make_it()
    n = 0
    for item in match:
        got = it[n]
        if not got == item:
            raise AssertionError(
                "__getitem__ item %d is %r, expected %r" % (n, got, item))
        n += 1
    try:
        # One index past the expected items must be out of range.
        it[n]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    try:
        # Merely referencing the names raises NameError where they don't exist.
        iter, StopIteration
    except NameError:
        pass
    else:
        # Only test iter mode under 2.2+
        it = make_it()
        if not iter(it) is it:
            raise AssertionError("iter() did not return the iterator itself", it)
        n = 0
        for item in match:
            got = it.next()
            if not got == item:
                raise AssertionError(
                    ".next() item %d is %r, expected %r" % (n, got, item))
            n += 1
        try:
            it.next()
        except StopIteration:
            pass
        else:
            raise AssertionError("Too many items from .next()", it)
118
119
120
121
122
123
class IntegrationTests(TestCase):
    """End-to-end checks: run an app through run_amock() and inspect the raw output."""

    def check_hello(self, out, has_length=True):
        """Assert that 'out' is exactly the response expected for hello_app.

        'has_length' says whether a "Content-Length: 13" header is expected
        between the Date header and the blank line that ends the headers.
        """
        if has_length:
            length_header = "Content-Length: 13\r\n"
        else:
            length_header = ""
        expected = "".join([
            "HTTP/1.0 200 OK\r\n",
            # The Server header embeds the running interpreter's version number.
            "Server: WSGIServer/0.1 Python/", sys.version.split()[0], "\r\n",
            "Content-Type: text/plain\r\n",
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n",
            length_header,
            "\r\n",
            "Hello, world!",
        ])
        self.assertEqual(out, expected)

    def test_plain_hello(self):
        out, err = run_amock()
        self.check_hello(out)

    def test_validated_hello(self):
        out, err = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(out, has_length=False)

    def test_simple_validation_error(self):
        def bad_app(environ, start_response):
            # Deliberately invalid: the headers are a bare tuple, not a list.
            start_response("200 OK", ('Content-Type', 'text/plain'))
            return ["Hello, world!"]

        out, err = run_amock(validator(bad_app))
        # This must match wsgiref's default error body byte for byte, and
        # that body has TWO spaces after "occurred." -- with a single
        # space the endswith() check can never succeed.
        self.failUnless(out.endswith(
            "A server error occurred.  Please contact the administrator."
        ))
        # The second-to-last line of the captured stderr is expected to be
        # the validator's assertion message.
        self.assertEqual(
            err.splitlines()[-2],
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <type 'tuple'>"
        )
159
160
161
162
163
164
165class UtilityTests(TestCase):
166
def checkShift(self, sn_in, pi_in, part, sn_out, pi_out):
    """Run util.shift_path_info() once and verify its result and side effects.

    An environ is seeded with SCRIPT_NAME=sn_in and PATH_INFO=pi_in and
    completed with the testing defaults.  The shifted segment must equal
    'part', and afterwards PATH_INFO must equal pi_out and SCRIPT_NAME must
    equal sn_out.  Returns the environ that was used.
    """
    environ = {'SCRIPT_NAME': sn_in, 'PATH_INFO': pi_in}
    util.setup_testing_defaults(environ)
    shifted = util.shift_path_info(environ)
    self.assertEqual(shifted, part)
    self.assertEqual(environ['PATH_INFO'], pi_out)
    self.assertEqual(environ['SCRIPT_NAME'], sn_out)
    return environ
174
def checkDefault(self, key, value, alt=None):
    """Verify one default supplied by util.setup_testing_defaults().

    Starting from an empty environ, env[key] must equal 'value' -- unless
    'value' is a StringIO, in which case only the type of env[key] is
    checked.  Starting from an environ that already maps 'key' to 'alt',
    that very object must still be in place afterwards.
    """
    # Case 1: nothing preset, so the default has to be filled in.
    fresh = {}
    util.setup_testing_defaults(fresh)
    if not isinstance(value, StringIO):
        self.assertEqual(fresh[key], value)
    else:
        self.failUnless(isinstance(fresh[key], StringIO))

    # Case 2: a preset value must survive untouched (identity, not equality).
    preset = {key: alt}
    util.setup_testing_defaults(preset)
    self.failUnless(preset[key] is alt)
188
def checkCrossDefault(self, key, value, **kw):
    """Check a default that depends on other environ entries.

    The keyword arguments form the initial environ; once the testing
    defaults have been applied to it, environ[key] must equal 'value'.
    """
    environ = kw
    util.setup_testing_defaults(environ)
    derived = environ[key]
    self.assertEqual(derived, value)
192
def checkAppURI(self, uri, **kw):
    """Check util.application_uri() for the environ given as keyword arguments.

    The testing defaults are applied to the environ first; the resulting
    application URI must equal 'uri'.
    """
    environ = kw
    util.setup_testing_defaults(environ)
    actual = util.application_uri(environ)
    self.assertEqual(actual, uri)
196
def checkReqURI(self, uri, query=1, **kw):
    """Check util.request_uri() for the environ given as keyword arguments.

    The testing defaults are applied to the environ first.  'query' is
    handed to request_uri() as its second argument; the resulting URI must
    equal 'uri'.
    """
    environ = kw
    util.setup_testing_defaults(environ)
    actual = util.request_uri(environ, query)
    self.assertEqual(actual, uri)
200
201
202
203
204
205
def checkFW(self, text, size, match):
    """Exercise util.FileWrapper over a StringIO holding 'text'.

    'size' is passed as FileWrapper's second argument.  The wrapper's
    output is compared against 'match' with compare_generic_iter(); then
    the wrapped file is checked to be open before iteration, still open
    after the wrapper has been exhausted, and closed only after close().
    """
    def make_it():
        # A brand-new wrapper around a brand-new StringIO on every call.
        return util.FileWrapper(StringIO(text), size)

    compare_generic_iter(make_it, match)

    wrapper = make_it()
    self.failIf(wrapper.filelike.closed)

    # Drain the wrapper; the chunks themselves are of no interest here.
    for _chunk in wrapper:
        pass

    self.failIf(wrapper.filelike.closed)

    wrapper.close()
    self.failUnless(wrapper.filelike.closed)
223
224
225 def testSimpleShifts(self):
226 self.checkShift('','/', '', '/', '')
227 self.checkShift('','/x', 'x', '/x', '')
228 self.checkShift('/','', None, '/', '')
229 self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
230 self.checkShift('/a','/x/', 'x', '/a/x', '/')
231
232
233 def testNormalizedShifts(self):
234 self.checkShift('/a/b', '/../y', '..', '/a', '/y')
235 self.checkShift('', '/../y', '..', '', '/y')
236 self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
237 self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
238 self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
239 self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
240 self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
241 self.checkShift('/a/b', '///', '', '/a/b/', '')
242 self.checkShift('/a/b', '/.//', '', '/a/b/', '')
243 self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
244 self.checkShift('/a/b', '/.', None, '/a/b', '')
245
246
247 def testDefaults(self):
248 for key, value in [
249 ('SERVER_NAME','127.0.0.1'),
250 ('SERVER_PORT', '80'),
251 ('SERVER_PROTOCOL','HTTP/1.0'),
252 ('HTTP_HOST','127.0.0.1'),
253 ('REQUEST_METHOD','GET'),
254 ('SCRIPT_NAME',''),
255 ('PATH_INFO','/'),
256 ('wsgi.version', (1,0)),
257 ('wsgi.run_once', 0),
258 ('wsgi.multithread', 0),
259 ('wsgi.multiprocess', 0),
260 ('wsgi.input', StringIO("")),
261 ('wsgi.errors', StringIO()),
262 ('wsgi.url_scheme','http'),
263 ]:
264 self.checkDefault(key,value)
265
266
267 def testCrossDefaults(self):
268 self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
269 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
270 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
271 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
272 self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
273 self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
274 self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
275
276
277 def testGuessScheme(self):
278 self.assertEqual(util.guess_scheme({}), "http")
279 self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
280 self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
281 self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
282 self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
283
284
285
286
287
288 def testAppURIs(self):
289 self.checkAppURI("http://127.0.0.1/")
290 self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
291 self.checkAppURI("http://spam.example.com:2071/",
292 HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
293 self.checkAppURI("http://spam.example.com/",
294 SERVER_NAME="spam.example.com")
295 self.checkAppURI("http://127.0.0.1/",
296 HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
297 self.checkAppURI("https://127.0.0.1/", HTTPS="on")
298 self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
299 HTTP_HOST=None)
300
301 def testReqURIs(self):
302 self.checkReqURI("http://127.0.0.1/")
303 self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
304 self.checkReqURI("http://127.0.0.1/spammity/spam",
305 SCRIPT_NAME="/spammity", PATH_INFO="/spam")
306 self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
307 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
308 self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
309 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
310
311 def testFileWrapper(self):
312 self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
313
314 def testHopByHop(self):
315 for hop in (
316 "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
317 "TE Trailers Transfer-Encoding Upgrade"
318 ).split():
319 for alt in hop, hop.title(), hop.upper(), hop.lower():
320 self.failUnless(util.is_hop_by_hop(alt))
321
322 # Not comprehensive, just a few random header names
323 for hop in (
324 "Accept Cache-Control Date Pragma Trailer Via Warning"
325 ).split():
326 for alt in hop, hop.title(), hop.upper(), hop.lower():