OLD | NEW |
1 # -*- Mode: Python; tab-width: 4 -*- | 1 # -*- Mode: Python; tab-width: 4 -*- |
2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp | 2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp |
3 # Author: Sam Rushing <rushing@nightmare.com> | 3 # Author: Sam Rushing <rushing@nightmare.com> |
4 | 4 |
5 # ====================================================================== | 5 # ====================================================================== |
6 # Copyright 1996 by Sam Rushing | 6 # Copyright 1996 by Sam Rushing |
7 # | 7 # |
8 # All Rights Reserved | 8 # All Rights Reserved |
9 # | 9 # |
10 # Permission to use, copy, modify, and distribute this software and | 10 # Permission to use, copy, modify, and distribute this software and |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
53 class async_chat (asyncore.dispatcher): | 53 class async_chat (asyncore.dispatcher): |
54 """This is an abstract class. You must derive from this class, and add | 54 """This is an abstract class. You must derive from this class, and add |
55 the two methods collect_incoming_data() and found_terminator()""" | 55 the two methods collect_incoming_data() and found_terminator()""" |
56 | 56 |
57 # these are overridable defaults | 57 # these are overridable defaults |
58 | 58 |
59 ac_in_buffer_size = 4096 | 59 ac_in_buffer_size = 4096 |
60 ac_out_buffer_size = 4096 | 60 ac_out_buffer_size = 4096 |
61 | 61 |
62 def __init__ (self, conn=None): | 62 def __init__ (self, conn=None): |
| 63 # for string terminator matching |
63 self.ac_in_buffer = '' | 64 self.ac_in_buffer = '' |
64 self.ac_out_buffer = '' | 65 ········ |
65 self.producer_fifo = fifo() | 66 # we use a list here rather than cStringIO for a few reasons... |
| 67 # del lst[:] is faster than sio.truncate(0) |
| 68 # lst = [] is faster than sio.truncate(0) |
| 69 # cStringIO will be gaining unicode support in py3k, which |
| 70 # will negatively affect the performance of bytes compared to |
| 71 # a ''.join() equivalent |
| 72 self.incoming = [] |
| 73 ········ |
| 74 # we toss the use of the "simple producer" and replace it with |
| 75 # a pure deque, which the original fifo was a wrapping of |
| 76 self.producer_fifo = deque() |
66 asyncore.dispatcher.__init__ (self, conn) | 77 asyncore.dispatcher.__init__ (self, conn) |
67 | 78 |
68 def collect_incoming_data(self, data): | 79 def collect_incoming_data(self, data): |
69 raise NotImplementedError, "must be implemented in subclass" | 80 raise NotImplementedError("must be implemented in subclass") |
| 81 ···· |
| 82 def _collect_incoming_data(self, data): |
| 83 self.incoming.append(data) |
| 84 ···· |
| 85 def _get_data(self): |
| 86 d = ''.join(self.incoming) |
| 87 del self.incoming[:] |
| 88 return d |
70 | 89 |
71 def found_terminator(self): | 90 def found_terminator(self): |
72 raise NotImplementedError, "must be implemented in subclass" | 91 raise NotImplementedError("must be implemented in subclass") |
73 | 92 |
74 def set_terminator (self, term): | 93 def set_terminator (self, term): |
75 "Set the input delimiter. Can be a fixed string of any length, an integ
er, or None" | 94 "Set the input delimiter. Can be a fixed string of any length, an integ
er, or None" |
76 self.terminator = term | 95 self.terminator = term |
77 | 96 |
78 def get_terminator (self): | 97 def get_terminator (self): |
79 return self.terminator | 98 return self.terminator |
80 | 99 |
81 # grab some more data from the socket, | 100 # grab some more data from the socket, |
82 # throw it to the collector method, | 101 # throw it to the collector method, |
83 # check for the terminator, | 102 # check for the terminator, |
84 # if found, transition to the next state. | 103 # if found, transition to the next state. |
85 | 104 |
86 def handle_read (self): | 105 def handle_read (self): |
87 | 106 |
88 try: | 107 try: |
89 data = self.recv (self.ac_in_buffer_size) | 108 data = self.recv (self.ac_in_buffer_size) |
90 except socket.error, why: | 109 except socket.error, why: |
91 self.handle_error() | 110 self.handle_error() |
92 return | 111 return |
93 | 112 |
94 self.ac_in_buffer = self.ac_in_buffer + data | 113 self.ac_in_buffer = self.ac_in_buffer + data |
95 | 114 |
96 # Continue to search for self.terminator in self.ac_in_buffer, | 115 # Continue to search for self.terminator in self.ac_in_buffer, |
97 # while calling self.collect_incoming_data. The while loop | 116 # while calling self.collect_incoming_data. The while loop |
98 # is necessary because we might read several data+terminator | 117 # is necessary because we might read several data+terminator |
99 # combos with a single recv(1024). | 118 # combos with a single recv(4096). |
100 | 119 |
101 while self.ac_in_buffer: | 120 while self.ac_in_buffer: |
102 lb = len(self.ac_in_buffer) | 121 lb = len(self.ac_in_buffer) |
103 terminator = self.get_terminator() | 122 terminator = self.get_terminator() |
104 if not terminator: | 123 if not terminator: |
105 # no terminator, collect it all | 124 # no terminator, collect it all |
106 self.collect_incoming_data (self.ac_in_buffer) | 125 self.collect_incoming_data (self.ac_in_buffer) |
107 self.ac_in_buffer = '' | 126 self.ac_in_buffer = '' |
108 elif isinstance(terminator, int) or isinstance(terminator, long): | 127 elif isinstance(terminator, int) or isinstance(terminator, long): |
109 # numeric terminator | 128 # numeric terminator |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
143 # we found a prefix, collect up to the prefix | 162 # we found a prefix, collect up to the prefix |
144 self.collect_incoming_data (self.ac_in_buffer[:-inde
x]) | 163 self.collect_incoming_data (self.ac_in_buffer[:-inde
x]) |
145 self.ac_in_buffer = self.ac_in_buffer[-index:] | 164 self.ac_in_buffer = self.ac_in_buffer[-index:] |
146 break | 165 break |
147 else: | 166 else: |
148 # no prefix, collect it all | 167 # no prefix, collect it all |
149 self.collect_incoming_data (self.ac_in_buffer) | 168 self.collect_incoming_data (self.ac_in_buffer) |
150 self.ac_in_buffer = '' | 169 self.ac_in_buffer = '' |
151 | 170 |
152 def handle_write (self): | 171 def handle_write (self): |
153 self.initiate_send () | 172 self.initiate_send() |
154 | 173 |
155 def handle_close (self): | 174 def handle_close (self): |
156 self.close() | 175 self.close() |
157 | 176 |
158 def push (self, data): | 177 def push (self, data): |
159 self.producer_fifo.push (simple_producer (data)) | 178 sabs = self.ac_out_buffer_size |
| 179 if len(data) > sabs: |
| 180 for i in xrange(0, len(data), sabs): |
| 181 self.producer_fifo.append(data[i:i+sabs]) |
| 182 else: |
| 183 self.producer_fifo.append(data) |
160 self.initiate_send() | 184 self.initiate_send() |
161 | 185 |
162 def push_with_producer (self, producer): | 186 def push_with_producer (self, producer): |
163 self.producer_fifo.push (producer) | 187 self.producer_fifo.append(producer) |
164 self.initiate_send() | 188 self.initiate_send() |
165 | 189 |
166 def readable (self): | 190 def readable (self): |
167 "predicate for inclusion in the readable for select()" | 191 "predicate for inclusion in the readable for select()" |
168 return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) | 192 # cannot use the old predicate, it violates the claim of the |
| 193 # set_terminator method. |
| 194 ········ |
| 195 # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) |
| 196 return 1· |
169 | 197 |
170 def writable (self): | 198 def writable (self): |
171 "predicate for inclusion in the writable for select()" | 199 "predicate for inclusion in the writable for select()" |
172 # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self
.connected) | 200 return self.producer_fifo or (not self.connected) |
173 # this is about twice as fast, though not as clear. | |
174 return not ( | |
175 (self.ac_out_buffer == '') and | |
176 self.producer_fifo.is_empty() and | |
177 self.connected | |
178 ) | |
179 | 201 |
180 def close_when_done (self): | 202 def close_when_done (self): |
181 "automatically close this channel once the outgoing queue is empty" | 203 "automatically close this channel once the outgoing queue is empty" |
182 self.producer_fifo.push (None) | 204 self.producer_fifo.append(None) |
183 | 205 |
184 # refill the outgoing buffer by calling the more() method | 206 def initiate_send(self): |
185 # of the first producer in the queue | 207 while self.producer_fifo and self.connected: |
186 def refill_buffer (self): | 208 first = self.producer_fifo[0] |
187 while 1: | 209 # handle empty string/buffer or None entry |
188 if len(self.producer_fifo): | 210 if not first: |
189 p = self.producer_fifo.first() | 211 del self.producer_fifo[0] |
190 # a 'None' in the producer fifo is a sentinel, | 212 if first is None: |
191 # telling us to close the channel. | 213 self.handle_close() |
192 if p is None: | |
193 if not self.ac_out_buffer: | |
194 self.producer_fifo.pop() | |
195 self.close() | |
196 return | 214 return |
197 elif isinstance(p, str): | 215 |
198 self.producer_fifo.pop() | 216 # handle classic producer behavior |
199 self.ac_out_buffer = self.ac_out_buffer + p | 217 obs = self.ac_out_buffer_size |
200 return | 218 try: |
201 data = p.more() | 219 data = buffer(first, 0, obs) |
| 220 except TypeError: |
| 221 data = first.more() |
202 if data: | 222 if data: |
203 self.ac_out_buffer = self.ac_out_buffer + data | 223 self.producer_fifo.appendleft(data) |
204 return | |
205 else: | 224 else: |
206 self.producer_fifo.pop() | 225 del self.producer_fifo[0] |
207 else: | 226 continue |
| 227 |
| 228 # send the data |
| 229 try: |
| 230 num_sent = self.send(data) |
| 231 except socket.error: |
| 232 self.handle_error() |
208 return | 233 return |
209 | 234 |
210 def initiate_send (self): | 235 if num_sent: |
211 obs = self.ac_out_buffer_size | 236 if num_sent < len(data) or obs < len(first): |
212 # try to refill the buffer | 237 self.producer_fifo[0] = first[num_sent:] |
213 if (len (self.ac_out_buffer) < obs): | 238 else: |
214 self.refill_buffer() | 239 del self.producer_fifo[0] |
215 | 240 # we tried to send some actual data |
216 if self.ac_out_buffer and self.connected: | 241 return |
217 # try to send the buffer | |
218 try: | |
219 num_sent = self.send (self.ac_out_buffer[:obs]) | |
220 if num_sent: | |
221 self.ac_out_buffer = self.ac_out_buffer[num_sent:] | |
222 | |
223 except socket.error, why: | |
224 self.handle_error() | |
225 return | |
226 | 242 |
227 def discard_buffers (self): | 243 def discard_buffers (self): |
228 # Emergencies only! | 244 # Emergencies only! |
229 self.ac_in_buffer = '' | 245 self.ac_in_buffer = '' |
230 self.ac_out_buffer = '' | 246 del self.incoming[:] |
231 while self.producer_fifo: | 247 self.producer_fifo.clear() |
232 self.producer_fifo.pop() | |
233 | |
234 | 248 |
235 class simple_producer: | 249 class simple_producer: |
236 | 250 |
237 def __init__ (self, data, buffer_size=512): | 251 def __init__ (self, data, buffer_size=512): |
238 self.data = data | 252 self.data = data |
239 self.buffer_size = buffer_size | 253 self.buffer_size = buffer_size |
240 | 254 |
241 def more (self): | 255 def more (self): |
242 if len (self.data) > self.buffer_size: | 256 if len (self.data) > self.buffer_size: |
243 result = self.data[:self.buffer_size] | 257 result = self.data[:self.buffer_size] |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
286 # new python: 28961/s | 300 # new python: 28961/s |
287 # old python: 18307/s | 301 # old python: 18307/s |
288 # re: 12820/s | 302 # re: 12820/s |
289 # regex: 14035/s | 303 # regex: 14035/s |
290 | 304 |
291 def find_prefix_at_end (haystack, needle): | 305 def find_prefix_at_end (haystack, needle): |
292 l = len(needle) - 1 | 306 l = len(needle) - 1 |
293 while l and not haystack.endswith(needle[:l]): | 307 while l and not haystack.endswith(needle[:l]): |
294 l -= 1 | 308 l -= 1 |
295 return l | 309 return l |
OLD | NEW |