| 1 | # Test case for the select.poll() function
|
|---|
| 2 |
|
|---|
| 3 | import sys, os, select, random
|
|---|
| 4 | from test.test_support import verify, verbose, TestSkipped, TESTFN
|
|---|
| 5 |
|
|---|
# Bail out of the whole module when the select module offers no poll().
if not hasattr(select, 'poll'):
    raise TestSkipped("select.poll not defined -- skipping test_poll")
| 10 |
|
|---|
| 11 |
|
|---|
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    ready -- iterable of (fd, mode) pairs; the callers in this file pass
             the list returned by a poll object's poll() method
    flag  -- bit mask tested against each mode with ``&``

    The result is a new list that keeps the order of *ready*.
    """
    return [fd for fd, mode in ready if mode & flag]
| 18 |
|
|---|
def test_poll1():
    """Basic functional test of the poll object.

    Create a bunch of pipes, register the read end of each for POLLIN and
    the write end for POLLOUT on one poll object, then repeatedly write a
    message to a randomly chosen ready writer and read it back from a
    randomly chosen ready reader, retiring that pipe afterwards.  Finishes
    by running poll_unit_tests().

    Raises RuntimeError if poll() reports no descriptor ready for writing
    or, after a write, none ready for reading.  Pipes that are still open
    when an error escapes are closed before the error propagates.
    """
    print('Running poll test 1')
    p = select.poll()

    NUM_PIPES = 12
    MSG = " This is a test."
    MSG_LEN = len(MSG)
    writers = []
    r2w = {}    # read end -> write end, for pipes not yet retired

    try:
        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            # Record the pair before registering so the cleanup below
            # sees it even if a register() call fails.
            r2w[rd] = wr
            writers.append(wr)
            p.register(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            verify(len(buf) == MSG_LEN)
            print(buf)

            # This pipe has carried its message: close both ends, then
            # drop them from the poll object and from the bookkeeping.
            wr = r2w[rd]
            os.close(wr)
            os.close(rd)
            p.unregister(wr)
            p.unregister(rd)
            writers.remove(wr)
            del r2w[rd]
    finally:
        # Empty on the normal path.  After a failure, close whatever is
        # left; a descriptor that was already closed raises OSError,
        # which is deliberately ignored in this best-effort cleanup.
        for rd, wr in r2w.items():
            for fd in (rd, wr):
                try:
                    os.close(fd)
                except OSError:
                    pass

    poll_unit_tests()
    print('Poll test 1 complete')
| 67 |
|
|---|
def poll_unit_tests():
    """Check how poll objects treat bad descriptors and bad arguments.

    Covers, in order: polling a descriptor number that is not open,
    polling a file object before and after it is closed, passing a
    non-descriptor object to register()/unregister(), unregistering a
    descriptor that was never registered, and registering objects whose
    fileno() is missing or returns a non-integer.

    Descriptor-level expectations go through verify(); argument-error
    expectations print a complaint when the expected exception is not
    raised.  The scratch file TESTFN is closed and removed even when a
    verify() check fails.
    """
    def expect_raises(exc_type, complaint, func, *args):
        # Call func(*args); print *complaint* unless exc_type is raised.
        try:
            func(*args)
        except exc_type:
            pass
        else:
            print(complaint)

    # returns NVAL for invalid file descriptor
    FD = 42
    try:
        os.close(FD)
    except OSError:
        pass    # FD was not open to begin with, which is what we want
    p = select.poll()
    p.register(FD)
    r = p.poll()
    verify(r[0] == (FD, select.POLLNVAL))

    f = open(TESTFN, 'w')
    try:
        fd = f.fileno()
        p = select.poll()
        p.register(f)
        r = p.poll()
        verify(r[0][0] == fd)
        # Once the file is closed, its old descriptor must poll as NVAL.
        f.close()
        r = p.poll()
        verify(r[0] == (fd, select.POLLNVAL))
    finally:
        # close() is a no-op if the file was already closed above.
        f.close()
        os.unlink(TESTFN)

    # type error for invalid arguments
    p = select.poll()
    expect_raises(TypeError,
                  "Bogus register call did not raise TypeError",
                  p.register, p)
    expect_raises(TypeError,
                  "Bogus unregister call did not raise TypeError",
                  p.unregister, p)

    # can't unregister non-existent object
    p = select.poll()
    expect_raises(KeyError,
                  "Bogus unregister call did not raise KeyError",
                  p.unregister, 3)

    # Test error cases
    pollster = select.poll()

    class Nope:
        # No fileno() method at all.
        pass

    class Almost:
        # Has fileno(), but it does not return an integer.
        def fileno(self):
            return 'fileno'

    expect_raises(TypeError,
                  'expected TypeError exception, not raised',
                  pollster.register, Nope(), 0)
    expect_raises(TypeError,
                  'expected TypeError exception, not raised',
                  pollster.register, Almost(), 0)
| 133 |
|
|---|
| 134 |
|
|---|
| 135 | # Another test case for poll(). This is copied from the test case for
|
|---|
| 136 | # select(), modified to use poll() instead.
|
|---|
| 137 |
|
|---|
def test_poll2():
    """Poll the output pipe of a slow shell loop with a series of timeouts.

    A shell command that echoes one line and sleeps one second, ten times
    over, is started with os.popen().  Its pipe is registered for POLLIN
    and polled once per timeout value in the list below.  Each reported
    event is handled by reading a line; the loop stops early on an empty
    read.  Diagnostic detail is printed only when ``verbose`` is set.

    The pipe is closed on the way out even if polling or reading raises.
    """
    print('Running poll test 2')
    cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
    p = os.popen(cmd, 'r')
    try:
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,) * 10:
            if verbose:
                # Same output as printing the two items separated by a space.
                print('%s %s' % ('timeout =', tout))
            fdlist = pollster.poll(tout)
            if not fdlist:
                # Nothing reported for this timeout; try the next one.
                continue
            # Only the event mask of the first entry is inspected.
            flags = fdlist[0][1]
            if flags & select.POLLHUP:
                line = p.readline()
                if line != "":
                    print('error: pipe seems to be closed, but still returns data')
            elif flags & select.POLLIN:
                line = p.readline()
                if verbose:
                    print(repr(line))
                if not line:
                    if verbose:
                        print('EOF')
                    break
            else:
                print('%s %s' % ('Unexpected return value from select.poll:',
                                 fdlist))
    finally:
        p.close()
    print('Poll test 2 complete')
| 170 |
|
|---|
def test_poll3():
    """Check that poll() rejects a timeout too large to convert.

    Registers descriptor 1 on a fresh poll object and calls poll() with
    2 ** 64, expecting OverflowError.  A complaint is printed if the
    exception is not raised.
    """
    print('Running poll test 3')
    poller = select.poll()
    poller.register(1)

    overflowed = False
    try:
        poller.poll(2 ** 64)
    except OverflowError:
        overflowed = True
    if not overflowed:
        print('Expected OverflowError with excessive timeout')

    # NOTE(review): presumably this trivial arithmetic is here to expose an
    # error left pending by the poll() call above -- confirm.
    total = 2 + 3
    if total != 5:
        print('Overflow must have occurred')
    print('Poll test 3 complete')
| 188 |
|
|---|
| 189 |
|
|---|
# Run the three scenarios, in order, as soon as the module is executed.
for _run_case in (test_poll1, test_poll2, test_poll3):
    _run_case()
del _run_case