-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathreader.py
365 lines (306 loc) · 13.1 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at:
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
# OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the
# License.
"""Provides common functionality for Ion binary and text readers."""
from collections import deque
from enum import IntEnum
from amazon.ion.symbols import SymbolToken
from .core import DataEvent, IonEventType, Transition
from .core import ION_STREAM_END_EVENT
from .util import coroutine
class CodePoint(int):
    """An ``int`` holding a code point ordinal, annotated with parse metadata.

    The integer value is the ordinal itself. Two extra attributes are initialized
    on every instance and are meant to be filled in by whoever creates it:

    * ``char``: the unicode character representation (initially ``None``).
    * ``is_escaped``: whether the code point was escaped (initially ``False``).
    """

    def __init__(self, *args, **kwargs):
        # The numeric value is established by ``int.__new__``; only the
        # annotations are set up here.
        self.char, self.is_escaped = None, False
class CodePointArray:
    """An append-only sequence of code points, backed by a text string.

    Used in place of ``bytearray()`` for text values. Content may only be added
    at the end; inserting, overwriting or deleting raises ``ValueError``.
    """

    def __init__(self, initial_bytes=None):
        """Starts empty, then appends each ordinal from ``initial_bytes`` if given."""
        self.__text = u''
        if initial_bytes is None:
            return
        for code_point in initial_bytes:
            self.append(code_point)

    def append(self, value):
        """Appends the character whose ordinal is ``value``."""
        self.__text = self.__text + chr(value)

    def extend(self, values):
        """Appends ``values``: text is added verbatim, bytes one ordinal at a time."""
        if not isinstance(values, str):
            assert isinstance(values, bytes)
            for code_point in values:
                self.append(code_point)
            return
        self.__text = self.__text + values

    def as_symbol(self):
        """Returns the accumulated text as a ``SymbolToken`` with no sid and no location."""
        text = self.__text
        return SymbolToken(text, sid=None, location=None)

    def as_text(self):
        """Returns the accumulated text."""
        return self.__text

    def __len__(self):
        text = self.__text
        return len(text)

    def __repr__(self):
        return 'CodePointArray(text=%s)' % (self.__text,)

    __str__ = __repr__

    def __getitem__(self, index):
        text = self.__text
        return text[index]

    # The remaining mutable-sequence operations are rejected on purpose.

    def insert(self, index, value):
        raise ValueError('Attempted to add code point in middle of sequence.')

    def __setitem__(self, index, value):
        raise ValueError('Attempted to set code point in middle of sequence.')

    def __delitem__(self, index):
        raise ValueError('Attempted to delete from code point sequence.')
_EOF = b'\x04'  # End of transmission character.


class BufferQueue(object):
    """A FIFO queue of input segments that is consumed as one contiguous stream.

    Whole segments are appended with :meth:`extend`; elements are consumed from
    the front with :meth:`read`, :meth:`read_byte`, :meth:`skip` or by iterating.
    Consumed elements can be pushed back with :meth:`unread`.

    Attributes:
        position: Running count of consumed elements (reduced again by ``unread``).
        is_unicode: True if the queue holds ``str`` segments, False for ``bytes``.
    """

    def __init__(self, is_unicode=False):
        """Creates an empty queue.

        Args:
            is_unicode (Optional[bool]): True to accept ``str`` segments, False
                (the default) to accept ``bytes`` segments.
        """
        self.__segments = deque()
        self.__offset = 0  # Read offset into the head segment.
        self.__size = 0  # Number of unconsumed elements across all segments.
        self.__data_cls = CodePointArray if is_unicode else bytearray
        if is_unicode:
            self.__chr = chr
            self.__element_type = str
        else:
            self.__chr = lambda x: x
            self.__element_type = bytes
        self.__ord = ord if is_unicode else lambda x: x
        self.position = 0
        self.is_unicode = is_unicode

    @staticmethod
    def is_eof(c):
        """Returns True if ``c`` is the EOF marker object queued by :meth:`mark_eof`."""
        # Note reference equality, ensuring that the EOF literal is still illegal as part of the data.
        # NOTE(review): this relies on data never being the very same object as ``_EOF``; some
        # interpreters may share one object for equal single-byte ``bytes`` values -- confirm.
        return c is _EOF

    @staticmethod
    def _incompatible_types(element_type, data):
        """Raises ``ValueError`` describing the expected element type and the type received."""
        raise ValueError('Incompatible input data types. Expected %r, got %r.' % (element_type, type(data)))

    def extend(self, data):
        """Appends ``data`` as a new segment at the end of the queue.

        Raises:
            ValueError: If ``data`` is not an instance of the queue's element type.
        """
        # TODO Determine if there are any other accumulation strategies that make sense.
        # TODO Determine if we should use memoryview to avoid copying.
        if not isinstance(data, self.__element_type):
            BufferQueue._incompatible_types(self.__element_type, data)
        self.__segments.append(data)
        self.__size += len(data)

    def mark_eof(self):
        """Appends the EOF marker as its own one-element segment."""
        self.__segments.append(_EOF)
        self.__size += 1

    def read(self, length, skip=False):
        """Consumes the first ``length`` elements from the queue.

        Args:
            length (int): The number of elements to consume.
            skip (Optional[bool]): If True the elements are discarded and an empty
                value is returned instead of the consumed data.

        Returns:
            The consumed data. When the request is satisfied by a single segment,
            that segment (or a slice of it) is returned directly; otherwise the
            pieces are accumulated (text for unicode queues, ``bytearray`` otherwise).

        Raises:
            IndexError: If fewer than ``length`` elements are queued.
        """
        if length > self.__size:
            raise IndexError(
                'Cannot pop %d bytes, %d bytes in buffer queue' % (length, self.__size))
        self.position += length
        self.__size -= length
        segments = self.__segments
        offset = self.__offset
        data = self.__data_cls()
        while length > 0:
            segment = segments[0]
            segment_off = offset
            segment_len = len(segment)
            segment_rem = segment_len - segment_off
            segment_read_len = min(segment_rem, length)

            if skip:
                # Nothing is returned when skipping, so avoid slicing altogether.
                segment_slice = self.__element_type()
            elif segment_off == 0 and segment_read_len == segment_rem:
                # Consume an entire segment; no copy needed.
                segment_slice = segment
            else:
                # Consume a part of the segment.
                segment_slice = segment[segment_off:segment_off + segment_read_len]
            # Only the first segment visited can start at a non-zero offset.
            offset = 0

            segment_off += segment_read_len
            if segment_off == segment_len:
                segments.popleft()
                self.__offset = 0
            else:
                self.__offset = segment_off

            if length <= segment_rem and len(data) == 0:
                # The whole request came from one segment: return it without accumulating.
                return segment_slice
            data.extend(segment_slice)
            length -= segment_read_len
        if self.is_unicode:
            return data.as_text()
        return data

    def read_byte(self):
        """Consumes and returns the next element.

        Returns:
            The EOF marker object if the head segment is the EOF marker; otherwise
            the next element, converted with ``ord`` for unicode queues and returned
            as indexed from the segment for binary queues.

        Raises:
            IndexError: If the queue is empty.
        """
        if self.__size < 1:
            raise IndexError('Buffer queue is empty')
        segments = self.__segments
        segment = segments[0]
        segment_len = len(segment)
        offset = self.__offset
        if BufferQueue.is_eof(segment):
            octet = _EOF
        else:
            octet = self.__ord(segment[offset])
        offset += 1
        if offset == segment_len:
            # Head segment exhausted; move on to the next one.
            offset = 0
            segments.popleft()
        self.__offset = offset
        self.__size -= 1
        self.position += 1
        return octet

    def unread(self, c):
        """Unread the given character, byte, or code point.

        If this is a unicode buffer and the input is an int or byte, it will be interpreted as an ordinal
        representing a unicode code point.

        If this is a binary buffer, the input must be a byte or int; a unicode character will raise an error.

        Raises:
            IndexError: If nothing has been consumed yet (``position`` is below 1).
            ValueError: If a unicode character is given to a binary buffer, or if the value being
                unread does not match what is stored in the partially consumed head segment. In the
                latter case the queue is left unchanged.
        """
        if self.position < 1:
            raise IndexError('Cannot unread an empty buffer queue.')
        if isinstance(c, str):
            if not self.is_unicode:
                # Report the element type this buffer accepts (not the ``is_unicode`` flag).
                BufferQueue._incompatible_types(self.__element_type, c)
        else:
            c = self.__chr(c)

        # Binary buffers always unread exactly one element; unicode buffers unread
        # every code unit of ``c`` (an empty string still counts as one, as before).
        num_code_units = (len(c) or 1) if self.is_unicode else 1

        if self.__offset == 0:
            # Nothing of the head segment is consumed: push the value back as a new head segment.
            segment = c if self.is_unicode else bytes((c,))
            self.__segments.appendleft(segment)
        else:
            # Rewind within the head segment, but only after confirming that the
            # stored content matches what is being unread.
            new_offset = self.__offset - num_code_units
            head = self.__segments[0]
            expected = (c,) if num_code_units == 1 else c
            for idx, ch in enumerate(expected):
                existing = head[new_offset + idx]
                if existing != ch:
                    raise ValueError('Attempted to unread %s when %s was expected.' % (ch, existing))
            self.__offset = new_offset
        self.__size += num_code_units
        self.position -= num_code_units

    def skip(self, length):
        """Discards up to ``length`` elements.

        Returns:
            int: The number of elements that still need to be skipped because the
            queue ran out of data (zero if the whole request was satisfied).
        """
        if length >= self.__size:
            # Everything queued is skipped; report how much is still outstanding.
            skip_amount = self.__size
            rem = length - skip_amount
            self.__segments.clear()
            self.__offset = 0
            self.__size = 0
            self.position += skip_amount
        else:
            rem = 0
            self.read(length, skip=True)
        return rem

    def __iter__(self):
        """Yields elements via :meth:`read_byte` until the queue is empty, consuming them."""
        while self.__size > 0:
            yield self.read_byte()

    def __len__(self):
        """Returns the number of unconsumed elements."""
        return self.__size
class ReadEventType(IntEnum):
    """The kinds of event a caller can push into an Ion reader co-routine.

    Attributes:
        DATA: Supplies more input for the reader. The expected type is :class:`bytes`.
        NEXT: Asks the reader to yield its next event.
        SKIP: Asks the reader to proceed to the end of the current container.
            This type is not meaningful at the top-level.
    """
    DATA, NEXT, SKIP = range(3)
# Shared, pre-built read events carrying no data: one requesting the next event
# (``NEXT``) and one requesting a skip (``SKIP``).
NEXT_EVENT = DataEvent(ReadEventType.NEXT, None)
SKIP_EVENT = DataEvent(ReadEventType.SKIP, None)
def read_data_event(data):
    """Wraps ``data`` in a :class:`DataEvent` of type ``ReadEventType.DATA``.

    Args:
        data (bytes|unicode): The data for the event. Bytes are accepted by both binary and text readers,
            while unicode is accepted by text readers with is_unicode=True.

    Returns:
        DataEvent: The ``DATA`` event carrying ``data``.
    """
    event = DataEvent(ReadEventType.DATA, data)
    return event
@coroutine
def reader_trampoline(start, allow_flush=False):
    """Drives a reader state machine, bouncing between its state co-routines.

    Each state co-routine is sent a :class:`Transition` pairing the incoming
    :class:`amazon.ion.core.DataEvent` with the co-routine itself, and answers with a
    :class:`Transition` holding the event to emit (possibly ``None``) and the co-routine
    to delegate to next.

    The first event sent to the trampoline must be ``ReadEventType.NEXT``, which primes
    the parser. Often this yields ``IonEventType.INCOMPLETE``, but not necessarily
    (consider a reader over an in-memory data structure).

    Notes:
        ``IonEventType.INCOMPLETE`` marks a point where parsing cannot continue without
        more input; ``IonEventType.STREAM_END`` is similar but marks a point where a
        logical end of the data is *allowed*. After either of these stream signals the
        only valid input is ``ReadEventType.DATA`` (see ``allow_flush`` for the one
        exception), and that data must not be empty.

        After any other event, ``ReadEventType.NEXT`` requests the next parse event and
        ``ReadEventType.SKIP`` skips over the current container; ``DATA`` is rejected.
        ``SKIP`` is also rejected at depth zero unless the event just yielded was a
        ``CONTAINER_START``.

        A state co-routine may change state without surfacing anything to the caller by
        answering with a ``None`` event; the trampoline then invokes the new delegate
        immediately.

    Args:
        start: The reader co-routine to initially delegate to.
        allow_flush(Optional[bool]): True if this reader supports receiving ``NEXT`` after
            yielding ``INCOMPLETE`` to trigger an attempt to flush pending parse events,
            otherwise False.

    Yields:
        amazon.ion.core.IonEvent: the result of parsing.

        Receives :class:`DataEvent` to parse into :class:`amazon.ion.core.IonEvent`.

    Raises:
        TypeError: If the first input is not ``NEXT``, or an input event type is not valid
            for the event that was just yielded.
        ValueError: If a ``DATA`` event carries empty data.
    """
    data_event = yield
    if data_event is None or data_event.type is not ReadEventType.NEXT:
        raise TypeError('Reader must be started with NEXT')
    trans = Transition(None, start)
    while True:
        delegate = trans.delegate
        trans = delegate.send(Transition(data_event, delegate))
        data_event = None
        ion_event = trans.event
        if ion_event is None:
            # Silent state change: nothing to yield, go straight to the new delegate.
            continue

        data_event = yield ion_event

        if ion_event.event_type.is_stream_signal:
            if data_event.type is not ReadEventType.DATA:
                # The only non-DATA input tolerated here is a flush request.
                is_flush = (allow_flush
                            and ion_event.event_type is IonEventType.INCOMPLETE
                            and data_event.type is ReadEventType.NEXT)
                if not is_flush:
                    raise TypeError('Reader expected data: %r' % (data_event,))
        elif data_event.type is ReadEventType.DATA:
            raise TypeError('Reader did not expect data')

        if data_event.type is ReadEventType.DATA and len(data_event.data) == 0:
            raise ValueError('Empty data not allowed')

        at_top_level = (ion_event.depth == 0
                        and ion_event.event_type is not IonEventType.CONTAINER_START)
        if at_top_level and data_event.type is ReadEventType.SKIP:
            raise TypeError('Cannot skip at the top-level')
# NOTE(review): 8196 is not a power of two; possibly 8192 was intended -- confirm before changing.
_DEFAULT_BUFFER_SIZE = 8196


@coroutine
def blocking_reader(reader, input, buffer_size=_DEFAULT_BUFFER_SIZE):
    """Adapts a reader co-routine to pull its data from a file-like object.

    Read events sent to this co-routine are forwarded to ``reader``. Whenever the reader
    answers with a stream signal, more data is read from ``input`` and fed to it until it
    produces some other event, which is then yielded to the caller. When ``input`` is
    exhausted, an ``INCOMPLETE`` reader is sent ``NEXT`` so that it may flush; otherwise
    ``ION_STREAM_END_EVENT`` is yielded and the co-routine finishes.

    Args:
        reader(Coroutine): A reader co-routine.
        input(BaseIO): The file-like object to read from.
        buffer_size(Optional[int]): The optional buffer size to use.
    """
    pending = None
    while True:
        request = yield pending
        pending = reader.send(request)
        while pending is not None and pending.event_type.is_stream_signal:
            chunk = input.read(buffer_size)
            if len(chunk) != 0:
                pending = reader.send(read_data_event(chunk))
            elif pending.event_type is IonEventType.INCOMPLETE:
                # End of file in the middle of a value: ask the reader to flush.
                pending = reader.send(NEXT_EVENT)
            else:
                # End of file at a point where the stream may legally end.
                yield ION_STREAM_END_EVENT
                return