-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathtextio.py
1070 lines (941 loc) · 41.7 KB
/
textio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A source and a sink for reading from and writing to text files."""
# pytype: skip-file
import logging
import os
from functools import partial
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Optional
from typing import Union
from apache_beam import typehints
from apache_beam.coders import coders
from apache_beam.io import filebasedsink
from apache_beam.io import filebasedsource
from apache_beam.io import iobase
from apache_beam.io.filebasedsource import ReadAllFiles
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
if TYPE_CHECKING:
from apache_beam.io import fileio
__all__ = [
'ReadFromText',
'ReadFromTextWithFilename',
'ReadAllFromText',
'ReadAllFromTextContinuously',
'WriteToText',
'ReadFromCsv',
'WriteToCsv',
'ReadFromJson',
'WriteToJson',
]
_LOGGER = logging.getLogger(__name__)
class _TextSource(filebasedsource.FileBasedSource):
r"""A source for reading text files.
Parses a text file as newline-delimited elements. Supports newline delimiters
'\n' and '\r\n'.
This implementation reads encoded text and uses the input coder's encoding to
decode from bytes to str. This does not support ``UTF-16`` or ``UTF-32``
encodings.
"""
DEFAULT_READ_BUFFER_SIZE = 8192
class ReadBuffer:
  """Pairs buffered bytes with the offset of the next byte to be read."""

  def __init__(self, data, position):
    # The constructor stores both values exactly as given; only the property
    # setters below perform validation.
    self._data, self._position = data, position

  @property
  def data(self):
    """The bytes currently held in the buffer."""
    return self._data

  @data.setter
  def data(self, value):
    assert isinstance(value, bytes)
    self._data = value

  @property
  def position(self):
    """Offset into ``data`` of the next byte that should be read."""
    return self._position

  @position.setter
  def position(self, value):
    assert isinstance(value, int)
    limit = len(self._data)
    if value > limit:
      # A position equal to the data size is allowed; anything past it is not.
      raise ValueError(
          "Cannot set position to %d since it's larger than "
          "size of data %d." % (value, limit))
    self._position = value

  def reset(self):
    """Empty the buffer and rewind the read offset to zero."""
    # Data is cleared first so that the position check sees the empty buffer.
    self.data = b''
    self.position = 0
def __init__(
    self,
    file_pattern,
    min_bundle_size,
    compression_type,
    strip_trailing_newlines,
    coder: coders.Coder,
    buffer_size=DEFAULT_READ_BUFFER_SIZE,
    validate=True,
    skip_header_lines=0,
    header_processor_fns=(None, None),
    delimiter=None,
    escapechar=None):
  """Initialize a _TextSource

  Args:
    buffer_size (int): number of bytes requested from the file on each read
      while filling the internal buffer.
    header_processor_fns (tuple): a tuple of a `header_matcher` function
      and a `header_processor` function. The `header_matcher` should
      return `True` for all lines at the start of the file that are part
      of the file header and `False` otherwise. These header lines will
      not be yielded when reading records and instead passed into
      `header_processor` to be handled. If `skip_header_lines` and a
      `header_matcher` are both provided, the value of `skip_header_lines`
      lines will be skipped and the header will be processed from
      there.
    delimiter (bytes) Optional: delimiter to split records.
      Must not self-overlap, because self-overlapping delimiters cause
      ambiguous parsing.
    escapechar (bytes) Optional: a single byte to escape the records
      delimiter, can also escape itself.

  Raises:
    ValueError: if `skip_header_lines` is negative, if `delimiter` is not a
      non-empty, non-self-overlapping bytes sequence, or if `escapechar` is
      not a bytes object of length 1.

  Please refer to documentation in class `ReadFromText` for the rest
  of the arguments.
  """
  super().__init__(
      file_pattern,
      min_bundle_size,
      compression_type=compression_type,
      validate=validate)

  self._strip_trailing_newlines = strip_trailing_newlines
  self._compression_type = compression_type
  self._coder = coder
  self._buffer_size = buffer_size
  if skip_header_lines < 0:
    raise ValueError(
        'Cannot skip negative number of header lines: %d' % skip_header_lines)
  elif skip_header_lines > 10:
    # The count is passed as a logging argument; without it the '%d'
    # placeholder is emitted literally.
    _LOGGER.warning(
        'Skipping %d header lines. Skipping large number of header '
        'lines might significantly slow down processing.',
        skip_header_lines)
  self._skip_header_lines = skip_header_lines
  self._header_matcher, self._header_processor = header_processor_fns
  if delimiter is not None:
    if not isinstance(delimiter, bytes) or len(delimiter) == 0:
      raise ValueError('Delimiter must be a non-empty bytes sequence.')
    if self._is_self_overlapping(delimiter):
      raise ValueError('Delimiter must not self-overlap.')
  self._delimiter = delimiter
  if escapechar is not None:
    if not (isinstance(escapechar, bytes) and len(escapechar) == 1):
      # Wrap in a 1-tuple so that a tuple-valued argument is reported through
      # ValueError instead of breaking the '%' formatting itself.
      raise ValueError(
          "escapechar must be bytes of size 1: '%s'" % (escapechar, ))
  self._escapechar = escapechar
def display_data(self):
  """Return the parent's display data extended with text-source settings."""
  items = super().display_data()

  strip_item = DisplayDataItem(
      self._strip_trailing_newlines, label='Strip Trailing New Lines')
  items['strip_newline'] = strip_item

  buffer_item = DisplayDataItem(self._buffer_size, label='Buffer Size')
  items['buffer_size'] = buffer_item

  # Only the coder's class is recorded, not the coder instance.
  coder_item = DisplayDataItem(self._coder.__class__, label='Coder')
  items['coder'] = coder_item

  return items
def read_records(self, file_name, range_tracker):
# Generator that yields the decoded records of 'file_name' whose start
# positions are successfully claimed through 'range_tracker.try_claim()'.
#
# Args:
#   file_name: passed to 'self.open_file()' to obtain the file to read.
#   range_tracker: supplies the start position, is given the
#     split-points-unclaimed callback defined below, and is asked to claim
#     the start position of each record before that record is read.
#
# NOTE(review): block indentation is missing from this copy of the file, so
# nesting has to be inferred from the statements themselves.
start_offset = range_tracker.start_position()
# Start with an empty buffer whose read position is 0.
read_buffer = _TextSource.ReadBuffer(b'', 0)
# Stays -1 until the start of the first record to read has been determined.
next_record_start_position = -1
def split_points_unclaimed(stop_position):
# Reports 0 unclaimed split points when 'stop_position' is at or before the
# start of the next record; otherwise reports that the number is unknown.
return (
0 if stop_position <= next_record_start_position else
iobase.RangeTracker.SPLIT_POINTS_UNKNOWN)
range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)
with self.open_file(file_name) as file_to_read:
position_after_processing_header_lines = (
self._process_header(file_to_read, read_buffer))
# Never start reading before the end of the header.
start_offset = max(start_offset, position_after_processing_header_lines)
if start_offset > position_after_processing_header_lines:
# Seeking to one delimiter length before the start index and ignoring
# the current line. If start_position is at beginning of the line, that
# line belongs to the current bundle, hence ignoring that is incorrect.
# Seeking to one delimiter before prevents that.
if self._delimiter is not None and start_offset >= len(self._delimiter):
required_position = start_offset - len(self._delimiter)
else:
required_position = start_offset - 1
if self._escapechar is not None:
# Need more bytes to check if the delimiter is escaped.
# Seek until the first escapechar if any.
# The loop steps 'required_position' backwards one byte at a time for as
# long as the byte just before it equals the escape byte.
while required_position > 0:
file_to_read.seek(required_position - 1)
if file_to_read.read(1) == self._escapechar:
required_position -= 1
else:
break
file_to_read.seek(required_position)
read_buffer.reset()
sep_bounds = self._find_separator_bounds(file_to_read, read_buffer)
if not sep_bounds:
# Could not find a delimiter after required_position. This means that
# none of the records within the file belongs to the current source.
return
_, sep_end = sep_bounds
# Drop everything up to and including the delimiter that was found; the
# first record of this range starts right after it.
read_buffer.data = read_buffer.data[sep_end:]
next_record_start_position = required_position + sep_end
else:
# The range starts at the end of the header, so the first record begins
# exactly there.
next_record_start_position = position_after_processing_header_lines
while range_tracker.try_claim(next_record_start_position):
record, num_bytes_to_next_record = self._read_record(file_to_read,
read_buffer)
# For compressed text files that use an unsplittable OffsetRangeTracker
# with infinity as the end position, above 'try_claim()' invocation
# would pass for an empty record at the end of file that is not
# followed by a new line character. Since such a record is at the last
# position of a file, it should not be a part of the considered range.
# We do this check to ignore such records.
if len(record) == 0 and num_bytes_to_next_record < 0: # pylint: disable=len-as-condition
break
# Record delimiter must be larger than zero bytes.
assert num_bytes_to_next_record != 0
if num_bytes_to_next_record > 0:
next_record_start_position += num_bytes_to_next_record
yield self._coder.decode(record)
# A negative byte count is '_read_record''s signal that EOF was reached.
if num_bytes_to_next_record < 0:
break
def _process_header(self, file_to_read, read_buffer):
# Skips 'self._skip_header_lines' lines when that count is non-zero and,
# when a header matcher is configured, consumes the leading records that
# the matcher accepts.
#
# Returns the position in the file after processing header records. The
# decoded header lines that match 'header_matcher' are not returned; they
# are collected in 'header_lines' and handed to 'self._header_processor'
# when one is set.
#
# NOTE(review): block indentation is missing from this copy of the file;
# confirm whether the 'header_processor' call is nested under the
# 'header_matcher' check.
header_lines = []
position = self._skip_lines(
file_to_read, read_buffer,
self._skip_header_lines) if self._skip_header_lines else 0
if self._header_matcher:
while True:
record, num_bytes_to_next_record = self._read_record(file_to_read,
read_buffer)
decoded_line = self._coder.decode(record)
if not self._header_matcher(decoded_line):
# We've read past the header section at this point, so go back a line.
file_to_read.seek(position)
read_buffer.reset()
break
header_lines.append(decoded_line)
# A negative byte count is '_read_record''s signal that EOF was reached.
if num_bytes_to_next_record < 0:
break
position += num_bytes_to_next_record
if self._header_processor:
self._header_processor(header_lines)
return position
def _find_separator_bounds(self, file_to_read, read_buffer):
# Determines the start and end positions within 'read_buffer.data' of the
# next delimiter starting from position 'read_buffer.position'.
# Use the custom delimiter to be used in place of
# the default ones ('\n' or '\r\n')'
# This method may increase the size of buffer but it will not decrease the
# size of it.
current_pos = read_buffer.position
# b'\n' use as default
delimiter = self._delimiter or b'\n'
delimiter_len = len(delimiter)
while True:
if current_pos >= len(read_buffer.data) - delimiter_len + 1:
# Ensuring that there are enough bytes to determine
# at current_pos.
if not self._try_to_ensure_num_bytes_in_buffer(
file_to_read, read_buffer, current_pos + delimiter_len):
return
# Using find() here is more efficient than a linear scan
# of the byte array.
next_delim = read_buffer.data.find(delimiter, current_pos)
if next_delim >= 0:
if (self._delimiter is None and
read_buffer.data[next_delim - 1:next_delim] == b'\r'):
if self._escapechar is not None and self._is_escaped(read_buffer,
next_delim - 1):
# Accept '\n' as a default delimiter, because '\r' is escaped.
return (next_delim, next_delim + 1)
else:
# Accept both '\r\n' and '\n' as a default delimiter.
return (next_delim - 1, next_delim + 1)
else:
if self._escapechar is not None and self._is_escaped(read_buffer,
next_delim):
# Skip an escaped delimiter.
current_pos = next_delim + delimiter_len + 1
continue
else:
# Found a delimiter. Accepting that as the next delimiter.
return (next_delim, next_delim + delimiter_len)
elif self._delimiter is not None:
# Corner case: custom delimiter is truncated at the end of the buffer.
next_delim = read_buffer.data.find(
delimiter[0], len(read_buffer.data) - delimiter_len + 1)
if next_delim >= 0:
# Delimiters longer than 1 byte may cross the buffer boundary.
# Defer full matching till the next iteration.
current_pos = next_delim
continue
current_pos = len(read_buffer.data)
def _try_to_ensure_num_bytes_in_buffer(
self, file_to_read, read_buffer, num_bytes):
# Tries to ensure that there are at least num_bytes bytes in the buffer.
# Returns True if this can be fulfilled, returned False if this cannot be
# fulfilled due to reaching EOF.
while len(read_buffer.data) < num_bytes:
read_data = file_to_read.read(self._buffer_size)
if not read_data:
return False
read_buffer.data += read_data
return True
def _skip_lines(self, file_to_read, read_buffer, num_lines):
"""Skip num_lines from file_to_read, return num_lines+1 start position."""
if file_to_read.tell() > 0:
file_to_read.seek(0)
position = 0
for _ in range(num_lines):
_, num_bytes_to_next_record = self._read_record(file_to_read, read_buffer)
if num_bytes_to_next_record < 0:
# We reached end of file. It is OK to just break here
# because subsequent _read_record will return same result.
break
position += num_bytes_to_next_record
return position
def _read_record(self, file_to_read, read_buffer):
# Returns a tuple containing the current_record and number of bytes to the
# next record starting from 'read_buffer.position'. If EOF is
# reached, returns a tuple containing the current record and -1.
if read_buffer.position > self._buffer_size:
# read_buffer is too large. Truncating and adjusting it.
read_buffer.data = read_buffer.data[read_buffer.position:]
read_buffer.position = 0
record_start_position_in_buffer = read_buffer.position
sep_bounds = self._find_separator_bounds(file_to_read, read_buffer)
read_buffer.position = sep_bounds[1] if sep_bounds else len(
read_buffer.data)
if not sep_bounds:
# Reached EOF. Bytes up to the EOF is the next record. Returning '-1' for
# the starting position of the next record.
return (read_buffer.data[record_start_position_in_buffer:], -1)
if self._strip_trailing_newlines:
# Current record should not contain the delimiter.
return (
read_buffer.data[record_start_position_in_buffer:sep_bounds[0]],
sep_bounds[1] - record_start_position_in_buffer)
else:
# Current record should contain the delimiter.
return (
read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
sep_bounds[1] - record_start_position_in_buffer)
@staticmethod
def _is_self_overlapping(delimiter):
# A delimiter self-overlaps if it has a prefix that is also its suffix.
for i in range(1, len(delimiter)):
if delimiter[0:i] == delimiter[len(delimiter) - i:]:
return True
return False
def _is_escaped(self, read_buffer, position):
# Returns True if byte at position is preceded with an odd number
# of escapechar bytes or False if preceded by 0 or even escapes
# (the even number means that all the escapes are escaped themselves).
escape_count = 0
for current_pos in reversed(range(0, position)):
if read_buffer.data[current_pos:current_pos + 1] != self._escapechar:
break
escape_count += 1
return escape_count % 2 == 1
def output_type_hint(self):
  """Return the coder's type hint, or ``Any`` if the coder provides none."""
  try:
    hint = self._coder.to_type_hint()
  except NotImplementedError:
    # The coder does not implement to_type_hint(); use the widest hint.
    hint = Any
  return hint
class _TextSourceWithFilename(_TextSource):
  """A ``_TextSource`` that pairs every record with the name of its file."""

  def read_records(self, file_name, range_tracker):
    """Yield ``(file_name, record)`` for each record the parent produces."""
    for line in super().read_records(file_name, range_tracker):
      yield file_name, line

  def output_type_hint(self):
    """Return a key-value hint: ``str`` keys and the parent's hint as values."""
    value_hint = super().output_type_hint()
    return typehints.KV[str, value_hint]
class _TextSink(filebasedsink.FileBasedSink):
  """A sink to a GCS or local text file or files."""

  def __init__(
      self,
      file_path_prefix,
      file_name_suffix='',
      append_trailing_newlines=True,
      num_shards=0,
      shard_name_template=None,
      coder: coders.Coder = coders.ToBytesCoder(),
      compression_type=CompressionTypes.AUTO,
      header=None,
      footer=None,
      *,
      max_records_per_shard=None,
      max_bytes_per_shard=None,
      skip_if_empty=False):
    r"""Initialize a _TextSink.

    Args:
      file_path_prefix: Path prefix of the files to write. Each output file
        name starts with this prefix, continues with a shard identifier (see
        num_shards) and ends with file_name_suffix when one is given. Usually
        this is the only argument supplied, with num_shards,
        shard_name_template and file_name_suffix left at their defaults.
      file_name_suffix: Suffix for the files written.
      append_trailing_newlines: Whether an additional newline char is written
        after each element.
      num_shards: Number of files (shards) used for output. When not set, the
        service decides on the optimal number of shards. Constraining the
        number of shards is likely to reduce pipeline performance, so setting
        it is not recommended unless a specific number of output files is
        required.
      shard_name_template: Template string with placeholders for the shard
        number and the shard count. When a shard's file name is built, the
        upper-case letters 'S' and 'N' are replaced with the 0-padded shard
        number and shard count respectively. May be '', which behaves as if
        num_shards were 1 so that a single file is generated. Passing None
        selects the default pattern '-SSSSS-of-NNNNN'.
      coder: Coder used to encode each line.
      compression_type: How output files are compressed. The typical value
        is CompressionTypes.AUTO, in which case the extension of the final
        file path (as determined by file_path_prefix, file_name_suffix,
        num_shards and shard_name_template) is used to detect the
        compression.
      header: String written at the beginning of each file. If not None and
        append_trailing_newlines is set, '\n' is added after it.
      footer: String written at the end of each file. If not None and
        append_trailing_newlines is set, '\n' is added after it.
      max_records_per_shard: Maximum number of records to write to any
        individual shard.
      max_bytes_per_shard: Target maximum number of bytes to write to any
        individual shard. The limit may be exceeded slightly: a new shard is
        started once it is hit, but the remainder of the current record, a
        subsequent newline and a footer may still be added. The size tracked
        is the uncompressed size of the shard.
      skip_if_empty: Don't write any shards if the PCollection is empty.

    Returns:
      A _TextSink object usable for writing.
    """
    super().__init__(
        file_path_prefix,
        file_name_suffix=file_name_suffix,
        num_shards=num_shards,
        shard_name_template=shard_name_template,
        coder=coder,
        mime_type='text/plain',
        compression_type=compression_type,
        max_records_per_shard=max_records_per_shard,
        max_bytes_per_shard=max_bytes_per_shard,
        skip_if_empty=skip_if_empty)
    self._header = header
    self._footer = footer
    self._append_trailing_newlines = append_trailing_newlines

  def open(self, temp_path):
    """Open ``temp_path`` through the parent and write the header, if any."""
    handle = super().open(temp_path)
    if self._header is None:
      return handle

    handle.write(coders.ToBytesCoder().encode(self._header))
    if self._append_trailing_newlines:
      handle.write(b'\n')
    return handle

  def close(self, file_handle):
    """Write the footer, if any, then close the file through the parent."""
    if self._footer is not None:
      footer_bytes = coders.ToBytesCoder().encode(self._footer)
      file_handle.write(footer_bytes)
      if self._append_trailing_newlines:
        file_handle.write(b'\n')
    super().close(file_handle)

  def display_data(self):
    """Return the parent's display data plus the trailing-newline setting."""
    items = super().display_data()
    newline_item = DisplayDataItem(
        self._append_trailing_newlines, label='Append Trailing New Lines')
    items['append_newline'] = newline_item
    return items

  def write_encoded_record(self, file_handle, encoded_value):
    """Writes a single encoded record."""
    file_handle.write(encoded_value)
    if not self._append_trailing_newlines:
      return
    file_handle.write(b'\n')
def _create_text_source(
    file_pattern=None,
    min_bundle_size=None,
    compression_type=None,
    strip_trailing_newlines=None,
    coder=None,
    validate=False,
    skip_header_lines=None,
    delimiter=None,
    escapechar=None):
  """Build a ``_TextSource``, forwarding every argument by keyword."""
  source_kwargs = dict(
      file_pattern=file_pattern,
      min_bundle_size=min_bundle_size,
      compression_type=compression_type,
      strip_trailing_newlines=strip_trailing_newlines,
      coder=coder,
      validate=validate,
      skip_header_lines=skip_header_lines,
      delimiter=delimiter,
      escapechar=escapechar)
  return _TextSource(**source_kwargs)
class ReadAllFromText(PTransform):
  """A ``PTransform`` for reading a ``PCollection`` of text files.

  Takes a ``PCollection`` of text files or file patterns and produces a
  ``PCollection`` of strings.

  Each file is parsed as newline-delimited elements, assuming UTF-8 encoding
  by default. The newline delimiters '\\n' and '\\r\\n' are supported.

  When `with_filename` is ``True`` the output also carries the file name.
  That is similar to ``ReadFromTextWithFilename``, except that this
  ``PTransform`` can be placed anywhere in the pipeline.

  To read a text file that requires a different encoding, provide a custom
  :class:`~apache_beam.coders.coders.Coder` that encodes and decodes with the
  appropriate codec; see the implementation of
  :class:`~apache_beam.coders.coders.StrUtf8Coder` for an example.

  This does not support ``UTF-16`` or ``UTF-32`` encodings.

  This implementation is only tested with batch pipeline. In streaming,
  reading may happen with delay due to the limitation in ReShuffle involved.
  """
  DEFAULT_DESIRED_BUNDLE_SIZE = 64 * 1024 * 1024  # 64MB

  def __init__(
      self,
      min_bundle_size=0,
      desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,
      compression_type=CompressionTypes.AUTO,
      strip_trailing_newlines=True,
      validate=False,
      coder: coders.Coder = coders.StrUtf8Coder(),
      skip_header_lines=0,
      with_filename=False,
      delimiter=None,
      escapechar=None,
      **kwargs):
    """Initialize the ``ReadAllFromText`` transform.

    Args:
      min_bundle_size: Minimum size of the bundles generated when this source
        is split into bundles. See ``FileBasedSource`` for more details.
      desired_bundle_size: Desired size of the bundles generated when this
        source is split into bundles. See ``FileBasedSource`` for more
        details.
      compression_type: How compressed input files are handled. The typical
        value is ``CompressionTypes.AUTO``, in which case the extension of
        the underlying file path is used to detect the compression.
      strip_trailing_newlines: Whether the newline char of each line is
        removed before that line is decoded.
      validate: flag to verify that the files exist during the pipeline
        creation time.
      skip_header_lines: Number of header lines to skip; the same number is
        skipped from each source file. Must be 0 or higher. Skipping a large
        number of lines might impact performance.
      coder: Coder used to decode each line.
      with_filename: If True, each output is a key-value pair whose key is
        the file name and whose value is the actual data. If False, only the
        data is returned.
      delimiter (bytes) Optional: delimiter to split records.
        Must not self-overlap, because self-overlapping delimiters cause
        ambiguous parsing.
      escapechar (bytes) Optional: a single byte to escape the records
        delimiter, can also escape itself.
    """
    super().__init__(**kwargs)
    self._min_bundle_size = min_bundle_size
    self._desired_bundle_size = desired_bundle_size
    self._compression_type = compression_type
    self._with_filename = with_filename
    # Everything except the file pattern is bound up front.
    self._source_from_file = partial(
        _create_text_source,
        min_bundle_size=min_bundle_size,
        compression_type=compression_type,
        strip_trailing_newlines=strip_trailing_newlines,
        validate=validate,
        coder=coder,
        skip_header_lines=skip_header_lines,
        delimiter=delimiter,
        escapechar=escapechar)
    # NOTE(review): the leading True is presumably ReadAllFiles' splittable
    # flag - confirm against its signature.
    self._read_all_files = ReadAllFiles(
        True,
        self._compression_type,
        self._desired_bundle_size,
        self._min_bundle_size,
        self._source_from_file,
        self._with_filename)

  def expand(self, pvalue):
    """Apply the prepared ``ReadAllFiles`` transform to ``pvalue``."""
    read_step = 'ReadAllFiles' >> self._read_all_files
    return pvalue | read_step
class ReadAllFromTextContinuously(ReadAllFromText):
  """A ``PTransform`` for reading text files in given file patterns.

  This PTransform acts as a Source and continuously produces a
  ``PCollection`` of strings.

  See ``ReadAllFromText`` for the text parsing settings and
  ``apache_beam.io.fileio.MatchContinuously`` for the watching settings.

  ReadAllFromTextContinuously is experimental. No backwards-compatibility
  guarantees. Due to the limitation on Reshuffle, current implementation does
  not scale.
  """
  # Keyword arguments that are routed to MatchContinuously.
  _ARGS_FOR_MATCH = (
      'interval',
      'has_deduplication',
      'start_timestamp',
      'stop_timestamp',
      'match_updated_files',
      'apply_windowing')
  # Keyword arguments that are routed to ReadAllFromText.
  _ARGS_FOR_READ = (
      'min_bundle_size',
      'desired_bundle_size',
      'compression_type',
      'strip_trailing_newlines',
      'validate',
      'coder',
      'skip_header_lines',
      'with_filename',
      'delimiter',
      'escapechar')

  def __init__(self, file_pattern, **kwargs):
    """Initialize the ``ReadAllFromTextContinuously`` transform.

    Accepts the constructor args of both :class:`ReadAllFromText` and
    :class:`~apache_beam.io.fileio.MatchContinuously`.
    """
    kwargs_for_match = {}
    kwargs_for_read = {}
    kwargs_additional = {}
    for key, value in kwargs.items():
      is_match_arg = key in self._ARGS_FOR_MATCH
      is_read_arg = key in self._ARGS_FOR_READ
      if is_match_arg:
        kwargs_for_match[key] = value
      if is_read_arg:
        kwargs_for_read[key] = value
      if not is_match_arg and not is_read_arg:
        # Unrecognised keywords are forwarded to the parent constructor too.
        kwargs_additional[key] = value

    super().__init__(**kwargs_for_read, **kwargs_additional)
    self._file_pattern = file_pattern
    self._kwargs_for_match = kwargs_for_match

  def expand(self, pbegin):
    """Match files continuously and read them without the reshuffle step."""
    # Importing locally to prevent circular dependency issues.
    from apache_beam.io.fileio import MatchContinuously

    # TODO(BEAM-14497) always reshuffle once gbk always trigger works.
    matched = pbegin | MatchContinuously(
        self._file_pattern, **self._kwargs_for_match)
    reader = self._read_all_files._disable_reshuffle()
    return matched | 'ReadAllFiles' >> reader
class ReadFromText(PTransform):
  r"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading text
  files.

  Each text file is parsed as newline-delimited elements, assuming ``UTF-8``
  encoding by default. Records are split on the newline delimiters ``\n``
  and ``\r\n`` or on a specified delimiter.

  To read a text file that requires a different encoding, provide a custom
  :class:`~apache_beam.coders.coders.Coder` that encodes and decodes with the
  appropriate codec; see the implementation of
  :class:`~apache_beam.coders.coders.StrUtf8Coder` for an example.

  This does not support ``UTF-16`` or ``UTF-32`` encodings.
  """

  # Source class instantiated by __init__; subclasses may replace it.
  _source_class = _TextSource

  def __init__(
      self,
      file_pattern=None,
      min_bundle_size=0,
      compression_type=CompressionTypes.AUTO,
      strip_trailing_newlines=True,
      coder: coders.Coder = coders.StrUtf8Coder(),
      validate=True,
      skip_header_lines=0,
      delimiter=None,
      escapechar=None,
      **kwargs):
    """Initialize the :class:`ReadFromText` transform.

    Args:
      file_pattern (str): The file path to read from, either a local file
        path or a GCS ``gs://`` path. Glob characters (``*``, ``?``, and
        ``[...]`` sets) are allowed in the path.
      min_bundle_size (int): Minimum size of the bundles generated when this
        source is split into bundles. See
        :class:`~apache_beam.io.filebasedsource.FileBasedSource` for more
        details.
      compression_type (str): How compressed input files are handled.
        The typical value is :attr:`CompressionTypes.AUTO
        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
        extension of the underlying file path is used to detect the
        compression.
      strip_trailing_newlines (bool): Whether the newline char of each line
        is removed before that line is decoded.
      validate (bool): flag to verify that the files exist during the pipeline
        creation time.
      skip_header_lines (int): Number of header lines to skip; the same
        number is skipped from each source file. Must be 0 or higher.
        Skipping a large number of lines might impact performance.
      coder (~apache_beam.coders.coders.Coder): Coder used to decode each line.
      delimiter (bytes) Optional: delimiter to split records.
        Must not self-overlap, because self-overlapping delimiters cause
        ambiguous parsing.
      escapechar (bytes) Optional: a single byte to escape the records
        delimiter, can also escape itself.
    """
    super().__init__(**kwargs)

    if file_pattern:
      try:
        # A pattern without a directory part is anchored to the current
        # directory.
        if not os.path.dirname(file_pattern):
          file_pattern = os.path.join('.', file_pattern)
      except TypeError:
        # NOTE(review): presumably file_pattern may be a non-path object that
        # os.path cannot handle; it is then passed on unchanged - confirm.
        pass

    source_options = dict(
        validate=validate,
        skip_header_lines=skip_header_lines,
        delimiter=delimiter,
        escapechar=escapechar)
    self._source = self._source_class(
        file_pattern,
        min_bundle_size,
        compression_type,
        strip_trailing_newlines,
        coder,
        **source_options)

  def expand(self, pvalue):
    """Read from the configured source, typed with the source's output hint."""
    pipeline = pvalue.pipeline
    reader = Read(self._source)
    typed_reader = reader.with_output_types(self._source.output_type_hint())
    return pipeline | typed_reader
class ReadFromTextWithFilename(ReadFromText):
  r"""A :class:`~apache_beam.io.textio.ReadFromText` for reading text
  files that returns the name of the file along with its content.

  This class extends :class:`ReadFromText` only by setting a different
  ``_source_class`` attribute.
  """

  _source_class = _TextSourceWithFilename
class WriteToText(PTransform):
  """A :class:`~apache_beam.transforms.ptransform.PTransform` that writes to
  text files."""
  def __init__(
      self,
      file_path_prefix: str,
      file_name_suffix='',
      append_trailing_newlines=True,
      num_shards=0,
      shard_name_template: Optional[str] = None,
      coder: coders.Coder = coders.ToBytesCoder(),
      compression_type=CompressionTypes.AUTO,
      header=None,
      footer=None,
      *,
      max_records_per_shard=None,
      max_bytes_per_shard=None,
      skip_if_empty=False):
    r"""Initialize a :class:`WriteToText` transform.

    Args:
      file_path_prefix (str): Path prefix of the output files. Every file
        written begins with this prefix, is followed by a shard identifier
        (see **num_shards**), and ends in a common extension when one is
        given by **file_name_suffix**. In most cases only this argument is
        specified while **num_shards**, **shard_name_template**, and
        **file_name_suffix** keep their default values.
      file_name_suffix (str): Suffix for the files written.
      append_trailing_newlines (bool): Whether this sink should write an
        additional newline char after writing each element.
      num_shards (int): The number of files (shards) used for output.
        If not set, the service will decide on the optimal number of shards.
        Constraining the number of shards is likely to reduce the
        performance of a pipeline, so setting this value is not recommended
        unless a specific number of output files is required.
      shard_name_template (str): Template string containing placeholders for
        the shard number and shard count. Currently only ``''`` and
        ``'-SSSSS-of-NNNNN'`` are patterns accepted by the service.
        When a filename is constructed for a particular shard number, the
        upper-case letters ``S`` and ``N`` are replaced with the
        ``0``-padded shard number and shard count respectively. Passing
        ``''`` behaves as if num_shards was set to 1, so only one file is
        generated. The default pattern used is ``'-SSSSS-of-NNNNN'``.
      coder (~apache_beam.coders.coders.Coder): Coder used to encode each
        line.
      compression_type (str): Used to handle compressed output files.
        The typical value is :class:`CompressionTypes.AUTO
        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
        extension of the final file path (as determined by
        **file_path_prefix**, **file_name_suffix**, **num_shards** and
        **shard_name_template**) is used to detect the compression.
      header (str): String to write at the beginning of file as a header.
        If not :data:`None` and **append_trailing_newlines** is set, ``\n``
        will be added.
      footer (str): String to write at the end of file as a footer.
        If not :data:`None` and **append_trailing_newlines** is set, ``\n``
        will be added.
      max_records_per_shard: Maximum number of records to write to any
        individual shard.
      max_bytes_per_shard: Target maximum number of bytes to write to any
        individual shard. The actual shard size may exceed this value
        slightly: a new shard is created once the limit is hit, but the
        remainder of a given record, a subsequent newline, and a footer may
        still be written to the current one. The size tracked is the
        uncompressed, not the compressed, size of the shard.
      skip_if_empty: Don't write any shards if the PCollection is empty.
        In case of an empty PCollection, this will still delete existing
        files having same file path and not create new ones.
    """
    # Keyword-only options are forwarded to the sink under the same names.
    shard_options = {
        'max_records_per_shard': max_records_per_shard,
        'max_bytes_per_shard': max_bytes_per_shard,
        'skip_if_empty': skip_if_empty,
    }
    self._sink = _TextSink(
        file_path_prefix,
        file_name_suffix,
        append_trailing_newlines,
        num_shards,
        shard_name_template,
        coder,
        compression_type,
        header,
        footer,
        **shard_options)

  def expand(self, pcoll):
    """Apply a ``Write`` of the configured text sink to ``pcoll``."""
    write_transform = Write(self._sink)
    return pcoll | write_transform
try:
import pandas
def append_pandas_args(src, exclude):
  """Build a decorator that appends pandas parameter docs to a docstring.

  The "Parameters" section of ``src.__doc__`` (from the ``Parameters``
  heading up to the first line that starts with ``Returns``) is copied,
  de-indented by the heading's indentation, retitled "Pandas Parameters"
  and appended to the docstring of the decorated object.

  Args:
    src: Object whose numpydoc-style ``__doc__`` supplies the parameters.
    exclude: Iterable of parameter names to leave out. An excluded entry
      spans its ``name : ...`` line and the more deeply indented (or empty)
      lines that follow it.

  Returns:
    A decorator that updates ``dest.__doc__`` and returns ``dest``. The
    docstring is left untouched when ``src`` has no docstring (for example
    under ``python -OO``) or when no "Parameters" section is found.
  """
  # Built once: str.startswith accepts a tuple, and materializing here lets
  # ``exclude`` be any iterable, including a one-shot iterator.
  excluded_prefixes = tuple(arg + ' : ' for arg in exclude)

  def append(dest):
    src_doc = src.__doc__
    if not src_doc:
      # Docstrings are stripped under -OO; there is nothing to copy.
      return dest

    indent = 0
    in_params = False
    skip = False
    extra_lines = []
    for line in src_doc.split('\n'):
      stripped = line.strip()
      if stripped == 'Parameters':
        indent = len(line) - len(line.lstrip())
        extra_lines = ['\n\nPandas Parameters']
        in_params = True
        continue
      if stripped.startswith('Returns'):
        break
      if not in_params:
        continue

      # An excluded entry ends at the next non-empty line that is not
      # indented deeper than the section heading.
      if skip and line and not line[indent:].startswith(' '):
        skip = False
      if stripped.startswith(excluded_prefixes):
        skip = True
      if not skip:
        extra_lines.append(line[indent:])

    if len(extra_lines) < 2:
      # No heading followed by an underline was collected, so there is no
      # section to append (and no underline to extend).
      return dest

    # Expand title underline due to Parameters -> Pandas Parameters.
    extra_lines[1] += '-------'
    # ``dest.__doc__`` may be None (e.g. under -OO); treat that as empty.
    dest.__doc__ = (dest.__doc__ or '') + '\n'.join(extra_lines)
    return dest

  return append
@append_pandas_args(
    pandas.read_csv, exclude=['filepath_or_buffer', 'iterator'])
def ReadFromCsv(path: str, *, splittable: bool = True, **kwargs):
  """A PTransform that reads comma-separated values (csv) files into a
  PCollection.

  Args:
    path (str): The file path to read from. The path can contain glob
      characters such as ``*`` and ``?``.
    splittable (bool): Whether the csv files can be split at line
      boundaries, i.e. whether every line of the file is a complete record.
      Set this to False if single records span multiple lines (e.g. a
      quoted field has a newline inside of it). Setting this to false may
      disable liquid sharding.
    **kwargs: Extra arguments passed to `pandas.read_csv` (see below).
  """
  # Imported here rather than at module level, as in the original.
  from apache_beam.dataframe.io import ReadViaPandas
  reader = ReadViaPandas('csv', path, splittable=splittable, **kwargs)
  return 'ReadFromCsv' >> reader
@append_pandas_args(
    pandas.DataFrame.to_csv, exclude=['path_or_buf', 'index', 'index_label'])
def WriteToCsv(
    path: str,
    num_shards: Optional[int] = None,
    file_naming: Optional['fileio.FileNaming'] = None,
    **kwargs):
  # pylint: disable=line-too-long
  """A PTransform that writes a schema'd PCollection as a (set of)
  comma-separated values (csv) files.

  Args:
    path (str): The file path to write to. The files written will
      begin with this prefix, followed by a shard identifier (see
      `num_shards`) according to the `file_naming` parameter.
    num_shards (optional int): The number of shards to use in the
      distributed write. Defaults to None, letting the system choose an
      optimal value.
    file_naming (optional callable): A file-naming strategy that determines
      the actual shard names given their shard number, etc.
      See the section on `file naming
      <https://beam.apache.org/releases/pydoc/current/apache_beam.io.fileio.html#file-naming>`_
      Defaults to `fileio.default_file_naming`, which names files as
      `path-XXXXX-of-NNNNN`.
    **kwargs: Extra arguments passed to `pandas.DataFrame.to_csv` (see below).
  """
  from apache_beam.dataframe.io import WriteViaPandas

  # Forward the optional settings only when the caller supplied them.
  optional_settings = (('num_shards', num_shards), ('file_naming', file_naming))
  for option_name, option_value in optional_settings:
    if option_value is not None:
      kwargs[option_name] = option_value

  writer = WriteViaPandas('csv', path, index=False, **kwargs)
  return 'WriteToCsv' >> writer
@append_pandas_args(pandas.read_json, exclude=['path_or_buf'])
def ReadFromJson(
path: str,
*,
orient: str = 'records',
lines: bool = True,
dtype: Union[bool, Dict[str, Any]] = False,
**kwargs):
"""A PTransform for reading json values from files into a PCollection.