Coverage for pycommons / io / csv.py: 100%
297 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:04 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:04 +0000
1"""
2Tools for CSV output and input.
4Our CSV format tools are intended to read and write structured objects from
5and to a comma-separated-values format. This format consists of one header,
6where the column titles are included (separated by a :const:`CSV_SEPARATOR`)
7and one row per data object, with one value per column.
9Different from other CSV processing tools, we want to
111. Permit that data is extracted from / parsed in form of hierarchically
12 structured objects.
132. Columns have fixed types based on the object definition.
143. The data read and written is strictly validated during the process.
154. Data can be processed in form of a stream and is not necessarily all loaded
16 into memory at once.
175. The order of the columns is unimportant.
186. Useless white space is automatically stripped and ignored.
197. Multiple objects may be written per row, maybe even nested objects, and
20 this is signified by "scope" column titles, e.g., something like
21 `"weight.min"`, `"weight.median"`, ..., `"age.min"`, `"age.median"`, ...
228. Comments may be added to the header or footer of the CSV file that describe
23 the contents of the columns.
25The separator is configurable, but by default set to :const:`CSV_SEPARATOR`.
26Comments start with :const:`COMMENT_START` by default.
27"""
28from typing import (
29 Any,
30 Callable,
31 Final,
32 Generator,
33 Iterable,
34 Mapping,
35 TypeVar,
36 cast,
37)
39from pycommons.ds.sequences import reiterable
40from pycommons.strings.chars import NEWLINE, WHITESPACE_OR_NEWLINE
41from pycommons.types import check_int_range, type_error
42from pycommons.version import __version__ as pycommons_version
#: the string placed between two columns unless another separator is given
CSV_SEPARATOR: Final[str] = ";"

#: the string that begins a comment by default: the rest of the line after
#: it is treated as comment text
COMMENT_START: Final[str] = "#"

#: the string put between a scope and a key when nested column titles
#: are composed
SCOPE_SEPARATOR: Final[str] = "."

#: the type variable for the data objects that are written to or read
#: from CSV
T = TypeVar("T")

# mypy: disable-error-code=valid-type
#: the type variable for the setup object of the CSV input or output
S = TypeVar("S")
61def csv_scope(scope: str | None, key: str | None) -> str:
62 """
63 Combine a scope and a key.
65 :param scope: the scope, or `None`
66 :param key: the key, or `None`
67 :returns: the scope joined with the key
69 >>> csv_scope("a", "b")
70 'a.b'
71 >>> csv_scope("a", None)
72 'a'
73 >>> csv_scope(None, "b")
74 'b'
76 >>> try:
77 ... csv_scope(1, "b")
78 ... except TypeError as te:
79 ... print(str(te))
80 descriptor '__len__' requires a 'str' object but received a 'int'
82 >>> try:
83 ... csv_scope("a", 1)
84 ... except TypeError as te:
85 ... print(str(te))
86 descriptor '__len__' requires a 'str' object but received a 'int'
88 >>> try:
89 ... csv_scope("a ", "b")
90 ... except ValueError as ve:
91 ... print(str(ve))
92 Invalid csv scope 'a '.
94 >>> try:
95 ... csv_scope("", "b")
96 ... except ValueError as ve:
97 ... print(ve)
98 Invalid csv scope ''.
100 >>> try:
101 ... csv_scope("a", " b")
102 ... except ValueError as ve:
103 ... print(str(ve))
104 Invalid csv key ' b'.
106 >>> try:
107 ... csv_scope("a", "")
108 ... except ValueError as ve:
109 ... print(str(ve))
110 Invalid csv key ''.
112 >>> try:
113 ... csv_scope(None, None)
114 ... except ValueError as ve:
115 ... print(str(ve))
116 Csv scope and key cannot both be None.
117 """
118 if (key is not None) and ((str.__len__(key) <= 0) or (
119 str.strip(key) != key)):
120 raise ValueError(f"Invalid csv key {key!r}.")
121 if scope is None:
122 if key is None:
123 raise ValueError("Csv scope and key cannot both be None.")
124 return key
125 if (str.__len__(scope) <= 0) or (str.strip(scope) != scope):
126 raise ValueError(f"Invalid csv scope {scope!r}.")
127 if key is None:
128 return scope
129 return f"{scope}{SCOPE_SEPARATOR}{key}"
def csv_read(rows: Iterable[str],
             setup: Callable[[dict[str, int]], S],
             parse_row: Callable[[S, list[str]], T],
             separator: str = CSV_SEPARATOR,
             comment_start: str | None = COMMENT_START) \
        -> Generator[T, None, None]:
    r"""
    Read (parse) a sequence of strings as CSV data.

    All lines are :meth:`~str.split` based on the `separator` string and each
    of the resulting strings is stripped via :meth:`~str.strip`.
    The first non-empty line of the data is interpreted as header line.

    This header is passed to the `setup` function in form of a :class:`dict`
    that maps column titles to column indices. This function then returns an
    object of setup data. This object may be anything, including `None`.
    To each of the rows of CSV data, the function `parse_row` is applied.
    This function receives the object returned by `setup` as first argument
    and the row as list of strings as second argument. Each line is therefore
    :meth:`~str.split` (by the CSV separator) and its components are
    :meth:`~str.strip`-ped.
    It is permitted that a line in the CSV file contains fewer columns than
    declared in the header. In this case, the missing columns are set to empty
    strings. Lines that are entirely empty are skipped.

    If `comment_start` is not `None`, then all text in a line starting at the
    first occurrence of `comment_start` is discarded before the line is
    processed.

    If you want to read more complex CSV structures, then using the class
    :class:`CsvReader` and its class method :meth:`CsvReader.read` are a more
    convenient approach. They are wrappers around :func:`csv_read`.

    :param rows: the rows of text
    :param setup: a function which creates an object holding the necessary
        information for row parsing
    :param parse_row: the function parsing the rows
    :param separator: the string used to separate columns
    :param comment_start: the string starting comments
    :returns: a :class:`Generator` with the parsed data rows
    :raises TypeError: if any of the parameters has the wrong type
    :raises ValueError: if the separator or comment start character are
        incompatible or if the data has some internal error

    >>> def _setup(colidx: dict[str, int]) -> dict[str, int]:
    ...     return colidx

    >>> def _parse_row(colidx: dict[str, int], row: list[str]) -> dict:
    ...     return {x: row[y] for x, y in colidx.items()}

    >>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9",
    ...         "", "10", "# 11;12"]

    >>> for p in csv_read(text, _setup, _parse_row):
    ...     print(p)
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}

    >>> for p in csv_read((t.replace(";", ",") for t in text), _setup,
    ...                   _parse_row, ","):
    ...     print(p)
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}

    >>> for p in csv_read((t.replace(";", "\t") for t in text), _setup,
    ...                   _parse_row, "\t"):
    ...     print(p)
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}

    >>> for p in csv_read(text, _setup, _parse_row, comment_start=None):
    ...     print(p)
    {'a': '# test', 'b': '', 'c': '', 'd': ''}
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}
    {'a': '# 11', 'b': '12', 'c': '', 'd': ''}

    >>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", "5;6", ";8;;9",
    ...         "", "10", "# 11;12"]
    >>> for p in csv_read(text, _setup, _parse_row):
    ...     print(p)
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}

    A `setup` function may return `None` if no setup data is needed:

    >>> for p in csv_read(["a;b", "1;2", "3"], lambda _: None,
    ...                   lambda _, row: row):
    ...     print(p)
    ['1', '2']
    ['3', '']

    >>> try:
    ...     list(csv_read(None, _setup, _parse_row))
    ... except TypeError as te:
    ...     print(te)
    rows should be an instance of typing.Iterable but is None.

    >>> try:
    ...     list(csv_read(1, _setup, _parse_row))
    ... except TypeError as te:
    ...     print(te)
    rows should be an instance of typing.Iterable but is int, namely 1.

    >>> try:
    ...     list(csv_read(text, None, _parse_row))
    ... except TypeError as te:
    ...     print(te)
    setup should be a callable but is None.

    >>> try:
    ...     list(csv_read(text, 1, _parse_row))
    ... except TypeError as te:
    ...     print(te)
    setup should be a callable but is int, namely 1.

    >>> try:
    ...     list(csv_read(text, _setup, None))
    ... except TypeError as te:
    ...     print(te)
    parse_row should be a callable but is None.

    >>> try:
    ...     list(csv_read(text, _setup, 1))
    ... except TypeError as te:
    ...     print(te)
    parse_row should be a callable but is int, namely 1.

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, None))
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'NoneType'

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, 1))
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'int'

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, ""))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid separator ''.

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, "-", 1))
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'int'

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, "-", ""))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid comment start: ''.

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, "-", " "))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid comment start: ' '.

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, ";", ";"))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid comment start: ';'.

    >>> text2 = ["a;b;a;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9"]
    >>> try:
    ...     list(csv_read(text2, _setup, _parse_row))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid column headers: ['a', 'b', 'a', 'd'].

    >>> text2 = ["a;b;c;d", "# test", " 1; 2;3;4", "1;2;3;4;5;6;7", ";8;;9"]
    >>> try:
    ...     list(csv_read(text2, _setup, _parse_row))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid row '1;2;3;4;5;6;7' contains 7 columns, but should have at most 4.
    """
    if not isinstance(rows, Iterable):
        raise type_error(rows, "rows", Iterable)
    if not callable(setup):
        raise type_error(setup, "setup", call=True)
    if not callable(parse_row):
        raise type_error(parse_row, "parse_row", call=True)
    # the unbound `str` methods double as type checks for the strings
    if str.__len__(separator) <= 0:
        raise ValueError(f"Invalid separator {separator!r}.")
    if (comment_start is not None) and (
            (str.__len__(comment_start) <= 0) or (
            str.strip(comment_start) != comment_start) or (
            comment_start in separator)):
        raise ValueError(f"Invalid comment start: {comment_start!r}.")

    # The number of header columns. A negative value means that the header
    # has not been parsed yet. We must not use the object returned by `setup`
    # for this purpose, because `setup` may legitimately return `None`.
    col_count: int = -1

    # cannot strip spaces that are part of the separator
    strip: Final[Callable[[str], str]] = str.strip
    stripper: Final[Callable[[str], str]] = strip if (  # type: ignore
        strip(separator) == separator) else str.rstrip  # type: ignore
    find: Final[Callable[[str, str], int]] = str.find  # type: ignore
    split: Final[Callable[[str, str], list[str]]] = str.split  # type: ignore
    listlen: Final[Callable[[list], int]] = list.__len__  # type: ignore
    strlen: Final[Callable[[str], int]] = str.__len__  # type: ignore
    # the column definition info generated by setup, set with the header
    info: S = cast("S", None)
    exts: dict[int, list[str]] = {}  # cache of lists of missing columns

    for orig_line in rows:  # iterate over all the rows
        line: str = orig_line
        if comment_start is not None:  # delete comment part, if any
            deli = find(line, comment_start)
            if deli >= 0:
                line = line[:deli]
        line = stripper(line)
        if strlen(line) <= 0:
            continue  # nothing to do here

        # split into columns and strip the white space off each column
        cols: list[str] = [strip(col) for col in split(line, separator)]
        count: int = listlen(cols)  # always >= 1: separator is not empty

        if col_count < 0:  # need to load column definition
            colmap: dict[str, int] = {s: i for i, s in enumerate(cols)}
            # titles must be non-empty and unique
            if any(strlen(s) <= 0 for s in cols) or (
                    dict.__len__(colmap) != count):
                raise ValueError(f"Invalid column headers: {cols!r}.")
            info = setup(colmap)  # obtain the column setup object
            del colmap  # column map no longer needed
            col_count = count  # from now on, all rows are data rows
            continue  # proceed with next line

        if count > col_count:  # too many columns, throw error
            raise ValueError(
                f"Invalid row {orig_line!r} contains {count} columns, but "
                f"should have at most {col_count}.")
        if count < col_count:  # do we need to add dummy columns?
            add: int = col_count - count  # number of needed columns
            if add not in exts:  # check if in cache
                exts[add] = [""] * add  # add to cache
            cols.extend(exts[add])
        yield parse_row(info, cols)
def pycommons_footer_bottom_comments(
        _: Any, additional: str | None = None) -> Iterable[str]:
    """
    Generate the standard `pycommons` comments for the bottom of the footer.

    First, a line naming `pycommons.io.csv` and its version is produced.
    If `additional` is a non-empty string, it follows as second line.
    The last line points to the `pycommons` website.

    :param _: ignored
    :param additional: an optional line of additional comments
    :returns: an :class:`Iterable` of standard pycommons comments

    >>> for p in pycommons_footer_bottom_comments(""):
    ...     print(p[:70])
    This CSV output has been created using the versatile CSV API of pycomm
    You can find pycommons at https://thomasweise.github.io/pycommons.

    >>> for p in pycommons_footer_bottom_comments("", "Statistics are cool."):
    ...     print(p[:70])
    This CSV output has been created using the versatile CSV API of pycomm
    Statistics are cool.
    You can find pycommons at https://thomasweise.github.io/pycommons.
    """
    version_line: str = (
        "This CSV output has been created using the versatile CSV API of "
        f"pycommons.io.csv, version {pycommons_version}.")
    yield version_line
    # the optional line is skipped if it is absent or empty
    if not ((additional is None) or (str.__len__(additional) <= 0)):
        yield additional
    yield "You can find pycommons at https://thomasweise.github.io/pycommons."
def __print_comments(comments: Iterable[str] | None,
                     comment_start: str, comment_type: str,
                     empty_first_row: bool) -> Generator[str, None, None]:
    r"""
    Format and validate comments and produce them line by line.

    Each comment is stripped of surrounding white space and then prefixed
    with `comment_start` and a space. Empty comments before the first
    non-empty comment are dropped. A run of empty comments after a non-empty
    comment is condensed to a single line that only contains `comment_start`.
    If `empty_first_row` is `True`, such a line is also put before the very
    first non-empty comment.

    :param comments: the comment source
    :param comment_start: the comment start string
    :param comment_type: the comment type
    :param empty_first_row: should we put an empty first row?
    :returns: the generator of the comment strings
    :raises TypeError: if an argument is of the wrong type
    :raises ValueError: if comments cannot be placed or contain newlines

    >>> col = ["", "First comment.", "Second comment.", "", "",
    ...        " Third comment. "]
    >>> for p in __print_comments(col, "#", "header", False):
    ...     print(p)
    # First comment.
    # Second comment.
    #
    # Third comment.

    >>> col.clear()
    >>> list(__print_comments(col, "#", "header", True))
    []

    >>> col = ["", "First comment.", "Second comment.", "", "",
    ...        " Third comment. "]
    >>> for p in __print_comments(col, "#", "header", True):
    ...     print(p)
    #
    # First comment.
    # Second comment.
    #
    # Third comment.

    >>> col = ["First comment.", "Second comment.", "", "",
    ...        " Third comment. "]
    >>> for p in __print_comments(col, "#", "header", True):
    ...     print(p)
    #
    # First comment.
    # Second comment.
    #
    # Third comment.

    >>> col = ["", "", "First comment.", "Second comment.", "", "",
    ...        " Third comment. "]
    >>> for p in __print_comments(col, "#", "header", True):
    ...     print(p)
    #
    # First comment.
    # Second comment.
    #
    # Third comment.

    >>> list(__print_comments([], "#", "header", False))
    []
    >>> list(__print_comments([""], "#", "header", False))
    []
    >>> list(__print_comments(["", ""], "#", "header", False))
    []
    >>> list(__print_comments([], "#", "header", True))
    []
    >>> list(__print_comments([""], "#", "header", True))
    []
    >>> list(__print_comments(["", ""], "#", "header", True))
    []

    >>> list(__print_comments(None, "#", "header", True))
    []

    >>> try:
    ...     list(__print_comments(1, "#", "header", True))
    ... except TypeError as te:
    ...     print(te)
    comments should be an instance of typing.Iterable but is int, namely 1.

    >>> try:
    ...     list(__print_comments(["", 1, "Second comment."], "x", "header",
    ...                           False))
    ... except TypeError as te:
    ...     print(te)
    descriptor 'strip' for 'str' objects doesn't apply to a 'int' object

    >>> try:
    ...     list(__print_comments(["", None, "Second."], "x", "header",
    ...                           False))
    ... except TypeError as te:
    ...     print(te)
    descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object

    >>> try:
    ...     list(__print_comments(["Hello", "x\ny", "z"], "#", "header",
    ...                           False))
    ... except ValueError as ve:
    ...     print(ve)
    A header comment must not contain a newline character, but 'x\ny' does.
    """
    if comments is None:
        return
    if not isinstance(comments, Iterable):
        raise type_error(comments, "comments", Iterable)

    # True while the most recently produced line was a non-empty comment
    after_text = False
    for raw in comments:
        text = str.strip(raw)  # strip and typecheck
        if str.__len__(text) > 0:  # a comment with actual contents
            if any(nl in text for nl in NEWLINE):
                raise ValueError(f"A {comment_type} comment must not contain "
                                 f"a newline character, but {raw!r} does.")
            after_text = True
            if empty_first_row:  # separator line still pending
                yield comment_start
                empty_first_row = False
            yield f"{comment_start} {text}"
        elif after_text:
            # first empty comment after some text: emit one separator line
            yield comment_start
            empty_first_row = after_text = False
def __default_row(s: Iterable[str], t: Any) -> Iterable[str]:
    """
    Convert a row object to strings in the default way.

    If the row object `t` is a :class:`Mapping`, then one string is produced
    for each key in `s`: the string representation of the value stored under
    that key, or the empty string if the key is not in `t`. Otherwise, `t` is
    treated as an :class:`Iterable` and each of its elements is converted to
    a string, while `s` is not used.

    :param s: the setup object: an :class:`Iterable` of string
    :param t: the row object
    :returns: an :class:`Iterable` of string

    >>> list(__default_row(("a", "b"), ("1", "2")))
    ['1', '2']

    >>> list(__default_row(("a", "b"), {"b": 45, "c": 44, "a": 6}))
    ['6', '45']
    """
    if not isinstance(t, Mapping):
        # a plain sequence of values: just convert each value to a string
        return map(str, cast("Iterable[Any]", t))
    # a mapping: pick the values in the order of the column keys
    return ("" if key not in t else str(t[key]) for key in s)
550def csv_write(
551 data: Iterable[T],
552 column_titles: Iterable[str] | Callable[[S], Iterable[str]] =
553 lambda t: cast("Iterable[str]", t),
554 get_row: Callable[[S, T], Iterable[str]] =
555 cast("Callable[[S, T], Iterable[str]]", __default_row),
556 setup: Callable[[Iterable[T]], S] = lambda t: cast("S", t),
557 separator: str = CSV_SEPARATOR,
558 comment_start: str | None = COMMENT_START,
559 header_comments:
560 Iterable[str] | Callable[[S], Iterable[str] | None] | None = None,
561 footer_comments:
562 Iterable[str] | Callable[[S], Iterable[str] | None] | None = None,
563 footer_bottom_comments: Iterable[str] | Callable[[
564 S], Iterable[str] | None] | None =
565 pycommons_footer_bottom_comments) -> Generator[str, None, None]:
566 r"""
567 Produce a sequence of CSV formatted text.
569 The data is provided in form of a :class:`Iterable`. In a first step, the
570 function `setup` is invoked and applied to the `data` :class:`Iterable`.
571 It can return an object that sort of stores the structure of the data,
572 e.g., which columns should be generated and how they should be formatted.
574 `column_titles` can either be an :class:`Iterable` with the column titles
575 or a :class:`Callable`. In the latter case, the object generated by `setup`
576 is passed to `column_titles`, which should generate the column titles.
577 These titles are :meth:`~str.strip`-ped and concatenated to use the column
578 `separator` string and the resulting header string is passed to `consumer`.
580 Then, for each element `e` in the `data` :class:`Iterable`, the function
581 `get_row` is invoked. This function receives the setup information object
582 (previously returned by `setup`). It should generate one string per
583 column. These strings are then each :meth:`~str.strip`-ped and
584 concatenated using the column `separator` string. All trailing `separator`
585 are removed, but if all strings are empty, at least a single `separator`
586 is retained. The resulting string (per row) is again passed to `consumer`.
588 Additionally, `header_comments` and `footer_comments` can be `None`, to
589 not include any such comments, an :class:`Iterable` of comments, or
590 functions to generate row comments as :class:`str`. These are then
591 prepended or appends as comment rows before or after all of the
592 above, respectively. In that case, `comment_start` is prepended to each
593 line. If `comment_start is None`, then these comments are not printed.
594 `footer_bottom_comments` provides means to print additional comments
595 after the footer comments `comment_start is not None`.
597 If you create nested CSV formats, i.e., such where the `setup` function
598 invokes the `setup` function of other data, and the data that you receive
599 could come from a :class:`~typing.Generator` (or some other one-shot
600 :class:`~typing.Iterator`), then you need to make sure to solidify the
601 iterable data with :func:`~pycommons.ds.sequences.reiterable`. The
602 structure of our CSV output is that `setup` is first invoked and then
603 `get_row`. If `setup` already consumes the data away, then `get_row` may
604 print nothing. Alternatively, if you apply multiple `setup` routines to
605 the same data that extract different information, then the first `setup`
606 run may consume all the data, leaving nothing for the second one.
608 If you want to write more complex CSV structures, then implementing the
609 class :class:`CsvWriter` and using its class method
610 :meth:`CsvWriter.write` may be a more convenient solution.
611 They are wrappers around :func:`csv_write`.
613 :param data: the iterable of data to be written
614 :param column_titles: get the column titles
615 :param get_row: transform a row of data into a list of strings
616 :param setup: the setup function that computes how the data should be
617 represented
618 :param separator: the string used to separate columns
619 :param comment_start: the string starting comments
620 :param header_comments: get the comments to be placed above the CSV
621 header row -- only invoked if `comment_start is not None`.
622 :param footer_comments: get the comments to be placed after the last
623 row -- only invoked if `comment_start is not None`.
624 :param footer_bottom_comments: get the footer bottom comments, i.e.,
625 comments to be printed after all other footers. These commonts may
626 include something like the version information of the software used.
627 This function is only invoked if `comment_start is not None`.
628 :returns: a :class:`Generator` with the rows of CSV text
629 :raises TypeError: if any of the parameters has the wrong type
630 :raises ValueError: if the separator or comment start character are
631 incompatible or if the data has some internal error
633 >>> dd = [{"a": 1, "c": 2}, {"b": 6, "c": 8},
634 ... {"a": 4, "d": 12, "b": 3}, {}]
636 >>> def __setup(datarows) -> list[str]:
637 ... return sorted({dkey for datarow in datarows for dkey in datarow})
639 >>> def __get_row(keyd: list[str], row: dict[str, int]) -> Iterable[str]:
640 ... return map(str, (row.get(key, "") for key in keyd))
642 >>> def __get_header_cmt(keyd: list[str]) -> list[str]:
643 ... return ["This is a header comment.", " We have two of it. "]
645 >>> def __get_footer_cmt(keyd: list[str]) -> list[str]:
646 ... return [" This is a footer comment."]
648 >>> for p in csv_write(dd, lambda x: x, __default_row, __setup,
649 ... ";", "#", __get_header_cmt, __get_footer_cmt,
650 ... lambda _: ()):
651 ... print(p)
652 # This is a header comment.
653 # We have two of it.
654 a;b;c;d
655 1;;2
656 ;6;8
657 4;3;;12
658 ;
659 # This is a footer comment.
661 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup,
662 ... ";", "#", __get_header_cmt, __get_footer_cmt):
663 ... print(p[:70])
664 # This is a header comment.
665 # We have two of it.
666 a;b;c;d
667 1;;2
668 ;6;8
669 4;3;;12
670 ;
671 # This is a footer comment.
672 #
673 # This CSV output has been created using the versatile CSV API of pyco
674 # You can find pycommons at https://thomasweise.github.io/pycommons.
676 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup,
677 ... ",", "@@", __get_header_cmt, __get_footer_cmt,
678 ... lambda _: ()):
679 ... print(p)
680 @@ This is a header comment.
681 @@ We have two of it.
682 a,b,c,d
683 1,,2
684 ,6,8
685 4,3,,12
686 ,
687 @@ This is a footer comment.
689 >>> try:
690 ... list(csv_write(None, lambda x: x, __get_row, __setup,
691 ... ";", "#", __get_header_cmt, __get_footer_cmt))
692 ... except TypeError as te:
693 ... print(str(te)[:60])
694 source should be an instance of any in {typing.Iterable, typ
696 >>> try:
697 ... list(csv_write(1, lambda x: x, __get_row, __setup,
698 ... ";", "#", __get_header_cmt, __get_footer_cmt))
699 ... except TypeError as te:
700 ... print(str(te)[:60])
701 source should be an instance of any in {typing.Iterable, typ
703 >>> try:
704 ... list(csv_write(dd, None, __get_row, __setup,
705 ... ";", "#", __get_header_cmt, __get_footer_cmt))
706 ... except TypeError as te:
707 ... print(str(te)[:70])
708 column_titles should be an instance of typing.Iterable or a callable b
710 >>> try:
711 ... list(csv_write(dd, 1, __get_row, __setup,
712 ... ";", "#", __get_header_cmt, __get_footer_cmt))
713 ... except TypeError as te:
714 ... print(str(te)[:70])
715 column_titles should be an instance of typing.Iterable or a callable b
717 >>> try:
718 ... list(csv_write(dd, lambda x: x, None, __setup,
719 ... ";", "#", __get_header_cmt, __get_footer_cmt))
720 ... except TypeError as te:
721 ... print(te)
722 get_row should be a callable but is None.
724 >>> try:
725 ... list(csv_write(dd, lambda x: x, 1, __setup,
726 ... ";", "#", __get_header_cmt, __get_footer_cmt))
727 ... except TypeError as te:
728 ... print(te)
729 get_row should be a callable but is int, namely 1.
731 >>> try:
732 ... list(csv_write(dd, lambda x: x, __get_row, None,
733 ... ";", "#", __get_header_cmt, __get_footer_cmt))
734 ... except TypeError as te:
735 ... print(te)
736 setup should be a callable but is None.
738 >>> try:
739 ... list(csv_write(dd, lambda x: x, __get_row, 1,
740 ... ";", "#", __get_header_cmt, __get_footer_cmt))
741 ... except TypeError as te:
742 ... print(te)
743 setup should be a callable but is int, namely 1.
745 >>> try:
746 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
747 ... None, "#", __get_header_cmt, __get_footer_cmt))
748 ... except TypeError as te:
749 ... print(te)
750 descriptor '__len__' requires a 'str' object but received a 'NoneType'
752 >>> try:
753 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
754 ... 1, "#", __get_header_cmt, __get_footer_cmt))
755 ... except TypeError as te:
756 ... print(te)
757 descriptor '__len__' requires a 'str' object but received a 'int'
759 >>> try:
760 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
761 ... ";", 1, __get_header_cmt, __get_footer_cmt))
762 ... except TypeError as te:
763 ... print(te)
764 descriptor '__len__' requires a 'str' object but received a 'int'
766 >>> try:
767 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
768 ... ";", "#", 1, __get_footer_cmt))
769 ... except TypeError as te:
770 ... print(str(te)[:70])
771 header_comments should be an instance of typing.Iterable or a callable
773 >>> try:
774 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
775 ... ";", "", __get_header_cmt, __get_footer_cmt))
776 ... except ValueError as ve:
777 ... print(ve)
778 Invalid comment start: ''.
780 >>> try:
781 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
782 ... ";", " ", __get_header_cmt, __get_footer_cmt))
783 ... except ValueError as ve:
784 ... print(ve)
785 Invalid comment start: ' '.
787 >>> try:
788 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
789 ... ";", "# ", __get_header_cmt, __get_footer_cmt))
790 ... except ValueError as ve:
791 ... print(ve)
792 Invalid comment start: '# '.
794 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup, ";",
795 ... None, None):
796 ... print(p)
797 a;b;c;d
798 1;;2
799 ;6;8
800 4;3;;12
801 ;
803 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup,
804 ... ";", None, __get_header_cmt):
805 ... print(p)
806 a;b;c;d
807 1;;2
808 ;6;8
809 4;3;;12
810 ;
812 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup,
813 ... ";", None, footer_comments=__get_footer_cmt,
814 ... footer_bottom_comments= None):
815 ... print(p)
816 a;b;c;d
817 1;;2
818 ;6;8
819 4;3;;12
820 ;
822 >>> try:
823 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
824 ... ";", "#", __get_header_cmt, 1))
825 ... except TypeError as te:
826 ... print(str(te)[:70])
827 footer_comments should be an instance of typing.Iterable or a callable
829 >>> def __err_cmt_1(keyd: list[str]) -> Iterable[str]:
830 ... return ("This is\n a comment with error.", )
832 >>> try:
833 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
834 ... ";", "#", __err_cmt_1))
835 ... except ValueError as ve:
836 ... print(str(ve)[:58])
837 A header comment must not contain a newline character, but
839 >>> try:
840 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
841 ... ";", "#", footer_comments=__err_cmt_1,
842 ... footer_bottom_comments=None))
843 ... except ValueError as ve:
844 ... print(str(ve)[:58])
845 A footer comment must not contain a newline character, but
847 >>> def __empty_cmt(keyd: list[str]) -> Iterable[str]:
848 ... return (" ", )
850 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup,
851 ... ";", "#", __empty_cmt, __empty_cmt, __empty_cmt):
852 ... print(p)
853 a;b;c;d
854 1;;2
855 ;6;8
856 4;3;;12
857 ;
859 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup,
860 ... ";", "#", footer_comments=__empty_cmt,
861 ... footer_bottom_comments=lambda _: ()):
862 ... print(p)
863 a;b;c;d
864 1;;2
865 ;6;8
866 4;3;;12
867 ;
869 >>> def __error_column_titles_1(keyd: list[str]) -> Iterable[str]:
870 ... return ()
872 >>> try:
873 ... list(csv_write(dd, __error_column_titles_1, __get_row,
874 ... __setup, ";", "#"))
875 ... except ValueError as ve:
876 ... print(ve)
877 Cannot have zero columns.
879 >>> dde = dd.copy()
880 >>> dde.append(None)
881 >>> try:
882 ... list(csv_write(dde, lambda x: x, __get_row,
883 ... lambda _: ["a", "b", "c", "d"],
884 ... ";", "#", footer_comments=__empty_cmt,
885 ... footer_bottom_comments=lambda _: ()))
886 ... except TypeError as te:
887 ... print(te)
888 data element should be an instance of object but is None.
890 >>> def __error_column_titles_2(keyd: list[str]) -> Iterable[str]:
891 ... return (" ", )
893 >>> try:
894 ... list(csv_write(dd, __error_column_titles_2, __get_row, __setup,
895 ... ";", "#"))
896 ... except ValueError as ve:
897 ... print(str(ve)[:50])
898 Invalid column title ' ', must neither be empty no
900 >>> def __error_column_titles_3(keyd: list[str]) -> Iterable[str]:
901 ... return ("bla\nblugg", )
903 >>> try:
904 ... list(csv_write(dd, __error_column_titles_3, __get_row, __setup,
905 ... ";", "#"))
906 ... except ValueError as ve:
907 ... print(str(ve)[:50])
908 Invalid column title 'bla\nblugg', must neither be
910 >>> def __error_column_titles_4(keyd: list[str]) -> Iterable[str]:
911 ... return (None, )
913 >>> try:
914 ... list(csv_write(dd, __error_column_titles_4, __get_row, __setup,
915 ... ";", "#"))
916 ... except TypeError as te:
917 ... print(te)
918 descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object
920 >>> def __error_column_titles_5(keyd: list[str]) -> Iterable[str]:
921 ... return (1, )
923 >>> try:
924 ... list(csv_write(dd, __error_column_titles_5, __get_row, __setup,
925 ... ";", "#"))
926 ... except TypeError as te:
927 ... print(te)
928 descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
930 >>> def __error_column_titles_6(keyd: list[str]) -> Iterable[str]:
931 ... return ("a", "b", "c", "a")
933 >>> try:
934 ... list(csv_write(dd, __error_column_titles_6, __get_row, __setup,
935 ... ";", "#"))
936 ... except ValueError as ve:
937 ... print(ve)
938 Cannot have duplicated columns: ['a', 'b', 'c', 'a'].
940 >>> def __error_column_titles_7(keyd: list[str]) -> Iterable[str]:
941 ... return ("a", "b", "c;4")
943 >>> try:
944 ... list(csv_write(dd, __error_column_titles_7, __get_row, __setup,
945 ... ";", "#"))
946 ... except ValueError as ve:
947 ... print(str(ve)[:49])
948 Invalid column title 'c;4', must neither be empty
950 >>> def __error_column_titles_8(keyd: list[str]) -> Iterable[str]:
951 ... return ("a", "b#x", "c")
953 >>> try:
954 ... list(csv_write(dd, __error_column_titles_8, __get_row, __setup,
955 ... ";", "#"))
956 ... except ValueError as ve:
957 ... print(str(ve)[:49])
958 Invalid column title 'b#x', must neither be empty
960 >>> def __error_row_1(keyd: list[str], row: dict[str, int]):
961 ... return ("bla", None, "blubb")
963 >>> try:
964 ... list(csv_write(dd, lambda x: x, __error_row_1,
965 ... __setup, ";", "#",
966 ... footer_bottom_comments=lambda _, __: None))
967 ... except TypeError as te:
968 ... print(te)
969 descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object
971 >>> def __error_row_2(keyd: list[str], row: dict[str, int]):
972 ... return ("bla", 2.3, "blubb")
974 >>> try:
975 ... list(csv_write(dd, lambda x: x, __error_row_2,
976 ... __setup, ";", "#",
977 ... footer_bottom_comments=lambda _: None))
978 ... except TypeError as te:
979 ... print(te)
980 descriptor 'strip' for 'str' objects doesn't apply to a 'float' object
982 >>> def __error_row_3(keyd: list[str], row: dict[str, int]):
983 ... return ("bla", "x\ny", "blubb")
985 >>> try:
986 ... list(csv_write(dd, lambda x: x, __error_row_3,
987 ... __setup, ";", "#",
988 ... footer_bottom_comments=lambda _: None))
989 ... except ValueError as ve:
990 ... print(str(ve)[:50])
991 Invalid column value 'x\ny', cannot contain any of
993 >>> def __error_row_4(keyd: list[str], row: dict[str, int]):
994 ... return ("bla", "x#", "blubb")
996 >>> try:
997 ... list(csv_write(dd, lambda x: x, __error_row_4,
998 ... __setup, ";", "#",
999 ... footer_bottom_comments=lambda _: None))
1000 ... except ValueError as ve:
1001 ... print(str(ve)[:50])
1002 Invalid column value 'x#', cannot contain any of [
1004 >>> def __error_row_5(keyd: list[str], row: dict[str, int]):
1005 ... return ("bla", "x;#", "blubb")
1007 >>> try:
1008 ... list(csv_write(dd, lambda x: x, __error_row_5,
1009 ... __setup, ";", "#"))
1010 ... except ValueError as ve:
1011 ... print(str(ve)[:49])
1012 Invalid column value 'x;#', cannot contain any of
1014 >>> def __error_column_titles_9(keyd: list[str]) -> Iterable[str]:
1015 ... return ("a", )
1017 >>> def __error_row_6(keyd: list[str], row: dict[str, int]):
1018 ... return ("", )
1020 >>> try:
1021 ... list(csv_write(dd, __error_column_titles_9, __error_row_6,
1022 ... __setup, ";", "#"))
1023 ... except ValueError as ve:
1024 ... print(ve)
1025 Cannot have empty row in a single-column format, but got [''].
1027 >>> def __error_row_7(keyd: list[str], row: dict[str, int]):
1028 ... return ("x", "y")
1030 >>> try:
1031 ... list(csv_write(dd, __error_column_titles_9, __error_row_7,
1032 ... __setup, ";", "#"))
1033 ... except ValueError as ve:
1034 ... print(ve)
1035 Too many columns in ['x', 'y'], should be 1.
1037 >>> try:
1038 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
1039 ... "", "#", footer_comments=__err_cmt_1))
1040 ... except ValueError as ve:
1041 ... print(ve)
1042 Invalid separator ''.
1044 >>> try:
1045 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
1046 ... "x", "#", footer_comments=1))
1047 ... except TypeError as te:
1048 ... print(str(te)[:70])
1049 footer_comments should be an instance of typing.Iterable or a callable
1051 >>> try:
1052 ... list(csv_write(dd, lambda x: x, __get_row, __setup,
1053 ... "x", "#", footer_bottom_comments=1))
1054 ... except TypeError as te:
1055 ... print(str(te)[:70])
1056 footer_bottom_comments should be an instance of typing.Iterable or a c
1058 >>> ddx = [{"a": 1, "c": 2}, None,
1059 ... {"a": 4, "d": 12, "b": 3}, {}]
1060 >>> def __error_row_9(_, __):
1061 ... return ("1", "2", "3", "4")
1062 >>> def __error_row_10(_):
1063 ... __error_row_9(1, 2)
1065 >>> try:
1066 ... list(csv_write(ddx, __error_row_10,
1067 ... __error_row_9, lambda x: x, ";", "#"))
1068 ... except TypeError as te:
1069 ... print(te)
1070 'NoneType' object is not iterable
1071 """
1072 if not (isinstance(column_titles, Iterable) or callable(column_titles)):
1073 raise type_error(column_titles, "column_titles", Iterable, call=True)
1074 if not callable(get_row):
1075 raise type_error(get_row, "get_row", call=True)
1076 if not callable(setup):
1077 raise type_error(setup, "setup", call=True)
1078 if str.__len__(separator) <= 0:
1079 raise ValueError(f"Invalid separator {separator!r}.")
1080 forbidden_marker: Final[set[str]] = set(NEWLINE)
1081 forbidden_marker.add(separator)
1082 if comment_start is not None:
1083 if (str.__len__(comment_start) <= 0) or (
1084 str.strip(comment_start) != comment_start) or (
1085 comment_start in separator):
1086 raise ValueError(f"Invalid comment start: {comment_start!r}.")
1087 forbidden_marker.add(comment_start)
1088 if (header_comments is not None) and (not (isinstance(
1089 header_comments, Iterable) or callable(header_comments))):
1090 raise type_error(
1091 header_comments, "header_comments", Iterable, call=True)
1092 if (footer_comments is not None) and (not (isinstance(
1093 footer_comments, Iterable) or callable(footer_comments))):
1094 raise type_error(
1095 footer_comments, "footer_comments", Iterable, call=True)
1096 if (footer_bottom_comments is not None) and (not (isinstance(
1097 footer_bottom_comments, Iterable) or callable(
1098 footer_bottom_comments))):
1099 raise type_error(footer_bottom_comments,
1100 "footer_bottom_comments", Iterable, call=True)
1102 data = reiterable(data) # make sure we can iterate over the data twice
1103 setting: Final[S] = setup(data)
1104 forbidden: Final[list[str]] = sorted(forbidden_marker)
1106 # first put header comments
1107 if (comment_start is not None) and (header_comments is not None):
1108 yield from __print_comments(
1109 header_comments(setting) if callable(header_comments)
1110 else header_comments, comment_start, "header", False)
1112 # now process the column titles
1113 collected: list[str] = list(
1114 column_titles(setting) if callable(column_titles) else column_titles)
1115 col_count: Final[int] = list.__len__(collected)
1116 if col_count <= 0:
1117 raise ValueError("Cannot have zero columns.")
1118 for i, col in enumerate(collected):
1119 collected[i] = xcol = str.strip(col)
1120 if (str.__len__(xcol) <= 0) or any(map(xcol.__contains__, forbidden)):
1121 raise ValueError(f"Invalid column title {col!r}, must neither be"
1122 f" empty nor contain any of {forbidden!r}.")
1123 if set.__len__(set(collected)) != col_count:
1124 raise ValueError(f"Cannot have duplicated columns: {collected!r}.")
1125 yield separator.join(collected)
1127 # now do the single rows
1128 for element in data:
1129 if element is None:
1130 raise type_error(element, "data element", object)
1131 collected.clear()
1132 collected.extend(get_row(setting, element))
1133 list_len: int = list.__len__(collected)
1134 if list_len > col_count:
1135 raise ValueError(
1136 f"Too many columns in {collected!r}, should be {col_count}.")
1137 last_non_empty: int = -1
1138 for i, col in enumerate(collected):
1139 collected[i] = xcol = str.strip(col)
1140 if any(map(xcol.__contains__, forbidden)):
1141 raise ValueError(f"Invalid column value {col!r}, cannot "
1142 f"contain any of {forbidden!r}.")
1143 if str.__len__(xcol) > 0:
1144 last_non_empty = i + 1
1145 if last_non_empty < list_len:
1146 if last_non_empty <= 0:
1147 if col_count <= 1:
1148 raise ValueError(
1149 f"Cannot have empty row in a single-column format, "
1150 f"but got {collected!r}.")
1151 yield separator
1152 continue
1153 del collected[last_non_empty:]
1154 yield separator.join(collected)
1156 # finally put footer comments
1157 if comment_start is not None:
1158 empty_next: bool = False
1159 if footer_comments is not None:
1160 for c in __print_comments(footer_comments(setting) if callable(
1161 footer_comments) else footer_comments, comment_start,
1162 "footer", False):
1163 yield c
1164 empty_next = True
1165 if footer_bottom_comments is not None:
1166 yield from __print_comments(
1167 footer_bottom_comments(setting) if callable(
1168 footer_bottom_comments) else footer_bottom_comments,
1169 comment_start, "footer bottom", empty_next)
1172def csv_str_or_none(data: list[str | None] | None,
1173 index: int | None) -> str | None:
1174 """
1175 Get a string or `None` from a data row.
1177 This function is a shortcut for when data elements or columns are
1178 optional. If `index` is `None` or outside of the valid index range of the
1179 list `data`, then `None` is returned. If `data` itself is `None` or the
1180 element at index `index` is the empty string, then `None` is returned.
1181 Only if `data` and `index` are both not `None` and `index` is a valid
1182 index into `data` and the element at index `index` in `data` is not the
1183 empty string, then this element is returned. In other words, this is a
1184 very tolerant function to handle optional data and to return `None` if the
1185 data is not present. The function :func:`csv_val_or_none` further extends
1186 this function by converting the data to another data type if it is
1187 present.
1189 :param data: the data
1190 :param index: the index, if any
1191 :returns: the string or nothing
1193 >>> ddd = ["a", "b", "", "d"]
1194 >>> print(csv_str_or_none(ddd, 0))
1195 a
1196 >>> print(csv_str_or_none(ddd, 1))
1197 b
1198 >>> print(csv_str_or_none(ddd, 2))
1199 None
1200 >>> print(csv_str_or_none(ddd, 3))
1201 d
1202 >>> print(csv_str_or_none(ddd, None))
1203 None
1204 >>> print(csv_str_or_none(ddd, 10))
1205 None
1206 >>> print(csv_str_or_none(ddd, -1))
1207 None
1208 >>> print(csv_str_or_none(None, 0))
1209 None
1210 """
1211 if (index is None) or (data is None):
1212 return None
1213 if 0 <= index <= list.__len__(data):
1214 d: str = data[index]
1215 return None if (d is None) or (str.__len__(d) <= 0) else d
1216 return None
#: a type variable for the return type of the `conv` callables accepted by
#: :func:`csv_val_or_none`, :func:`csv_select_scope`, and
#: :func:`csv_select_scope_or_none`.
U = TypeVar("U")
def csv_val_or_none(data: list[str | None] | None, index: int | None,
                    conv: Callable[[str], U]) -> U | None:
    """
    Get a converted value or `None` from a data row.

    The function :func:`csv_str_or_none` allows us to extract an optional
    data element from a CSV row and yields `None` if the element is not
    present or if the `index` is `None` or outside of the valid range. This
    function builds on it: If the string is present and not empty, it is
    passed to `conv` and the result of `conv` is returned. Otherwise, `None`
    is returned and `conv` is not invoked.

    :param data: the data
    :param index: the index
    :param conv: the conversion function
    :returns: the converted object, or `None` if no data was present

    >>> ddd = ["11", "22", "", "33"]
    >>> print(csv_val_or_none(ddd, 0, int))
    11
    >>> print(csv_val_or_none(ddd, 1, int))
    22
    >>> print(csv_val_or_none(ddd, 2, int))
    None
    >>> print(csv_val_or_none(ddd, 3, int))
    33
    >>> print(csv_val_or_none(ddd, None, int))
    None
    """
    text: Final[str | None] = csv_str_or_none(data, index)
    if text is None:
        return None
    return conv(text)
def csv_column(columns: dict[str, int], key: str,
               remove_col: bool = True) -> int:
    """
    Look up the index of a mandatory CSV column.

    The index stored under `key` in the column description map `columns` is
    fetched, validated to be inside the range `0..1_000_000`, and returned.
    If there is no entry for `key`, a `KeyError` is raised. If `remove_col`
    is `True` and the entry exists and is valid, it is deleted from
    `columns`.

    :param columns: the columns set
    :param key: the key
    :param remove_col: should we remove the column?
    :returns: the column
    :raises TypeError: if any of the parameters is not of the prescribed type
    :raises ValueError: if the column or key are invalid
    :raises KeyError: if no column of the name `key` exists

    >>> csv_column({"a": 5}, "a")
    5

    >>> cols = {"a": 5, "b": 7}
    >>> csv_column(cols, "a", False)
    5
    >>> cols
    {'a': 5, 'b': 7}
    >>> csv_column(cols, "a", True)
    5
    >>> cols
    {'b': 7}

    >>> try:
    ...     csv_column({"a": 5}, "b")
    ... except KeyError as ke:
    ...     print(ke)
    'b'

    >>> try:
    ...     csv_column({"a": 5}, "a", "3")
    ... except TypeError as te:
    ...     print(te)
    remove_col should be an instance of bool but is str, namely '3'.

    >>> try:
    ...     csv_column(None, "b")
    ... except TypeError as te:
    ...     print(str(te)[:50])
    descriptor '__getitem__' for 'dict' objects doesn'

    >>> try:
    ...     csv_column({"a": 5}, 1)
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'int'

    >>> try:
    ...     csv_column({"a": -1}, "a")
    ... except ValueError as ve:
    ...     print(ve)
    a=-1 is invalid, must be in 0..1000000.

    >>> try:
    ...     csv_column({"a": -1}, "")
    ... except ValueError as ve:
    ...     print(ve)
    Invalid key ''.
    """
    # the unbound descriptor also rejects keys that are not strings
    if str.__len__(key) == 0:
        raise ValueError(f"Invalid key {key!r}.")
    if not isinstance(remove_col, bool):
        raise type_error(remove_col, "remove_col", bool)
    raw_index = dict.__getitem__(columns, key)
    index: Final[int] = check_int_range(raw_index, key, 0, 1_000_000)
    # only a present and valid column is ever removed
    if remove_col:
        dict.__delitem__(columns, key)
    return index
1334def csv_column_or_none(columns: dict[str, int] | None = None,
1335 key: str | None = None,
1336 remove_col: bool = True) -> int | None:
1337 """
1338 Get an optional CSV column index.
1340 This function will extract the index of a column from a column description
1341 map. The index will be checked whether it is in a valid range and
1342 returned. If no column fitting to `key` exists, this function returns
1343 `None`. If `remove_col` is `True` and a column fitting to `key` exists,
1344 then this column will be deleted from `columns`.
1346 :param columns: the columns
1347 :param key: the key
1348 :param remove_col: should we remove the column?
1349 :returns: the column, or `None` if none was found
1350 :raises TypeError: if any of the parameters is not of the prescribed type
1351 :raises ValueError: if the column or key are invalid
1353 >>> csv_column_or_none({"a": 5}, "a")
1354 5
1356 >>> cols = {"a": 5, "b": 7}
1357 >>> csv_column_or_none(cols, "a", False)
1358 5
1359 >>> cols
1360 {'a': 5, 'b': 7}
1361 >>> csv_column_or_none(cols, "a", True)
1362 5
1363 >>> cols
1364 {'b': 7}
1366 >>> try:
1367 ... csv_column_or_none({"a": 5}, "a", "3")
1368 ... except TypeError as te:
1369 ... print(te)
1370 remove_col should be an instance of bool but is str, namely '3'.
1372 >>> print(csv_column_or_none({"a": 5}, "b"))
1373 None
1375 >>> print(csv_column_or_none(None, "b"))
1376 None
1378 >>> print(csv_column_or_none({"a": 5}, None))
1379 None
1381 >>> print(csv_column_or_none({"a": 5}, ""))
1382 None
1384 >>> try:
1385 ... csv_column({"a": 5}, 1)
1386 ... except TypeError as te:
1387 ... print(te)
1388 descriptor '__len__' requires a 'str' object but received a 'int'
1390 >>> try:
1391 ... csv_column({"a": -1}, "a")
1392 ... except ValueError as ve:
1393 ... print(ve)
1394 a=-1 is invalid, must be in 0..1000000.
1395 """
1396 if not isinstance(remove_col, bool):
1397 raise type_error(remove_col, "remove_col", bool)
1398 if (key is None) or (columns is None) or (str.__len__(key) <= 0):
1399 return None
1400 res: Final[int | None] = dict.get(columns, key)
1401 if res is None:
1402 return None
1403 check_int_range(res, key, 0, 1_000_000)
1404 if remove_col:
1405 dict.__delitem__(columns, key)
1406 return res
def csv_select_scope(
        conv: Callable[[dict[str, int]], U],
        columns: dict[str, int],
        scope: str | None = None,
        additional: Iterable[tuple[str, int]] = (),
        skip_orig_key: Callable[[str], bool] = lambda _: False,
        skip_final_key: Callable[[str], bool] = lambda _: False,
        skip_col: Callable[[int], bool] = lambda _: False,
        include_scope: bool = True,
        remove_cols: bool = True) -> U:
    """
    Get all the columns of a given scope and pass them to the function `conv`.

    This is the strict variant of :func:`csv_select_scope_or_none`: It
    selects keys from a column set, optionally based on a `scope` prefix that
    is removed from the column names, and passes them to `conv`, e.g., the
    constructor of a CSV reader. If `columns` is empty or if
    :func:`csv_select_scope_or_none` yields `None`, this function raises a
    :class:`ValueError`.
    All columns that are passed on to `conv` are deleted from `columns` if
    `remove_cols == True`, which is the default.

    :param conv: the function to which the selected columns should be passed,
        and that creates the return value
    :param columns: the existing columns
    :param scope: the scope, or `None` or the empty string to select all
        columns
    :param additional: the additional columns to add *if* some keys/columns
        remain after all the transformation and selection
    :param skip_orig_key: a function that returns `True` for any original,
        unchanged key in `columns` that should be ignored and that
        returns `False` if the key can be processed normally (i.e., if we can
        check if it starts with the given scope and move on)
    :param skip_final_key: a function that returns `True` for any key in
        `columns` that would fall into the right scope but that should still
        be ignored. This function receives the key without the scope prefix.
    :param skip_col: any column that should be ignored
    :param include_scope: if scope appears as a lone column, should we
        include it?
    :param remove_cols: should we remove all selected columns?
    :returns: The result of the function `conv` applied to all matching
        columns (and those in `additional` are appended to them)
    :raises ValueError: if no columns could be selected
    :raises TypeError: if any of the elements passed in is of the wrong type

    >>> csv_select_scope(lambda x: x, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "")
    {'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}

    >>> try:
    ...     csv_select_scope(print, {"a.x": 1, "a.y": 2}, "v")
    ... except ValueError as ve:
    ...     print(ve)
    Did not find sufficient data of scope 'v' in {'a.x': 1, 'a.y': 2}.

    >>> try:
    ...     csv_select_scope(print, {}, "v")
    ... except ValueError as ve:
    ...     print(ve)
    Did not find sufficient data of scope 'v' in {}.
    """
    result: U | None = None
    # an empty column map can never yield a selection, so skip the delegate
    if dict.__len__(columns) > 0:
        result = csv_select_scope_or_none(
            conv, columns, scope, additional, skip_orig_key, skip_final_key,
            skip_col, include_scope, remove_cols)
    if result is None:
        raise ValueError("Did not find sufficient data of "
                         f"scope {scope!r} in {columns!r}.")
    return result
def csv_select_scope_or_none(
        conv: Callable[[dict[str, int]], U],
        columns: dict[str, int] | None,
        scope: str | None = None,
        additional: Iterable[tuple[str, int]] = (),
        skip_orig_key: Callable[[str], bool] = lambda _: False,
        skip_final_key: Callable[[str], bool] = lambda _: False,
        skip_col: Callable[[int], bool] = lambda _: False,
        include_scope: bool = True,
        remove_cols: bool = True) -> U | None:
    """
    Get all the columns of a given scope and pass them to the function `conv`.

    This function is intended for selecting some keys from a column set and
    passing them as parameters to a constructor of a CSV reader. It can do
    this selection based on a `scope` prefix which is then removed from the
    column names before passing them into the constructor. If no column
    matches, this function returns `None` and `conv` is not invoked.
    All columns that are passed on to `conv` are deleted from `columns` if
    `remove_cols == True`, which is the default.

    :param conv: the function to which the selected columns should be passed,
        if any, and that - in this case - creates the return value of this
        function
    :param columns: the existing columns
    :param scope: the scope, or `None` or the empty string to select all
        columns
    :param additional: the additional columns to add *if* some keys/columns
        remain after all the transformation and selection
    :param skip_orig_key: a function that returns `True` for any original,
        unchanged key in `columns` that should be ignored and that
        returns `False` if the key can be processed normally (i.e., if we can
        check if it starts with the given scope and move on)
    :param skip_final_key: a function that returns `True` for any key in
        `columns` that would fall into the right scope but that should still
        be ignored. This function receives the key without the scope prefix.
    :param skip_col: any column that should be ignored
    :param include_scope: if scope appears as a lone column, should we
        include it?
    :param remove_cols: should we remove all selected columns?
    :returns: `None` if no key in `columns` falls into the provided scope.
        The result of `conv` otherwise, i.e., if there are matching columns,
        these are selected (and those in `additional` are appended to them)
        and these are then passed to `conv` and the result of `conv` is
        returned

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a")
    {'x': 1, 'y': 2, 'a': 3}

    >>> exa1 = {"a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}
    >>> csv_select_scope_or_none(print, exa1, "a", remove_cols=False)
    {'x': 1, 'y': 2, 'a': 3}
    >>> exa1
    {'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}
    >>> csv_select_scope_or_none(print, exa1, "a", remove_cols=True)
    {'x': 1, 'y': 2, 'a': 3}
    >>> exa1
    {'b': 4, 'b.t': 5}
    >>> csv_select_scope_or_none(print, exa1, "b", remove_cols=True)
    {'b': 4, 't': 5}
    >>> exa1
    {}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "")
    {'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, None)
    {'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...     include_scope=False)
    {'x': 1, 'y': 2}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b")
    {'b': 4, 't': 5}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
    ...     additional=(('z', 23), ('v', 45)))
    {'b': 4, 't': 5, 'z': 23, 'v': 45}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
    ...     additional=(('t', 23), ('v', 45)))
    {'b': 4, 't': 5, 'v': 45}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...     additional=(('x', 44), ('v', 45)))
    {'x': 1, 'y': 2, 'a': 3, 'v': 45}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
    ...     additional=(('z', 23), ('v', 45)),
    ...     skip_col=lambda c: c == 23)
    {'b': 4, 't': 5, 'v': 45}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
    ...     additional=(('z', 23), ('v', 45)),
    ...     skip_orig_key=lambda ok: ok == "b.t")
    {'b': 4, 'z': 23, 'v': 45}

    >>> csv_select_scope_or_none(print, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
    ...     additional=(('z', 23), ('v', 45)),
    ...     skip_final_key=lambda fk: fk == "z")
    {'b': 4, 't': 5, 'v': 45}

    >>> print(csv_select_scope_or_none(print, {}, "a"))
    None

    >>> print(csv_select_scope_or_none(print, {}, None))
    None

    >>> print(csv_select_scope_or_none(print, None, None))
    None

    >>> print(csv_select_scope_or_none(print, {"a.x": 45}, "a",
    ...                                skip_col=lambda c: c == 45))
    None

    >>> try:
    ...     csv_select_scope_or_none(None, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a")
    ... except TypeError as te:
    ...     print(te)
    conv should be a callable but is None.

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         remove_cols=1)
    ... except TypeError as te:
    ...     print(te)
    remove_cols should be an instance of bool but is int, namely 1.

    >>> try:
    ...     csv_select_scope_or_none("x", {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a")
    ... except TypeError as te:
    ...     print(te)
    conv should be a callable but is str, namely 'x'.

    >>> try:
    ...     csv_select_scope_or_none(print, "x", "a")
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'dict' object but received a 'str'

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, int)
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'type'

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         additional=2)
    ... except TypeError as te:
    ...     print(str(te)[:-7])
    additional should be an instance of typing.Iterable but is int, na

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         additional=((1, 2), ))
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'int'

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         additional=(None, ))
    ... except TypeError as te:
    ...     print(te)
    cannot unpack non-iterable NoneType object

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         additional=(("yx", "a"), ))
    ... except TypeError as te:
    ...     print(te)
    yx should be an instance of int but is str, namely 'a'.

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         additional=(("yx", -2), ))
    ... except ValueError as ve:
    ...     print(ve)
    yx=-2 is invalid, must be in 0..1000000.

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "a.b": -4, "b.t": 5}, "a")
    ... except ValueError as ve:
    ...     print(ve)
    a.b=-4 is invalid, must be in 0..1000000.

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         skip_col=None)
    ... except TypeError as te:
    ...     print(te)
    skip_col should be a callable but is None.

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         skip_orig_key=None)
    ... except TypeError as te:
    ...     print(te)
    skip_orig_key should be a callable but is None.

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         skip_final_key=None)
    ... except TypeError as te:
    ...     print(te)
    skip_final_key should be a callable but is None.

    >>> try:
    ...     csv_select_scope(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         include_scope=3)
    ... except TypeError as te:
    ...     print(te)
    include_scope should be an instance of bool but is int, namely 3.

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, 4)
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'int'

    >>> try:
    ...     csv_select_scope_or_none(print, 11)
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'dict' object but received a 'int'

    >>> try:
    ...     csv_select_scope_or_none(print, {
    ...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
    ...         additional=(("", 2), ))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid additional column ''.
    """
    # validate the arguments, in the same order in which errors are reported
    for func, func_name in ((conv, "conv"),
                            (skip_orig_key, "skip_orig_key"),
                            (skip_final_key, "skip_final_key")):
        if not callable(func):
            raise type_error(func, func_name, call=True)
    if not isinstance(additional, Iterable):
        raise type_error(additional, "additional", Iterable)
    if not isinstance(include_scope, bool):
        raise type_error(include_scope, "include_scope", bool)
    if not callable(skip_col):
        raise type_error(skip_col, "skip_col", call=True)
    if not isinstance(remove_cols, bool):
        raise type_error(remove_cols, "remove_cols", bool)

    if (columns is None) or (dict.__len__(columns) <= 0):
        return None

    # entries are (original key, final key, column index)
    chosen: list[tuple[str, str, int]] = []
    for orig_key, col in columns.items():
        if skip_orig_key(orig_key) or skip_col(col):
            continue
        chosen.append((orig_key, orig_key, col))
    if list.__len__(chosen) <= 0:
        return None

    if (scope is not None) and (str.__len__(scope) > 0):
        prefix: Final[str] = f"{scope}{SCOPE_SEPARATOR}"
        prefix_len: Final[int] = str.__len__(prefix)
        kept: list[tuple[str, str, int]] = []
        # walk from the back to the front so that `skip_final_key` sees the
        # keys in the same order as in an in-place backwards filtering
        for orig_key, _, col in reversed(chosen):
            if str.startswith(orig_key, prefix):
                short_key = orig_key[prefix_len:]
                if not skip_final_key(short_key):
                    kept.append((orig_key, short_key, col))
            elif include_scope and (orig_key == scope) and (
                    not skip_final_key(orig_key)):
                kept.append((orig_key, orig_key, col))
        if list.__len__(kept) <= 0:
            return None
        kept.reverse()  # restore the original column order
        chosen = kept

    # the selected columns are removed before their indices are validated
    if remove_cols:
        for entry in chosen:
            dict.__delitem__(columns, entry[0])

    subset: Final[dict[str, int]] = {}
    for orig_key, final_key, col in chosen:
        subset[final_key] = check_int_range(col, orig_key, 0, 1_000_000)

    # additional columns never override columns that were already selected
    for add_key, add_col in additional:
        if str.__len__(add_key) <= 0:
            raise ValueError(f"Invalid additional column {add_key!r}.")
        if skip_final_key(add_key) or skip_col(add_col) or (
                add_key in subset):
            continue
        subset[add_key] = check_int_range(add_col, add_key, 0, 1_000_000)
    return conv(subset)
1801class CsvReader[T]:
1802 """
1803 A base class for CSV readers.
1805 Using this class and its :meth:`read` class method provides for a more
1806 elegant way to construct nested and combined CSV formats compared to
1807 creating classes and handing their methods to :func:`csv_read`.
1809 >>> class R(CsvReader):
1810 ... def __init__(self, columns: dict[str, int]) -> None:
1811 ... super().__init__(columns)
1812 ... self.cols = columns
1813 ... def parse_row(self, row: list[str]) -> dict:
1814 ... return {x: row[y] for x, y in self.cols.items()}
1816 >>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9",
1817 ... "", "10", "# 11;12"]
1819 >>> for p in R.read(text):
1820 ... print(p)
1821 {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
1822 {'a': '5', 'b': '6', 'c': '', 'd': ''}
1823 {'a': '', 'b': '8', 'c': '', 'd': '9'}
1824 {'a': '10', 'b': '', 'c': '', 'd': ''}
1826 >>> text = ["a,b,c,d", "v test", " 1, 2,3,4", " 5 ,6 ", ",8,,9",
1827 ... "", "10", "v 11,12"]
1829 >>> for p in R.read(text, separator=',', comment_start='v'):
1830 ... print(p)
1831 {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
1832 {'a': '5', 'b': '6', 'c': '', 'd': ''}
1833 {'a': '', 'b': '8', 'c': '', 'd': '9'}
1834 {'a': '10', 'b': '', 'c': '', 'd': ''}
1836 >>> class S(CsvReader):
1837 ... def __init__(self, columns: dict[str, int], add: str) -> None:
1838 ... super().__init__(columns)
1839 ... self.cols = columns
1840 ... self.s = add
1841 ... def parse_row(self, row: list[str]) -> dict:
1842 ... return {x: self.s + row[y] for x, y in self.cols.items()}
1844 >>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9",
1845 ... "", "10", "# 11;12"]
1847 >>> for p in S.read(text, add="b"):
1848 ... print(p)
1849 {'a': 'b1', 'b': 'b2', 'c': 'b3', 'd': 'b4'}
1850 {'a': 'b5', 'b': 'b6', 'c': 'b', 'd': 'b'}
1851 {'a': 'b', 'b': 'b8', 'c': 'b', 'd': 'b9'}
1852 {'a': 'b10', 'b': 'b', 'c': 'b', 'd': 'b'}
1854 >>> ccc = S({"a": 1}, add="x")
1855 >>> print(ccc.parse_optional_row(None))
1856 None
1857 >>> print(S.parse_optional_row(None, None))
1858 None
1859 >>> print((ccc).parse_optional_row(["x", "y"]))
1860 {'a': 'xy'}
1862 >>> try:
1863 ... CsvReader("x")
1864 ... except TypeError as te:
1865 ... print(te)
1866 columns should be an instance of dict but is str, namely 'x'.
1868 >>> try:
1869 ... CsvReader({"a": 1}).parse_row(["a"])
1870 ... except NotImplementedError as nie:
1871 ... print(type(nie))
1872 <class 'NotImplementedError'>
1873 """
1875 def __init__(self, columns: dict[str, int]) -> None:
1876 """
1877 Create the CSV reader.
1879 :param columns: the columns
1880 :raises TypeError: if `columns` is not a :class:`dict`
1881 """
1882 super().__init__()
1883 if not isinstance(columns, dict):
1884 raise type_error(columns, "columns", dict)
1886 def parse_row(self, data: list[str]) -> T:
1887 """
1888 Parse a row of data.
1890 :param data: the data row
1891 :returns: the object representing the row
1892 :raises NotImplementedError: because it must be overridden
1893 :raises ValueError: should raise a :class:`ValueError` if the row is
1894 incomplete or invalid
1895 """
1896 raise NotImplementedError
1898 def parse_optional_row(self, data: list[str] | None) -> T | None:
1899 """
1900 Parse a row of data that may be incomplete or empty.
1902 The default implementation of this method returns `None` if the data
1903 row is `None`, or if `self` is `None`, which should never happen.
1904 Otherwise, it calls :meth:`parse_row`, which will probably raise a
1905 :class:`ValueError`.
1907 :param data: the row of data that may be empty
1908 :returns: an object constructed from the partial row, if possible,
1909 or `None`
1910 """
1911 if (self is None) or (data is None):
1912 return None
1913 return self.parse_row(data)
1915 @classmethod
1916 def read(cls: type["CsvReader"], rows: Iterable[str],
1917 separator: str = CSV_SEPARATOR,
1918 comment_start: str | None = COMMENT_START,
1919 **kwargs) -> Generator[T, None, None]:
1920 """
1921 Parse a stream of CSV data.
1923 This class method creates a single new instance of `cls` and passes it
1924 the column names/indices as well as any additional named arguments of
1925 this method into the constructor. It then uses the method
1926 :meth:`parse_row` of the class to parse the row data to generate the
1927 output stream.
1929 It offers a more convenient wrapper around :func:`csv_read` for cases
1930 where it makes more sense to implement the parsing functionality in a
1931 class.
1933 :param rows: the rows of strings with CSV data
1934 :param separator: the separator character
1935 :param comment_start: the comment start character
1936 """
1937 def __creator(y: dict[str, int], __c=cls, # pylint: disable=W0102
1938 __x=kwargs) -> "CsvReader": # noqa # type: ignore
1939 return cls(y, **__x) # noqa # type: ignore
1941 yield from csv_read(rows=rows,
1942 setup=__creator,
1943 parse_row=cls.parse_row, # type: ignore
1944 separator=separator,
1945 comment_start=comment_start)
1948class CsvWriter[T]:
1949 """
1950 A base class for structured CSV writers.
1952 >>> class W(CsvWriter):
1953 ... def __init__(self, data: Iterable[dict[str, int]],
1954 ... scope: str | None = None) -> None:
1955 ... super().__init__(data, scope)
1956 ... self.rows = sorted({dkey for datarow in data
1957 ... for dkey in datarow})
1958 ... def get_column_titles(self) -> Iterable[str]:
1959 ... return self.rows
1960 ... def get_row(self, row: dict[str, int]) -> Iterable[str]:
1961 ... return map(str, (row.get(key, "") for key in self.rows))
1962 ... def get_header_comments(self) -> list[str]:
1963 ... return ["This is a header comment.", " We have two of it. "]
1964 ... def get_footer_comments(self) -> list[str]:
1965 ... return [" This is a footer comment."]
1967 >>> dd = [{"a": 1, "c": 2}, {"b": 6, "c": 8},
1968 ... {"a": 4, "d": 12, "b": 3}, {}]
1970 >>> for p in W.write(dd):
1971 ... print(p[:-8] if "version" in p else p)
1972 # This is a header comment.
1973 # We have two of it.
1974 a;b;c;d
1975 1;;2
1976 ;6;8
1977 4;3;;12
1978 ;
1979 # This is a footer comment.
1980 #
1981 # This CSV output has been created using the versatile CSV API of \
1982pycommons.io.csv, version
1983 # You can find pycommons at https://thomasweise.github.io/pycommons.
1985 >>> class W2(CsvWriter):
1986 ... def __init__(self, data: Iterable[dict[str, int]],
1987 ... scope: str | None = None) -> None:
1988 ... super().__init__(data, scope)
1989 ... self.rows = sorted({dkey for datarow in data
1990 ... for dkey in datarow})
1991 ... def get_column_titles(self) -> Iterable[str]:
1992 ... return self.rows if self.scope is None else [
1993 ... f"{self.scope}.{r}" for r in self.rows]
1994 ... def get_row(self, row: dict[str, int]) -> Iterable[str]:
1995 ... return map(str, (row.get(key, "") for key in self.rows))
1996 ... def get_footer_bottom_comments(self) -> Iterable[str] | None:
1997 ... return ["Bla!"]
1999 >>> for p in W2.write(dd, separator="@", comment_start="B"):
2000 ... print(p)
2001 a@b@c@d
2002 1@@2
2003 @6@8
2004 4@3@@12
2005 @
2006 B Bla!
2008 >>> for p in W2.write(dd, scope="k", separator="@", comment_start="B"):
2009 ... print(p)
2010 k.a@k.b@k.c@k.d
2011 1@@2
2012 @6@8
2013 4@3@@12
2014 @
2015 B Bla!
2017 >>> ";".join(W2(dd).get_optional_row(None))
2018 ';;;'
2019 >>> ";".join(W2(dd).get_optional_row(dd[0]))
2020 '1;;2;'
2022 >>> try:
2023 ... CsvWriter(1, None)
2024 ... except TypeError as te:
2025 ... print(te)
2026 data should be an instance of typing.Iterable but is int, namely 1.
2028 >>> try:
2029 ... CsvWriter([], 1)
2030 ... except TypeError as te:
2031 ... print(te)
2032 descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
2034 >>> try:
2035 ... CsvWriter([], "x x")
2036 ... except ValueError as ve:
2037 ... print(ve)
2038 invalid scope 'x x'
2040 >>> try:
2041 ... CsvWriter([], " x")
2042 ... except ValueError as ve:
2043 ... print(ve)
2044 invalid scope ' x'
2046 >>> try:
2047 ... CsvWriter([]).get_row("x")
2048 ... except NotImplementedError as nie:
2049 ... print(type(nie))
2050 <class 'NotImplementedError'>
2052 >>> try:
2053 ... CsvWriter([]).get_column_titles()
2054 ... except NotImplementedError as nie:
2055 ... print(type(nie))
2056 <class 'NotImplementedError'>
2057 """
2059 def __init__(self, data: Iterable[T],
2060 scope: str | None = None) -> None:
2061 """
2062 Initialize the csv writer.
2064 :param data: the data to be written
2065 :param scope: the prefix to be pre-pended to all columns
2066 :raises TypeError: if `data` is not an `Iterable` or if `scope` is
2067 neither `None` nor a string
2068 :raises ValueError: if `scope` is not `None` but: an empty string,
2069 becomes an empty string after stripping, or contains any
2070 whitespace or newline character
2071 """
2072 super().__init__()
2073 if not isinstance(data, Iterable):
2074 raise type_error(data, "data", Iterable)
2075 if (scope is not None) and ((str.strip(scope) != scope) or (
2076 str.__len__(scope) <= 0) or (any(map(
2077 scope.__contains__, WHITESPACE_OR_NEWLINE)))):
2078 raise ValueError(f"invalid scope {scope!r}")
2079 #: the optional scope
2080 self.scope: Final[str | None] = scope
2082 def get_column_titles(self) -> Iterable[str]:
2083 """
2084 Get the column titles.
2086 :returns: the column titles
2087 """
2088 raise NotImplementedError
2090 def get_optional_row(self, data: T | None) -> Iterable[str]:
2091 """
2092 Attach an empty row of the correct shape to the output.
2094 :param data: the data item or `None`
2095 :returns: the optional row data
2096 """
2097 if data is None: # very crude and slow way to create an optional row
2098 return [""] * list.__len__(list(self.get_column_titles()))
2099 return self.get_row(data)
2101 def get_row(self, data: T) -> Iterable[str]:
2102 """
2103 Render a single sample statistics to a CSV row.
2105 :param data: the data sample statistics
2106 :returns: the row iterator
2107 """
2108 raise NotImplementedError
2110 def get_header_comments(self) -> Iterable[str]:
2111 """
2112 Get any possible header comments.
2114 :returns: the iterable of header comments
2115 """
2116 return ()
2118 def get_footer_comments(self) -> Iterable[str]:
2119 """
2120 Get any possible footer comments.
2122 :returns: the footer comments
2123 """
2124 return ()
2126 def get_footer_bottom_comments(self) -> Iterable[str] | None:
2127 """
2128 Get the bottom footer comments.
2130 :returns: an iterator with the bottom comments
2131 """
2132 return pycommons_footer_bottom_comments(self)
2134 @classmethod
2135 def write(
2136 cls: type["CsvWriter"],
2137 data: Iterable[T],
2138 scope: str | None = None,
2139 separator: str = CSV_SEPARATOR,
2140 comment_start: str | None = COMMENT_START,
2141 **kwargs) -> Generator[str, None, None]:
2142 """
2143 Write the CSV data based on the methods provided by the class `cls`.
2145 :param data: the data
2146 :param separator: the CSV separator
2147 :param comment_start: the comment start character
2148 :param scope: the scope, or `None`
2149 :param kwargs: additional arguments to be passed to the constructor
2151 :raises TypeError: if `kwargs` is not `None` but also not a
2152 :class:`dict`
2153 """
2154 def __creator(y: Iterable[T], __c=cls, # pylint: disable=W0102
2155 __s=scope, # noqa # type: ignore
2156 __x=kwargs) -> "CsvWriter": # noqa # type: ignore
2157 return __c(data=y, scope=__s, **__x) # noqa # type: ignore
2159 yield from csv_write(
2160 data=data,
2161 column_titles=cls.get_column_titles, # type: ignore
2162 get_row=cls.get_row, # type: ignore
2163 setup=__creator,
2164 separator=separator,
2165 comment_start=comment_start,
2166 header_comments=cls.get_header_comments, # type: ignore
2167 footer_comments=cls.get_footer_comments, # type: ignore
2168 footer_bottom_comments=cls. # type: ignore
2169 get_footer_bottom_comments) # type: ignore