| 1 | # -*- Mode: Python; tab-width: 4 -*-
|
|---|
| 2 | # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
|
|---|
| 3 | # Author: Sam Rushing <[email protected]>
|
|---|
| 4 |
|
|---|
| 5 | # ======================================================================
|
|---|
| 6 | # Copyright 1996 by Sam Rushing
|
|---|
| 7 | #
|
|---|
| 8 | # All Rights Reserved
|
|---|
| 9 | #
|
|---|
| 10 | # Permission to use, copy, modify, and distribute this software and
|
|---|
| 11 | # its documentation for any purpose and without fee is hereby
|
|---|
| 12 | # granted, provided that the above copyright notice appear in all
|
|---|
| 13 | # copies and that both that copyright notice and this permission
|
|---|
| 14 | # notice appear in supporting documentation, and that the name of Sam
|
|---|
| 15 | # Rushing not be used in advertising or publicity pertaining to
|
|---|
| 16 | # distribution of the software without specific, written prior
|
|---|
| 17 | # permission.
|
|---|
| 18 | #
|
|---|
| 19 | # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
|
|---|
| 20 | # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
|
|---|
| 21 | # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
|
|---|
| 22 | # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
|
|---|
| 23 | # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
|---|
| 24 | # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
|---|
| 25 | # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|---|
| 26 | # ======================================================================
|
|---|
| 27 |
|
|---|
| 28 | r"""A class supporting chat-style (command/response) protocols.
|
|---|
| 29 |
|
|---|
| 30 | This class adds support for 'chat' style protocols - where one side
|
|---|
| 31 | sends a 'command', and the other sends a response (examples would be
|
|---|
| 32 | the common internet protocols - smtp, nntp, ftp, etc..).
|
|---|
| 33 |
|
|---|
| 34 | The handle_read() method looks at the input stream for the current
|
|---|
| 35 | 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
|
|---|
| 36 | for multi-line output), calling self.found_terminator() on its
|
|---|
| 37 | receipt.
|
|---|
| 38 |
|
|---|
| 39 | for example:
|
|---|
| 40 | Say you build an async nntp client using this class. At the start
|
|---|
| 41 | of the connection, you'll have self.terminator set to '\r\n', in
|
|---|
| 42 | order to process the single-line greeting. Just before issuing a
|
|---|
| 43 | 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
|
|---|
| 44 | command will be accumulated (using your own 'collect_incoming_data'
|
|---|
| 45 | method) up to the terminator, and then control will be returned to
|
|---|
| 46 | you - by calling your self.found_terminator() method.
|
|---|
| 47 | """
|
|---|
| 48 |
|
|---|
| 49 | import socket
|
|---|
| 50 | import asyncore
|
|---|
| 51 | from collections import deque
|
|---|
| 52 |
|
|---|
| 53 | class async_chat (asyncore.dispatcher):
|
|---|
| 54 | """This is an abstract class. You must derive from this class, and add
|
|---|
| 55 | the two methods collect_incoming_data() and found_terminator()"""
|
|---|
| 56 |
|
|---|
| 57 | # these are overridable defaults
|
|---|
| 58 |
|
|---|
| 59 | ac_in_buffer_size = 4096
|
|---|
| 60 | ac_out_buffer_size = 4096
|
|---|
| 61 |
|
|---|
| 62 | def __init__ (self, conn=None):
|
|---|
| 63 | self.ac_in_buffer = ''
|
|---|
| 64 | self.ac_out_buffer = ''
|
|---|
| 65 | self.producer_fifo = fifo()
|
|---|
| 66 | asyncore.dispatcher.__init__ (self, conn)
|
|---|
| 67 |
|
|---|
| 68 | def collect_incoming_data(self, data):
|
|---|
| 69 | raise NotImplementedError, "must be implemented in subclass"
|
|---|
| 70 |
|
|---|
| 71 | def found_terminator(self):
|
|---|
| 72 | raise NotImplementedError, "must be implemented in subclass"
|
|---|
| 73 |
|
|---|
| 74 | def set_terminator (self, term):
|
|---|
| 75 | "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
|
|---|
| 76 | self.terminator = term
|
|---|
| 77 |
|
|---|
| 78 | def get_terminator (self):
|
|---|
| 79 | return self.terminator
|
|---|
| 80 |
|
|---|
| 81 | # grab some more data from the socket,
|
|---|
| 82 | # throw it to the collector method,
|
|---|
| 83 | # check for the terminator,
|
|---|
| 84 | # if found, transition to the next state.
|
|---|
| 85 |
|
|---|
| 86 | def handle_read (self):
|
|---|
| 87 |
|
|---|
| 88 | try:
|
|---|
| 89 | data = self.recv (self.ac_in_buffer_size)
|
|---|
| 90 | except socket.error, why:
|
|---|
| 91 | self.handle_error()
|
|---|
| 92 | return
|
|---|
| 93 |
|
|---|
| 94 | self.ac_in_buffer = self.ac_in_buffer + data
|
|---|
| 95 |
|
|---|
| 96 | # Continue to search for self.terminator in self.ac_in_buffer,
|
|---|
| 97 | # while calling self.collect_incoming_data. The while loop
|
|---|
| 98 | # is necessary because we might read several data+terminator
|
|---|
| 99 | # combos with a single recv(1024).
|
|---|
| 100 |
|
|---|
| 101 | while self.ac_in_buffer:
|
|---|
| 102 | lb = len(self.ac_in_buffer)
|
|---|
| 103 | terminator = self.get_terminator()
|
|---|
| 104 | if not terminator:
|
|---|
| 105 | # no terminator, collect it all
|
|---|
| 106 | self.collect_incoming_data (self.ac_in_buffer)
|
|---|
| 107 | self.ac_in_buffer = ''
|
|---|
| 108 | elif isinstance(terminator, int) or isinstance(terminator, long):
|
|---|
| 109 | # numeric terminator
|
|---|
| 110 | n = terminator
|
|---|
| 111 | if lb < n:
|
|---|
| 112 | self.collect_incoming_data (self.ac_in_buffer)
|
|---|
| 113 | self.ac_in_buffer = ''
|
|---|
| 114 | self.terminator = self.terminator - lb
|
|---|
| 115 | else:
|
|---|
| 116 | self.collect_incoming_data (self.ac_in_buffer[:n])
|
|---|
| 117 | self.ac_in_buffer = self.ac_in_buffer[n:]
|
|---|
| 118 | self.terminator = 0
|
|---|
| 119 | self.found_terminator()
|
|---|
| 120 | else:
|
|---|
| 121 | # 3 cases:
|
|---|
| 122 | # 1) end of buffer matches terminator exactly:
|
|---|
| 123 | # collect data, transition
|
|---|
| 124 | # 2) end of buffer matches some prefix:
|
|---|
| 125 | # collect data to the prefix
|
|---|
| 126 | # 3) end of buffer does not match any prefix:
|
|---|
| 127 | # collect data
|
|---|
| 128 | terminator_len = len(terminator)
|
|---|
| 129 | index = self.ac_in_buffer.find(terminator)
|
|---|
| 130 | if index != -1:
|
|---|
| 131 | # we found the terminator
|
|---|
| 132 | if index > 0:
|
|---|
| 133 | # don't bother reporting the empty string (source of subtle bugs)
|
|---|
| 134 | self.collect_incoming_data (self.ac_in_buffer[:index])
|
|---|
| 135 | self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
|
|---|
| 136 | # This does the Right Thing if the terminator is changed here.
|
|---|
| 137 | self.found_terminator()
|
|---|
| 138 | else:
|
|---|
| 139 | # check for a prefix of the terminator
|
|---|
| 140 | index = find_prefix_at_end (self.ac_in_buffer, terminator)
|
|---|
| 141 | if index:
|
|---|
| 142 | if index != lb:
|
|---|
| 143 | # we found a prefix, collect up to the prefix
|
|---|
| 144 | self.collect_incoming_data (self.ac_in_buffer[:-index])
|
|---|
| 145 | self.ac_in_buffer = self.ac_in_buffer[-index:]
|
|---|
| 146 | break
|
|---|
| 147 | else:
|
|---|
| 148 | # no prefix, collect it all
|
|---|
| 149 | self.collect_incoming_data (self.ac_in_buffer)
|
|---|
| 150 | self.ac_in_buffer = ''
|
|---|
| 151 |
|
|---|
| 152 | def handle_write (self):
|
|---|
| 153 | self.initiate_send ()
|
|---|
| 154 |
|
|---|
| 155 | def handle_close (self):
|
|---|
| 156 | self.close()
|
|---|
| 157 |
|
|---|
| 158 | def push (self, data):
|
|---|
| 159 | self.producer_fifo.push (simple_producer (data))
|
|---|
| 160 | self.initiate_send()
|
|---|
| 161 |
|
|---|
| 162 | def push_with_producer (self, producer):
|
|---|
| 163 | self.producer_fifo.push (producer)
|
|---|
| 164 | self.initiate_send()
|
|---|
| 165 |
|
|---|
| 166 | def readable (self):
|
|---|
| 167 | "predicate for inclusion in the readable for select()"
|
|---|
| 168 | return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
|
|---|
| 169 |
|
|---|
| 170 | def writable (self):
|
|---|
| 171 | "predicate for inclusion in the writable for select()"
|
|---|
| 172 | # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
|
|---|
| 173 | # this is about twice as fast, though not as clear.
|
|---|
| 174 | return not (
|
|---|
| 175 | (self.ac_out_buffer == '') and
|
|---|
| 176 | self.producer_fifo.is_empty() and
|
|---|
| 177 | self.connected
|
|---|
| 178 | )
|
|---|
| 179 |
|
|---|
| 180 | def close_when_done (self):
|
|---|
| 181 | "automatically close this channel once the outgoing queue is empty"
|
|---|
| 182 | self.producer_fifo.push (None)
|
|---|
| 183 |
|
|---|
| 184 | # refill the outgoing buffer by calling the more() method
|
|---|
| 185 | # of the first producer in the queue
|
|---|
| 186 | def refill_buffer (self):
|
|---|
| 187 | while 1:
|
|---|
| 188 | if len(self.producer_fifo):
|
|---|
| 189 | p = self.producer_fifo.first()
|
|---|
| 190 | # a 'None' in the producer fifo is a sentinel,
|
|---|
| 191 | # telling us to close the channel.
|
|---|
| 192 | if p is None:
|
|---|
| 193 | if not self.ac_out_buffer:
|
|---|
| 194 | self.producer_fifo.pop()
|
|---|
| 195 | self.close()
|
|---|
| 196 | return
|
|---|
| 197 | elif isinstance(p, str):
|
|---|
| 198 | self.producer_fifo.pop()
|
|---|
| 199 | self.ac_out_buffer = self.ac_out_buffer + p
|
|---|
| 200 | return
|
|---|
| 201 | data = p.more()
|
|---|
| 202 | if data:
|
|---|
| 203 | self.ac_out_buffer = self.ac_out_buffer + data
|
|---|
| 204 | return
|
|---|
| 205 | else:
|
|---|
| 206 | self.producer_fifo.pop()
|
|---|
| 207 | else:
|
|---|
| 208 | return
|
|---|
| 209 |
|
|---|
| 210 | def initiate_send (self):
|
|---|
| 211 | obs = self.ac_out_buffer_size
|
|---|
| 212 | # try to refill the buffer
|
|---|
| 213 | if (len (self.ac_out_buffer) < obs):
|
|---|
| 214 | self.refill_buffer()
|
|---|
| 215 |
|
|---|
| 216 | if self.ac_out_buffer and self.connected:
|
|---|
| 217 | # try to send the buffer
|
|---|
| 218 | try:
|
|---|
| 219 | num_sent = self.send (self.ac_out_buffer[:obs])
|
|---|
| 220 | if num_sent:
|
|---|
| 221 | self.ac_out_buffer = self.ac_out_buffer[num_sent:]
|
|---|
| 222 |
|
|---|
| 223 | except socket.error, why:
|
|---|
| 224 | self.handle_error()
|
|---|
| 225 | return
|
|---|
| 226 |
|
|---|
| 227 | def discard_buffers (self):
|
|---|
| 228 | # Emergencies only!
|
|---|
| 229 | self.ac_in_buffer = ''
|
|---|
| 230 | self.ac_out_buffer = ''
|
|---|
| 231 | while self.producer_fifo:
|
|---|
| 232 | self.producer_fifo.pop()
|
|---|
| 233 |
|
|---|
| 234 |
|
|---|
| 235 | class simple_producer:
|
|---|
| 236 |
|
|---|
| 237 | def __init__ (self, data, buffer_size=512):
|
|---|
| 238 | self.data = data
|
|---|
| 239 | self.buffer_size = buffer_size
|
|---|
| 240 |
|
|---|
| 241 | def more (self):
|
|---|
| 242 | if len (self.data) > self.buffer_size:
|
|---|
| 243 | result = self.data[:self.buffer_size]
|
|---|
| 244 | self.data = self.data[self.buffer_size:]
|
|---|
| 245 | return result
|
|---|
| 246 | else:
|
|---|
| 247 | result = self.data
|
|---|
| 248 | self.data = ''
|
|---|
| 249 | return result
|
|---|
| 250 |
|
|---|
| 251 | class fifo:
|
|---|
| 252 | def __init__ (self, list=None):
|
|---|
| 253 | if not list:
|
|---|
| 254 | self.list = deque()
|
|---|
| 255 | else:
|
|---|
| 256 | self.list = deque(list)
|
|---|
| 257 |
|
|---|
| 258 | def __len__ (self):
|
|---|
| 259 | return len(self.list)
|
|---|
| 260 |
|
|---|
| 261 | def is_empty (self):
|
|---|
| 262 | return not self.list
|
|---|
| 263 |
|
|---|
| 264 | def first (self):
|
|---|
| 265 | return self.list[0]
|
|---|
| 266 |
|
|---|
| 267 | def push (self, data):
|
|---|
| 268 | self.list.append(data)
|
|---|
| 269 |
|
|---|
| 270 | def pop (self):
|
|---|
| 271 | if self.list:
|
|---|
| 272 | return (1, self.list.popleft())
|
|---|
| 273 | else:
|
|---|
| 274 | return (0, None)
|
|---|
| 275 |
|
|---|
| 276 | # Given 'haystack', see if any prefix of 'needle' is at its end. This
|
|---|
| 277 | # assumes an exact match has already been checked. Return the number of
|
|---|
| 278 | # characters matched.
|
|---|
| 279 | # for example:
|
|---|
| 280 | # f_p_a_e ("qwerty\r", "\r\n") => 1
|
|---|
| 281 | # f_p_a_e ("qwertydkjf", "\r\n") => 0
|
|---|
| 282 | # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
|
|---|
| 283 |
|
|---|
| 284 | # this could maybe be made faster with a computed regex?
|
|---|
| 285 | # [answer: no; circa Python-2.0, Jan 2001]
|
|---|
| 286 | # new python: 28961/s
|
|---|
| 287 | # old python: 18307/s
|
|---|
| 288 | # re: 12820/s
|
|---|
| 289 | # regex: 14035/s
|
|---|
| 290 |
|
|---|
| 291 | def find_prefix_at_end (haystack, needle):
|
|---|
| 292 | l = len(needle) - 1
|
|---|
| 293 | while l and not haystack.endswith(needle[:l]):
|
|---|
| 294 | l -= 1
|
|---|
| 295 | return l
|
|---|