Coverage for pycommons / io / csv.py: 100%

297 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 03:04 +0000

1""" 

2Tools for CSV output and input. 

3 

4Our CSV format tools are intended to read and write structured objects from 

5and to a comma-separated-values format. This format consists of one header, 

6where the column titles are included (separated by a :const:`CSV_SEPARATOR`) 

7and one row per data object, with one value per column. 

8 

9Different from other CSV processing tools, we want to 

10 

111. Permit that data is extracted from / parsed in form of hierarchically 

12 structured objects. 

132. Columns have fixed types based on the object definition. 

143. The data read and written is strictly validated during the process. 

154. Data can be processed in form of a stream and is not necessarily all loaded 

16 into memory at once. 

175. The order of the columns is unimportant. 

186. Useless white space is automatically stripped and ignored. 

197. Multiple objects may be written per row, maybe even nested objects, and 

20 this is signified by "scope" column titles, e.g., something like 

21 `"weight.min"`, `"weight.median"`, ..., `"age.min"`, `"age.median"`, ... 

228. Comments may be added to the header or footer of the CSV file that describe 

23 the contents of the columns. 

24 

25The separator is configurable, but by default set to :const:`CSV_SEPARATOR`. 

26Comments start with :const:`COMMENT_START` by default. 

27""" 

28from typing import ( 

29 Any, 

30 Callable, 

31 Final, 

32 Generator, 

33 Iterable, 

34 Mapping, 

35 TypeVar, 

36 cast, 

37) 

38 

39from pycommons.ds.sequences import reiterable 

40from pycommons.strings.chars import NEWLINE, WHITESPACE_OR_NEWLINE 

41from pycommons.types import check_int_range, type_error 

42from pycommons.version import __version__ as pycommons_version 

43 

#: the string placed between two columns unless another one is specified
CSV_SEPARATOR: Final[str] = ";"

#: the marker that begins a comment: the rest of the line is ignored
COMMENT_START: Final[str] = "#"

#: the string joining a scope and a key in nested column titles
SCOPE_SEPARATOR: Final[str] = "."

#: the type variable of the data objects written to or read from CSV
T = TypeVar("T")

# mypy: disable-error-code=valid-type
#: the type variable of the setup object shared by all rows
S = TypeVar("S")

59 

60 

61def csv_scope(scope: str | None, key: str | None) -> str: 

62 """ 

63 Combine a scope and a key. 

64 

65 :param scope: the scope, or `None` 

66 :param key: the key, or `None` 

67 :returns: the scope joined with the key 

68 

69 >>> csv_scope("a", "b") 

70 'a.b' 

71 >>> csv_scope("a", None) 

72 'a' 

73 >>> csv_scope(None, "b") 

74 'b' 

75 

76 >>> try: 

77 ... csv_scope(1, "b") 

78 ... except TypeError as te: 

79 ... print(str(te)) 

80 descriptor '__len__' requires a 'str' object but received a 'int' 

81 

82 >>> try: 

83 ... csv_scope("a", 1) 

84 ... except TypeError as te: 

85 ... print(str(te)) 

86 descriptor '__len__' requires a 'str' object but received a 'int' 

87 

88 >>> try: 

89 ... csv_scope("a ", "b") 

90 ... except ValueError as ve: 

91 ... print(str(ve)) 

92 Invalid csv scope 'a '. 

93 

94 >>> try: 

95 ... csv_scope("", "b") 

96 ... except ValueError as ve: 

97 ... print(ve) 

98 Invalid csv scope ''. 

99 

100 >>> try: 

101 ... csv_scope("a", " b") 

102 ... except ValueError as ve: 

103 ... print(str(ve)) 

104 Invalid csv key ' b'. 

105 

106 >>> try: 

107 ... csv_scope("a", "") 

108 ... except ValueError as ve: 

109 ... print(str(ve)) 

110 Invalid csv key ''. 

111 

112 >>> try: 

113 ... csv_scope(None, None) 

114 ... except ValueError as ve: 

115 ... print(str(ve)) 

116 Csv scope and key cannot both be None. 

117 """ 

118 if (key is not None) and ((str.__len__(key) <= 0) or ( 

119 str.strip(key) != key)): 

120 raise ValueError(f"Invalid csv key {key!r}.") 

121 if scope is None: 

122 if key is None: 

123 raise ValueError("Csv scope and key cannot both be None.") 

124 return key 

125 if (str.__len__(scope) <= 0) or (str.strip(scope) != scope): 

126 raise ValueError(f"Invalid csv scope {scope!r}.") 

127 if key is None: 

128 return scope 

129 return f"{scope}{SCOPE_SEPARATOR}{key}" 

130 

131 

def csv_read(rows: Iterable[str],
             setup: Callable[[dict[str, int]], S],
             parse_row: Callable[[S, list[str]], T],
             separator: str = CSV_SEPARATOR,
             comment_start: str | None = COMMENT_START) \
        -> Generator[T, None, None]:
    r"""
    Read (parse) a sequence of strings as CSV data.

    All lines are :meth:`~str.split` based on the `separator` string and
    each of the resulting strings is stripped via :meth:`~str.strip`.
    The first non-empty line of the data is interpreted as header line.

    This header is passed to the `setup` function in form of a :class:`dict`
    that maps column titles to column indices. This function then returns an
    object of setup data (which may also be `None`). To each of the rows of
    CSV data, the function `parse_row` is applied. This function receives
    the object returned by `setup` as first argument and the row as list of
    strings as second argument. Each line is therefore :meth:`~str.split`
    (by the CSV separator) and its components are :meth:`~str.strip`-ped.
    It is permitted that a line in the CSV file contains fewer columns than
    declared in the header. In this case, the missing columns are set to
    empty strings. Lines that are entirely empty are skipped.

    If `comment_start` is not `None`, then all text in a line starting at
    the first occurrence of `comment_start` is discarded before the line is
    processed.

    If you want to read more complex CSV structures, then using the class
    :class:`CsvReader` and its class method :meth:`CsvReader.read` are a more
    convenient approach. They are wrappers around :func:`csv_read`.

    :param rows: the rows of text
    :param setup: a function which creates an object holding the necessary
        information for row parsing
    :param parse_row: the function parsing the rows
    :param separator: the string used to separate columns
    :param comment_start: the string starting comments
    :returns: a :class:`Generator` with the parsed data rows
    :raises TypeError: if any of the parameters has the wrong type
    :raises ValueError: if the separator or comment start character are
        incompatible or if the data has some internal error

    >>> def _setup(colidx: dict[str, int]) -> dict[str, int]:
    ...     return colidx

    >>> def _parse_row(colidx: dict[str, int], row: list[str]) -> dict:
    ...     return {x: row[y] for x, y in colidx.items()}

    >>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9",
    ...         "", "10", "# 11;12"]

    >>> for p in csv_read(text, _setup, _parse_row):
    ...     print(p)
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}

    >>> for p in csv_read((t.replace(";", ",") for t in text), _setup,
    ...                   _parse_row, ","):
    ...     print(p)
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}

    >>> for p in csv_read((t.replace(";", "\t") for t in text), _setup,
    ...                   _parse_row, "\t"):
    ...     print(p)
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}

    >>> for p in csv_read(text, _setup, _parse_row, comment_start=None):
    ...     print(p)
    {'a': '# test', 'b': '', 'c': '', 'd': ''}
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}
    {'a': '# 11', 'b': '12', 'c': '', 'd': ''}

    >>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", "5;6", ";8;;9",
    ...         "", "10", "# 11;12"]
    >>> for p in csv_read(text, _setup, _parse_row):
    ...     print(p)
    {'a': '1', 'b': '2', 'c': '3', 'd': '4'}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '5', 'b': '6', 'c': '', 'd': ''}
    {'a': '', 'b': '8', 'c': '', 'd': '9'}
    {'a': '10', 'b': '', 'c': '', 'd': ''}

    A `setup` function may return `None`; the data rows are parsed all the
    same.

    >>> for p in csv_read(["a;b", "1;2", "3"], lambda _: None,
    ...                   lambda _, row: row):
    ...     print(p)
    ['1', '2']
    ['3', '']

    >>> try:
    ...     list(csv_read(None, _setup, _parse_row))
    ... except TypeError as te:
    ...     print(te)
    rows should be an instance of typing.Iterable but is None.

    >>> try:
    ...     list(csv_read(1, _setup, _parse_row))
    ... except TypeError as te:
    ...     print(te)
    rows should be an instance of typing.Iterable but is int, namely 1.

    >>> try:
    ...     list(csv_read(text, None, _parse_row))
    ... except TypeError as te:
    ...     print(te)
    setup should be a callable but is None.

    >>> try:
    ...     list(csv_read(text, 1, _parse_row))
    ... except TypeError as te:
    ...     print(te)
    setup should be a callable but is int, namely 1.

    >>> try:
    ...     list(csv_read(text, _setup, None))
    ... except TypeError as te:
    ...     print(te)
    parse_row should be a callable but is None.

    >>> try:
    ...     list(csv_read(text, _setup, 1))
    ... except TypeError as te:
    ...     print(te)
    parse_row should be a callable but is int, namely 1.

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, None))
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'NoneType'

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, 1))
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'int'

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, ""))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid separator ''.

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, "-", 1))
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'int'

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, "-", ""))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid comment start: ''.

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, "-", " "))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid comment start: ' '.

    >>> try:
    ...     list(csv_read(text, _setup, _parse_row, ";", ";"))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid comment start: ';'.

    >>> text2 = ["a;b;a;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9"]
    >>> try:
    ...     list(csv_read(text2, _setup, _parse_row))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid column headers: ['a', 'b', 'a', 'd'].

    >>> text2 = ["a;b;c;d", "# test", " 1; 2;3;4", "1;2;3;4;5;6;7", ";8;;9"]
    >>> try:
    ...     list(csv_read(text2, _setup, _parse_row))
    ... except ValueError as ve:
    ...     print(ve)
    Invalid row '1;2;3;4;5;6;7' contains 7 columns, but should have at most 4.
    """
    if not isinstance(rows, Iterable):
        raise type_error(rows, "rows", Iterable)
    if not callable(setup):
        raise type_error(setup, "setup", call=True)
    if not callable(parse_row):
        raise type_error(parse_row, "parse_row", call=True)
    # the unbound `str` methods double as type checks for the arguments
    if str.__len__(separator) <= 0:
        raise ValueError(f"Invalid separator {separator!r}.")
    if (comment_start is not None) and (
            (str.__len__(comment_start) <= 0) or (
            str.strip(comment_start) != comment_start) or (
            comment_start in separator)):
        raise ValueError(f"Invalid comment start: {comment_start!r}.")

    # A negative column count signals that the header was not parsed yet.
    # We must not use `info is None` for this purpose: `setup` is allowed to
    # return `None`, in which case every data row would be mistaken for
    # another header row.
    col_count: int = -1

    # cannot strip spaces that are part of the separator
    strip: Final[Callable[[str], str]] = str.strip
    stripper: Final[Callable[[str], str]] = strip if (  # type: ignore
        strip(separator) == separator) else str.rstrip  # type: ignore
    find: Final[Callable[[str, str], int]] = str.find  # type: ignore
    split: Final[Callable[[str, str], list[str]]] = str.split  # type: ignore
    listlen: Final[Callable[[list], int]] = list.__len__  # type: ignore
    strlen: Final[Callable[[str], int]] = str.__len__  # type: ignore
    info: S | None = None  # the column definition info generated by setup
    exts: dict[int, list[str]] = {}  # cache of padding lists, by length

    for orig_line in rows:  # iterate over all the rows
        line: str = orig_line
        if comment_start is not None:  # delete comment part, if any
            deli = find(line, comment_start)
            if deli >= 0:
                line = line[:deli]
        line = stripper(line)
        if strlen(line) <= 0:
            continue  # nothing to do here

        # split into columns and strip the white space off each of them
        cols: list[str] = [strip(v) for v in split(line, separator)]
        count: int = listlen(cols)  # get number of columns

        if col_count < 0:  # need to load column definition
            colmap: dict[str, int] = {s: i for i, s in enumerate(cols)}
            # titles must be non-empty and unique; `split` with a non-empty
            # separator always yields at least one column
            if any(strlen(s) <= 0 for s in cols) or (
                    dict.__len__(colmap) != count):
                raise ValueError(f"Invalid column headers: {cols!r}.")
            info = setup(colmap)  # obtain the column setup object
            col_count = count  # from now on, all rows are data rows
            del colmap  # column map no longer needed
            continue  # proceed with next line

        if count > col_count:  # too many columns, throw error
            raise ValueError(
                f"Invalid row {orig_line!r} contains {count} columns, but "
                f"should have at most {col_count}.")
        if count < col_count:  # do we need to add dummy columns?
            add: int = col_count - count  # number of needed columns
            if add not in exts:  # check if in cache
                exts[add] = [""] * add  # add to cache
            cols.extend(exts[add])
        yield parse_row(info, cols)

380 

381 

def pycommons_footer_bottom_comments(
        _: Any, additional: str | None = None) -> Iterable[str]:
    """
    Generate the standard `pycommons` comments for the very end of a file.

    The first line names this CSV API and its version, the last line gives
    the URL of the project. If `additional` is a non-empty string, it is
    placed between these two lines.

    :param _: ignored
    :param additional: an optional line of additional comments
    :returns: an :class:`Iterable` of standard pycommons comments

    >>> for p in pycommons_footer_bottom_comments(""):
    ...     print(p[:70])
    This CSV output has been created using the versatile CSV API of pycomm
    You can find pycommons at https://thomasweise.github.io/pycommons.

    >>> for p in pycommons_footer_bottom_comments("", "Statistics are cool."):
    ...     print(p[:70])
    This CSV output has been created using the versatile CSV API of pycomm
    Statistics are cool.
    You can find pycommons at https://thomasweise.github.io/pycommons.
    """
    version_line: str = (
        "This CSV output has been created using the versatile CSV API of "
        f"pycommons.io.csv, version {pycommons_version}.")
    yield version_line
    # the optional line is inspected only after the first line was produced
    if additional is not None:
        if str.__len__(additional) > 0:
            yield additional
    yield "You can find pycommons at https://thomasweise.github.io/pycommons."

407 

408 

def __print_comments(comments: Iterable[str] | None,
                     comment_start: str, comment_type: str,
                     empty_first_row: bool) -> Generator[str, None, None]:
    r"""
    Format and validate comment lines and produce them one by one.

    Each comment is stripped. A non-empty comment is emitted as
    `comment_start`, a space, and the stripped text. Empty comments at the
    beginning are dropped and a run of empty comments after some text is
    collapsed into one line holding only `comment_start`. If
    `empty_first_row` is `True`, one such line is also emitted right before
    the first non-empty comment.

    :param comments: the comment source
    :param comment_start: the comment start string
    :param comment_type: the comment type
    :param empty_first_row: should we put an empty first row?
    :returns: the generator of the comment strings
    :raises TypeError: if an argument is of the wrong type
    :raises ValueError: if comments cannot be placed or contain newlines

    >>> col = ["", "First comment.", "Second comment.", "", "",
    ...        " Third comment. "]
    >>> for p in __print_comments(col, "#", "header", False):
    ...     print(p)
    # First comment.
    # Second comment.
    #
    # Third comment.

    >>> col.clear()
    >>> list(__print_comments(col, "#", "header", True))
    []

    >>> col = ["", "First comment.", "Second comment.", "", "",
    ...        " Third comment. "]
    >>> for p in __print_comments(col, "#", "header", True):
    ...     print(p)
    #
    # First comment.
    # Second comment.
    #
    # Third comment.

    >>> col = ["First comment.", "Second comment.", "", "",
    ...        " Third comment. "]
    >>> for p in __print_comments(col, "#", "header", True):
    ...     print(p)
    #
    # First comment.
    # Second comment.
    #
    # Third comment.

    >>> col = ["", "", "First comment.", "Second comment.", "", "",
    ...        " Third comment. "]
    >>> for p in __print_comments(col, "#", "header", True):
    ...     print(p)
    #
    # First comment.
    # Second comment.
    #
    # Third comment.

    >>> list(__print_comments([], "#", "header", False))
    []
    >>> list(__print_comments([""], "#", "header", False))
    []
    >>> list(__print_comments(["", ""], "#", "header", False))
    []
    >>> list(__print_comments([], "#", "header", True))
    []
    >>> list(__print_comments([""], "#", "header", True))
    []
    >>> list(__print_comments(["", ""], "#", "header", True))
    []

    >>> list(__print_comments(None, "#", "header", True))
    []

    >>> try:
    ...     list(__print_comments(1, "#", "header", True))
    ... except TypeError as te:
    ...     print(te)
    comments should be an instance of typing.Iterable but is int, namely 1.

    >>> try:
    ...     list(__print_comments(["", 1, "Second comment."], "x", "header",
    ...                           False))
    ... except TypeError as te:
    ...     print(te)
    descriptor 'strip' for 'str' objects doesn't apply to a 'int' object

    >>> try:
    ...     list(__print_comments(["", None, "Second."], "x", "header",
    ...                           False))
    ... except TypeError as te:
    ...     print(te)
    descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object

    >>> try:
    ...     list(__print_comments(["Hello", "x\ny", "z"], "#", "header",
    ...                           False))
    ... except ValueError as ve:
    ...     print(ve)
    A header comment must not contain a newline character, but 'x\ny' does.
    """
    if comments is None:
        return
    if not isinstance(comments, Iterable):
        raise type_error(comments, "comments", Iterable)

    # do we still owe one empty comment line before the next text line?
    lead_blank: bool = empty_first_row
    # was the most recently emitted line a line with text?
    after_text: bool = False

    for raw in comments:
        text = str.strip(raw)  # strip and typecheck
        if str.__len__(text) > 0:
            if any(nl in text for nl in NEWLINE):
                raise ValueError(f"A {comment_type} comment must not contain "
                                 f"a newline character, but {raw!r} does.")
            after_text = True
            if lead_blank:
                yield comment_start
                lead_blank = False
            yield f"{comment_start} {text}"
        elif after_text:
            # only the first empty comment after some text becomes a line
            yield comment_start
            after_text = False
            lead_blank = False

529 

530 

def __default_row(s: Iterable[str], t: Any) -> Iterable[str]:
    """
    Convert one data object to the strings of its row in the default way.

    If the row object is a :class:`Mapping`, then one string is produced per
    element of the setup object `s`: the string representation of the value
    stored under that element, or the empty string if there is no such
    value. Any other row object is iterated over directly and its elements
    are converted to strings.

    :param s: the setup object: an :class:`Iterable` of string
    :param t: the row object
    :returns: an :class:`Iterable` of string

    >>> list(__default_row(("a", "b"), ("1", "2")))
    ['1', '2']

    >>> list(__default_row(("a", "b"), {"b": 45, "c": 44, "a": 6}))
    ['6', '45']
    """
    if not isinstance(t, Mapping):
        # a plain sequence of values: its order defines the column order
        return map(str, cast("Iterable[Any]", t))
    # a mapping: look each title up, absent titles yield empty cells
    return ("" if title not in t else str(t[title]) for title in s)

548 

549 

550def csv_write( 

551 data: Iterable[T], 

552 column_titles: Iterable[str] | Callable[[S], Iterable[str]] = 

553 lambda t: cast("Iterable[str]", t), 

554 get_row: Callable[[S, T], Iterable[str]] = 

555 cast("Callable[[S, T], Iterable[str]]", __default_row), 

556 setup: Callable[[Iterable[T]], S] = lambda t: cast("S", t), 

557 separator: str = CSV_SEPARATOR, 

558 comment_start: str | None = COMMENT_START, 

559 header_comments: 

560 Iterable[str] | Callable[[S], Iterable[str] | None] | None = None, 

561 footer_comments: 

562 Iterable[str] | Callable[[S], Iterable[str] | None] | None = None, 

563 footer_bottom_comments: Iterable[str] | Callable[[ 

564 S], Iterable[str] | None] | None = 

565 pycommons_footer_bottom_comments) -> Generator[str, None, None]: 

566 r""" 

567 Produce a sequence of CSV formatted text. 

568 

569 The data is provided in form of a :class:`Iterable`. In a first step, the 

570 function `setup` is invoked and applied to the `data` :class:`Iterable`. 

571 It can return an object that sort of stores the structure of the data, 

572 e.g., which columns should be generated and how they should be formatted. 

573 

574 `column_titles` can either be an :class:`Iterable` with the column titles 

575 or a :class:`Callable`. In the latter case, the object generated by `setup` 

576 is passed to `column_titles`, which should generate the column titles. 

577 These titles are :meth:`~str.strip`-ped and concatenated to use the column 

578 `separator` string and the resulting header string is passed to `consumer`. 

579 

580 Then, for each element `e` in the `data` :class:`Iterable`, the function 

581 `get_row` is invoked. This function receives the setup information object 

582 (previously returned by `setup`). It should generate one string per 

583 column. These strings are then each :meth:`~str.strip`-ped and 

584 concatenated using the column `separator` string. All trailing `separator` 

585 are removed, but if all strings are empty, at least a single `separator` 

586 is retained. The resulting string (per row) is again passed to `consumer`. 

587 

588 Additionally, `header_comments` and `footer_comments` can be `None`, to 

589 not include any such comments, an :class:`Iterable` of comments, or 

590 functions to generate row comments as :class:`str`. These are then 

591 prepended or appends as comment rows before or after all of the 

592 above, respectively. In that case, `comment_start` is prepended to each 

593 line. If `comment_start is None`, then these comments are not printed. 

594 `footer_bottom_comments` provides means to print additional comments 

595 after the footer comments `comment_start is not None`. 

596 

597 If you create nested CSV formats, i.e., such where the `setup` function 

598 invokes the `setup` function of other data, and the data that you receive 

599 could come from a :class:`~typing.Generator` (or some other one-shot 

600 :class:`~typing.Iterator`), then you need to make sure to solidify the 

601 iterable data with :func:`~pycommons.ds.sequences.reiterable`. The 

602 structure of our CSV output is that `setup` is first invoked and then 

603 `get_row`. If `setup` already consumes the data away, then `get_row` may 

604 print nothing. Alternatively, if you apply multiple `setup` routines to 

605 the same data that extract different information, then the first `setup` 

606 run may consume all the data, leaving nothing for the second one. 

607 

608 If you want to write more complex CSV structures, then implementing the 

609 class :class:`CsvWriter` and using its class method 

610 :meth:`CsvWriter.write` may be a more convenient solution. 

611 They are wrappers around :func:`csv_write`. 

612 

613 :param data: the iterable of data to be written 

614 :param column_titles: get the column titles 

615 :param get_row: transform a row of data into a list of strings 

616 :param setup: the setup function that computes how the data should be 

617 represented 

618 :param separator: the string used to separate columns 

619 :param comment_start: the string starting comments 

620 :param header_comments: get the comments to be placed above the CSV 

621 header row -- only invoked if `comment_start is not None`. 

622 :param footer_comments: get the comments to be placed after the last 

623 row -- only invoked if `comment_start is not None`. 

624 :param footer_bottom_comments: get the footer bottom comments, i.e., 

625 comments to be printed after all other footers. These commonts may 

626 include something like the version information of the software used. 

627 This function is only invoked if `comment_start is not None`. 

628 :returns: a :class:`Generator` with the rows of CSV text 

629 :raises TypeError: if any of the parameters has the wrong type 

630 :raises ValueError: if the separator or comment start character are 

631 incompatible or if the data has some internal error 

632 

633 >>> dd = [{"a": 1, "c": 2}, {"b": 6, "c": 8}, 

634 ... {"a": 4, "d": 12, "b": 3}, {}] 

635 

636 >>> def __setup(datarows) -> list[str]: 

637 ... return sorted({dkey for datarow in datarows for dkey in datarow}) 

638 

639 >>> def __get_row(keyd: list[str], row: dict[str, int]) -> Iterable[str]: 

640 ... return map(str, (row.get(key, "") for key in keyd)) 

641 

642 >>> def __get_header_cmt(keyd: list[str]) -> list[str]: 

643 ... return ["This is a header comment.", " We have two of it. "] 

644 

645 >>> def __get_footer_cmt(keyd: list[str]) -> list[str]: 

646 ... return [" This is a footer comment."] 

647 

648 >>> for p in csv_write(dd, lambda x: x, __default_row, __setup, 

649 ... ";", "#", __get_header_cmt, __get_footer_cmt, 

650 ... lambda _: ()): 

651 ... print(p) 

652 # This is a header comment. 

653 # We have two of it. 

654 a;b;c;d 

655 1;;2 

656 ;6;8 

657 4;3;;12 

658 ; 

659 # This is a footer comment. 

660 

661 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup, 

662 ... ";", "#", __get_header_cmt, __get_footer_cmt): 

663 ... print(p[:70]) 

664 # This is a header comment. 

665 # We have two of it. 

666 a;b;c;d 

667 1;;2 

668 ;6;8 

669 4;3;;12 

670 ; 

671 # This is a footer comment. 

672 # 

673 # This CSV output has been created using the versatile CSV API of pyco 

674 # You can find pycommons at https://thomasweise.github.io/pycommons. 

675 

676 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup, 

677 ... ",", "@@", __get_header_cmt, __get_footer_cmt, 

678 ... lambda _: ()): 

679 ... print(p) 

680 @@ This is a header comment. 

681 @@ We have two of it. 

682 a,b,c,d 

683 1,,2 

684 ,6,8 

685 4,3,,12 

686 , 

687 @@ This is a footer comment. 

688 

689 >>> try: 

690 ... list(csv_write(None, lambda x: x, __get_row, __setup, 

691 ... ";", "#", __get_header_cmt, __get_footer_cmt)) 

692 ... except TypeError as te: 

693 ... print(str(te)[:60]) 

694 source should be an instance of any in {typing.Iterable, typ 

695 

696 >>> try: 

697 ... list(csv_write(1, lambda x: x, __get_row, __setup, 

698 ... ";", "#", __get_header_cmt, __get_footer_cmt)) 

699 ... except TypeError as te: 

700 ... print(str(te)[:60]) 

701 source should be an instance of any in {typing.Iterable, typ 

702 

703 >>> try: 

704 ... list(csv_write(dd, None, __get_row, __setup, 

705 ... ";", "#", __get_header_cmt, __get_footer_cmt)) 

706 ... except TypeError as te: 

707 ... print(str(te)[:70]) 

708 column_titles should be an instance of typing.Iterable or a callable b 

709 

710 >>> try: 

711 ... list(csv_write(dd, 1, __get_row, __setup, 

712 ... ";", "#", __get_header_cmt, __get_footer_cmt)) 

713 ... except TypeError as te: 

714 ... print(str(te)[:70]) 

715 column_titles should be an instance of typing.Iterable or a callable b 

716 

717 >>> try: 

718 ... list(csv_write(dd, lambda x: x, None, __setup, 

719 ... ";", "#", __get_header_cmt, __get_footer_cmt)) 

720 ... except TypeError as te: 

721 ... print(te) 

722 get_row should be a callable but is None. 

723 

724 >>> try: 

725 ... list(csv_write(dd, lambda x: x, 1, __setup, 

726 ... ";", "#", __get_header_cmt, __get_footer_cmt)) 

727 ... except TypeError as te: 

728 ... print(te) 

729 get_row should be a callable but is int, namely 1. 

730 

731 >>> try: 

732 ... list(csv_write(dd, lambda x: x, __get_row, None, 

733 ... ";", "#", __get_header_cmt, __get_footer_cmt)) 

734 ... except TypeError as te: 

735 ... print(te) 

736 setup should be a callable but is None. 

737 

738 >>> try: 

739 ... list(csv_write(dd, lambda x: x, __get_row, 1, 

740 ... ";", "#", __get_header_cmt, __get_footer_cmt)) 

741 ... except TypeError as te: 

742 ... print(te) 

743 setup should be a callable but is int, namely 1. 

744 

745 >>> try: 

746 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

747 ... None, "#", __get_header_cmt, __get_footer_cmt)) 

748 ... except TypeError as te: 

749 ... print(te) 

750 descriptor '__len__' requires a 'str' object but received a 'NoneType' 

751 

752 >>> try: 

753 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

754 ... 1, "#", __get_header_cmt, __get_footer_cmt)) 

755 ... except TypeError as te: 

756 ... print(te) 

757 descriptor '__len__' requires a 'str' object but received a 'int' 

758 

759 >>> try: 

760 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

761 ... ";", 1, __get_header_cmt, __get_footer_cmt)) 

762 ... except TypeError as te: 

763 ... print(te) 

764 descriptor '__len__' requires a 'str' object but received a 'int' 

765 

766 >>> try: 

767 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

768 ... ";", "#", 1, __get_footer_cmt)) 

769 ... except TypeError as te: 

770 ... print(str(te)[:70]) 

771 header_comments should be an instance of typing.Iterable or a callable 

772 

773 >>> try: 

774 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

775 ... ";", "", __get_header_cmt, __get_footer_cmt)) 

776 ... except ValueError as ve: 

777 ... print(ve) 

778 Invalid comment start: ''. 

779 

780 >>> try: 

781 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

782 ... ";", " ", __get_header_cmt, __get_footer_cmt)) 

783 ... except ValueError as ve: 

784 ... print(ve) 

785 Invalid comment start: ' '. 

786 

787 >>> try: 

788 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

789 ... ";", "# ", __get_header_cmt, __get_footer_cmt)) 

790 ... except ValueError as ve: 

791 ... print(ve) 

792 Invalid comment start: '# '. 

793 

794 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup, ";", 

795 ... None, None): 

796 ... print(p) 

797 a;b;c;d 

798 1;;2 

799 ;6;8 

800 4;3;;12 

801 ; 

802 

803 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup, 

804 ... ";", None, __get_header_cmt): 

805 ... print(p) 

806 a;b;c;d 

807 1;;2 

808 ;6;8 

809 4;3;;12 

810 ; 

811 

812 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup, 

813 ... ";", None, footer_comments=__get_footer_cmt, 

814 ... footer_bottom_comments= None): 

815 ... print(p) 

816 a;b;c;d 

817 1;;2 

818 ;6;8 

819 4;3;;12 

820 ; 

821 

822 >>> try: 

823 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

824 ... ";", "#", __get_header_cmt, 1)) 

825 ... except TypeError as te: 

826 ... print(str(te)[:70]) 

827 footer_comments should be an instance of typing.Iterable or a callable 

828 

829 >>> def __err_cmt_1(keyd: list[str]) -> Iterable[str]: 

830 ... return ("This is\n a comment with error.", ) 

831 

832 >>> try: 

833 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

834 ... ";", "#", __err_cmt_1)) 

835 ... except ValueError as ve: 

836 ... print(str(ve)[:58]) 

837 A header comment must not contain a newline character, but 

838 

839 >>> try: 

840 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

841 ... ";", "#", footer_comments=__err_cmt_1, 

842 ... footer_bottom_comments=None)) 

843 ... except ValueError as ve: 

844 ... print(str(ve)[:58]) 

845 A footer comment must not contain a newline character, but 

846 

847 >>> def __empty_cmt(keyd: list[str]) -> Iterable[str]: 

848 ... return (" ", ) 

849 

850 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup, 

851 ... ";", "#", __empty_cmt, __empty_cmt, __empty_cmt): 

852 ... print(p) 

853 a;b;c;d 

854 1;;2 

855 ;6;8 

856 4;3;;12 

857 ; 

858 

859 >>> for p in csv_write(dd, lambda x: x, __get_row, __setup, 

860 ... ";", "#", footer_comments=__empty_cmt, 

861 ... footer_bottom_comments=lambda _: ()): 

862 ... print(p) 

863 a;b;c;d 

864 1;;2 

865 ;6;8 

866 4;3;;12 

867 ; 

868 

869 >>> def __error_column_titles_1(keyd: list[str]) -> Iterable[str]: 

870 ... return () 

871 

872 >>> try: 

873 ... list(csv_write(dd, __error_column_titles_1, __get_row, 

874 ... __setup, ";", "#")) 

875 ... except ValueError as ve: 

876 ... print(ve) 

877 Cannot have zero columns. 

878 

879 >>> dde = dd.copy() 

880 >>> dde.append(None) 

881 >>> try: 

882 ... list(csv_write(dde, lambda x: x, __get_row, 

883 ... lambda _: ["a", "b", "c", "d"], 

884 ... ";", "#", footer_comments=__empty_cmt, 

885 ... footer_bottom_comments=lambda _: ())) 

886 ... except TypeError as te: 

887 ... print(te) 

888 data element should be an instance of object but is None. 

889 

890 >>> def __error_column_titles_2(keyd: list[str]) -> Iterable[str]: 

891 ... return (" ", ) 

892 

893 >>> try: 

894 ... list(csv_write(dd, __error_column_titles_2, __get_row, __setup, 

895 ... ";", "#")) 

896 ... except ValueError as ve: 

897 ... print(str(ve)[:50]) 

898 Invalid column title ' ', must neither be empty no 

899 

900 >>> def __error_column_titles_3(keyd: list[str]) -> Iterable[str]: 

901 ... return ("bla\nblugg", ) 

902 

903 >>> try: 

904 ... list(csv_write(dd, __error_column_titles_3, __get_row, __setup, 

905 ... ";", "#")) 

906 ... except ValueError as ve: 

907 ... print(str(ve)[:50]) 

908 Invalid column title 'bla\nblugg', must neither be 

909 

910 >>> def __error_column_titles_4(keyd: list[str]) -> Iterable[str]: 

911 ... return (None, ) 

912 

913 >>> try: 

914 ... list(csv_write(dd, __error_column_titles_4, __get_row, __setup, 

915 ... ";", "#")) 

916 ... except TypeError as te: 

917 ... print(te) 

918 descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object 

919 

920 >>> def __error_column_titles_5(keyd: list[str]) -> Iterable[str]: 

921 ... return (1, ) 

922 

923 >>> try: 

924 ... list(csv_write(dd, __error_column_titles_5, __get_row, __setup, 

925 ... ";", "#")) 

926 ... except TypeError as te: 

927 ... print(te) 

928 descriptor 'strip' for 'str' objects doesn't apply to a 'int' object 

929 

930 >>> def __error_column_titles_6(keyd: list[str]) -> Iterable[str]: 

931 ... return ("a", "b", "c", "a") 

932 

933 >>> try: 

934 ... list(csv_write(dd, __error_column_titles_6, __get_row, __setup, 

935 ... ";", "#")) 

936 ... except ValueError as ve: 

937 ... print(ve) 

938 Cannot have duplicated columns: ['a', 'b', 'c', 'a']. 

939 

940 >>> def __error_column_titles_7(keyd: list[str]) -> Iterable[str]: 

941 ... return ("a", "b", "c;4") 

942 

943 >>> try: 

944 ... list(csv_write(dd, __error_column_titles_7, __get_row, __setup, 

945 ... ";", "#")) 

946 ... except ValueError as ve: 

947 ... print(str(ve)[:49]) 

948 Invalid column title 'c;4', must neither be empty 

949 

950 >>> def __error_column_titles_8(keyd: list[str]) -> Iterable[str]: 

951 ... return ("a", "b#x", "c") 

952 

953 >>> try: 

954 ... list(csv_write(dd, __error_column_titles_8, __get_row, __setup, 

955 ... ";", "#")) 

956 ... except ValueError as ve: 

957 ... print(str(ve)[:49]) 

958 Invalid column title 'b#x', must neither be empty 

959 

960 >>> def __error_row_1(keyd: list[str], row: dict[str, int]): 

961 ... return ("bla", None, "blubb") 

962 

963 >>> try: 

964 ... list(csv_write(dd, lambda x: x, __error_row_1, 

965 ... __setup, ";", "#", 

966 ... footer_bottom_comments=lambda _, __: None)) 

967 ... except TypeError as te: 

968 ... print(te) 

969 descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object 

970 

971 >>> def __error_row_2(keyd: list[str], row: dict[str, int]): 

972 ... return ("bla", 2.3, "blubb") 

973 

974 >>> try: 

975 ... list(csv_write(dd, lambda x: x, __error_row_2, 

976 ... __setup, ";", "#", 

977 ... footer_bottom_comments=lambda _: None)) 

978 ... except TypeError as te: 

979 ... print(te) 

980 descriptor 'strip' for 'str' objects doesn't apply to a 'float' object 

981 

982 >>> def __error_row_3(keyd: list[str], row: dict[str, int]): 

983 ... return ("bla", "x\ny", "blubb") 

984 

985 >>> try: 

986 ... list(csv_write(dd, lambda x: x, __error_row_3, 

987 ... __setup, ";", "#", 

988 ... footer_bottom_comments=lambda _: None)) 

989 ... except ValueError as ve: 

990 ... print(str(ve)[:50]) 

991 Invalid column value 'x\ny', cannot contain any of 

992 

993 >>> def __error_row_4(keyd: list[str], row: dict[str, int]): 

994 ... return ("bla", "x#", "blubb") 

995 

996 >>> try: 

997 ... list(csv_write(dd, lambda x: x, __error_row_4, 

998 ... __setup, ";", "#", 

999 ... footer_bottom_comments=lambda _: None)) 

1000 ... except ValueError as ve: 

1001 ... print(str(ve)[:50]) 

1002 Invalid column value 'x#', cannot contain any of [ 

1003 

1004 >>> def __error_row_5(keyd: list[str], row: dict[str, int]): 

1005 ... return ("bla", "x;#", "blubb") 

1006 

1007 >>> try: 

1008 ... list(csv_write(dd, lambda x: x, __error_row_5, 

1009 ... __setup, ";", "#")) 

1010 ... except ValueError as ve: 

1011 ... print(str(ve)[:49]) 

1012 Invalid column value 'x;#', cannot contain any of 

1013 

1014 >>> def __error_column_titles_9(keyd: list[str]) -> Iterable[str]: 

1015 ... return ("a", ) 

1016 

1017 >>> def __error_row_6(keyd: list[str], row: dict[str, int]): 

1018 ... return ("", ) 

1019 

1020 >>> try: 

1021 ... list(csv_write(dd, __error_column_titles_9, __error_row_6, 

1022 ... __setup, ";", "#")) 

1023 ... except ValueError as ve: 

1024 ... print(ve) 

1025 Cannot have empty row in a single-column format, but got ['']. 

1026 

1027 >>> def __error_row_7(keyd: list[str], row: dict[str, int]): 

1028 ... return ("x", "y") 

1029 

1030 >>> try: 

1031 ... list(csv_write(dd, __error_column_titles_9, __error_row_7, 

1032 ... __setup, ";", "#")) 

1033 ... except ValueError as ve: 

1034 ... print(ve) 

1035 Too many columns in ['x', 'y'], should be 1. 

1036 

1037 >>> try: 

1038 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

1039 ... "", "#", footer_comments=__err_cmt_1)) 

1040 ... except ValueError as ve: 

1041 ... print(ve) 

1042 Invalid separator ''. 

1043 

1044 >>> try: 

1045 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

1046 ... "x", "#", footer_comments=1)) 

1047 ... except TypeError as te: 

1048 ... print(str(te)[:70]) 

1049 footer_comments should be an instance of typing.Iterable or a callable 

1050 

1051 >>> try: 

1052 ... list(csv_write(dd, lambda x: x, __get_row, __setup, 

1053 ... "x", "#", footer_bottom_comments=1)) 

1054 ... except TypeError as te: 

1055 ... print(str(te)[:70]) 

1056 footer_bottom_comments should be an instance of typing.Iterable or a c 

1057 

1058 >>> ddx = [{"a": 1, "c": 2}, None, 

1059 ... {"a": 4, "d": 12, "b": 3}, {}] 

1060 >>> def __error_row_9(_, __): 

1061 ... return ("1", "2", "3", "4") 

1062 >>> def __error_row_10(_): 

1063 ... __error_row_9(1, 2) 

1064 

1065 >>> try: 

1066 ... list(csv_write(ddx, __error_row_10, 

1067 ... __error_row_9, lambda x: x, ";", "#")) 

1068 ... except TypeError as te: 

1069 ... print(te) 

1070 'NoneType' object is not iterable 

1071 """ 

1072 if not (isinstance(column_titles, Iterable) or callable(column_titles)): 

1073 raise type_error(column_titles, "column_titles", Iterable, call=True) 

1074 if not callable(get_row): 

1075 raise type_error(get_row, "get_row", call=True) 

1076 if not callable(setup): 

1077 raise type_error(setup, "setup", call=True) 

1078 if str.__len__(separator) <= 0: 

1079 raise ValueError(f"Invalid separator {separator!r}.") 

1080 forbidden_marker: Final[set[str]] = set(NEWLINE) 

1081 forbidden_marker.add(separator) 

1082 if comment_start is not None: 

1083 if (str.__len__(comment_start) <= 0) or ( 

1084 str.strip(comment_start) != comment_start) or ( 

1085 comment_start in separator): 

1086 raise ValueError(f"Invalid comment start: {comment_start!r}.") 

1087 forbidden_marker.add(comment_start) 

1088 if (header_comments is not None) and (not (isinstance( 

1089 header_comments, Iterable) or callable(header_comments))): 

1090 raise type_error( 

1091 header_comments, "header_comments", Iterable, call=True) 

1092 if (footer_comments is not None) and (not (isinstance( 

1093 footer_comments, Iterable) or callable(footer_comments))): 

1094 raise type_error( 

1095 footer_comments, "footer_comments", Iterable, call=True) 

1096 if (footer_bottom_comments is not None) and (not (isinstance( 

1097 footer_bottom_comments, Iterable) or callable( 

1098 footer_bottom_comments))): 

1099 raise type_error(footer_bottom_comments, 

1100 "footer_bottom_comments", Iterable, call=True) 

1101 

1102 data = reiterable(data) # make sure we can iterate over the data twice 

1103 setting: Final[S] = setup(data) 

1104 forbidden: Final[list[str]] = sorted(forbidden_marker) 

1105 

1106 # first put header comments 

1107 if (comment_start is not None) and (header_comments is not None): 

1108 yield from __print_comments( 

1109 header_comments(setting) if callable(header_comments) 

1110 else header_comments, comment_start, "header", False) 

1111 

1112 # now process the column titles 

1113 collected: list[str] = list( 

1114 column_titles(setting) if callable(column_titles) else column_titles) 

1115 col_count: Final[int] = list.__len__(collected) 

1116 if col_count <= 0: 

1117 raise ValueError("Cannot have zero columns.") 

1118 for i, col in enumerate(collected): 

1119 collected[i] = xcol = str.strip(col) 

1120 if (str.__len__(xcol) <= 0) or any(map(xcol.__contains__, forbidden)): 

1121 raise ValueError(f"Invalid column title {col!r}, must neither be" 

1122 f" empty nor contain any of {forbidden!r}.") 

1123 if set.__len__(set(collected)) != col_count: 

1124 raise ValueError(f"Cannot have duplicated columns: {collected!r}.") 

1125 yield separator.join(collected) 

1126 

1127 # now do the single rows 

1128 for element in data: 

1129 if element is None: 

1130 raise type_error(element, "data element", object) 

1131 collected.clear() 

1132 collected.extend(get_row(setting, element)) 

1133 list_len: int = list.__len__(collected) 

1134 if list_len > col_count: 

1135 raise ValueError( 

1136 f"Too many columns in {collected!r}, should be {col_count}.") 

1137 last_non_empty: int = -1 

1138 for i, col in enumerate(collected): 

1139 collected[i] = xcol = str.strip(col) 

1140 if any(map(xcol.__contains__, forbidden)): 

1141 raise ValueError(f"Invalid column value {col!r}, cannot " 

1142 f"contain any of {forbidden!r}.") 

1143 if str.__len__(xcol) > 0: 

1144 last_non_empty = i + 1 

1145 if last_non_empty < list_len: 

1146 if last_non_empty <= 0: 

1147 if col_count <= 1: 

1148 raise ValueError( 

1149 f"Cannot have empty row in a single-column format, " 

1150 f"but got {collected!r}.") 

1151 yield separator 

1152 continue 

1153 del collected[last_non_empty:] 

1154 yield separator.join(collected) 

1155 

1156 # finally put footer comments 

1157 if comment_start is not None: 

1158 empty_next: bool = False 

1159 if footer_comments is not None: 

1160 for c in __print_comments(footer_comments(setting) if callable( 

1161 footer_comments) else footer_comments, comment_start, 

1162 "footer", False): 

1163 yield c 

1164 empty_next = True 

1165 if footer_bottom_comments is not None: 

1166 yield from __print_comments( 

1167 footer_bottom_comments(setting) if callable( 

1168 footer_bottom_comments) else footer_bottom_comments, 

1169 comment_start, "footer bottom", empty_next) 

1170 

1171 

1172def csv_str_or_none(data: list[str | None] | None, 

1173 index: int | None) -> str | None: 

1174 """ 

1175 Get a string or `None` from a data row. 

1176 

1177 This function is a shortcut for when data elements or columns are 

1178 optional. If `index` is `None` or outside of the valid index range of the 

1179 list `data`, then `None` is returned. If `data` itself is `None` or the 

1180 element at index `index` is the empty string, then `None` is returned. 

1181 Only if `data` and `index` are both not `None` and `index` is a valid 

1182 index into `data` and the element at index `index` in `data` is not the 

1183 empty string, then this element is returned. In other words, this is a 

1184 very tolerant function to handle optional data and to return `None` if the 

1185 data is not present. The function :func:`csv_val_or_none` further extends 

1186 this function by converting the data to another data type if it is 

1187 present. 

1188 

1189 :param data: the data 

1190 :param index: the index, if any 

1191 :returns: the string or nothing 

1192 

1193 >>> ddd = ["a", "b", "", "d"] 

1194 >>> print(csv_str_or_none(ddd, 0)) 

1195 a 

1196 >>> print(csv_str_or_none(ddd, 1)) 

1197 b 

1198 >>> print(csv_str_or_none(ddd, 2)) 

1199 None 

1200 >>> print(csv_str_or_none(ddd, 3)) 

1201 d 

1202 >>> print(csv_str_or_none(ddd, None)) 

1203 None 

1204 >>> print(csv_str_or_none(ddd, 10)) 

1205 None 

1206 >>> print(csv_str_or_none(ddd, -1)) 

1207 None 

1208 >>> print(csv_str_or_none(None, 0)) 

1209 None 

1210 """ 

1211 if (index is None) or (data is None): 

1212 return None 

1213 if 0 <= index <= list.__len__(data): 

1214 d: str = data[index] 

1215 return None if (d is None) or (str.__len__(d) <= 0) else d 

1216 return None 

1217 

1218 

#: A type variable for the result type of the conversion functions that are
#: passed to :func:`csv_val_or_none`, :func:`csv_select_scope`, and
#: :func:`csv_select_scope_or_none`.
U = TypeVar("U")

1221 

1222 

def csv_val_or_none(data: list[str | None] | None, index: int | None,
                    conv: Callable[[str], U]) -> U | None:
    """
    Get a converted value or `None` from a data row.

    The function :func:`csv_str_or_none` is used to extract an optional data
    element from a CSV row. It yields `None` if the element is not present,
    if it is the empty string, or if the `index` is `None` or outside of the
    valid range. In this case, `None` is returned and `conv` is not invoked.
    Otherwise, i.e., if the data is present and not the empty string, the
    function `conv` is applied to it and its result is returned.

    :param data: the data row, or `None`
    :param index: the index, if any
    :param conv: the conversion function, which receives the string found
        at index `index`
    :returns: the result of `conv`, or `None` if no data was found

    >>> ddd = ["11", "22", "", "33"]
    >>> print(csv_val_or_none(ddd, 0, int))
    11
    >>> print(csv_val_or_none(ddd, 1, int))
    22
    >>> print(csv_val_or_none(ddd, 2, int))
    None
    >>> print(csv_val_or_none(ddd, 3, int))
    33
    >>> print(csv_val_or_none(ddd, None, int))
    None
    """
    text: Final[str | None] = csv_str_or_none(data, index)
    if text is None:
        return None  # nothing there, so there is nothing to convert
    return conv(text)

1253 

1254 

def csv_column(columns: dict[str, int], key: str,
               remove_col: bool = True) -> int:
    """
    Look up the index of a mandatory CSV column.

    The index stored under `key` in the column description map `columns` is
    retrieved, verified to be inside the range `0..1000000`, and returned.
    If `columns` has no entry for `key`, a `KeyError` is raised. If
    `remove_col` is `True` and the column exists and is valid, then its entry
    is deleted from `columns`.

    :param columns: the map from column names to column indices
    :param key: the name of the column to look up, must not be empty
    :param remove_col: should we remove the column from `columns`?
    :returns: the index of the column
    :raises TypeError: if any of the parameters is not of the prescribed type
    :raises ValueError: if the column index or the key are invalid
    :raises KeyError: if no column of the name `key` exists

    >>> csv_column({"a": 5}, "a")
    5

    >>> cols = {"a": 5, "b": 7}
    >>> csv_column(cols, "a", False)
    5
    >>> cols
    {'a': 5, 'b': 7}
    >>> csv_column(cols, "a", True)
    5
    >>> cols
    {'b': 7}

    >>> try:
    ...     csv_column({"a": 5}, "b")
    ... except KeyError as ke:
    ...     print(ke)
    'b'

    >>> try:
    ...     csv_column({"a": 5}, "a", "3")
    ... except TypeError as te:
    ...     print(te)
    remove_col should be an instance of bool but is str, namely '3'.

    >>> try:
    ...     csv_column(None, "b")
    ... except TypeError as te:
    ...     print(str(te)[:50])
    descriptor '__getitem__' for 'dict' objects doesn'

    >>> try:
    ...     csv_column({"a": 5}, 1)
    ... except TypeError as te:
    ...     print(te)
    descriptor '__len__' requires a 'str' object but received a 'int'

    >>> try:
    ...     csv_column({"a": -1}, "a")
    ... except ValueError as ve:
    ...     print(ve)
    a=-1 is invalid, must be in 0..1000000.

    >>> try:
    ...     csv_column({"a": -1}, "")
    ... except ValueError as ve:
    ...     print(ve)
    Invalid key ''.
    """
    if str.__len__(key) < 1:
        raise ValueError(f"Invalid key {key!r}.")
    if not isinstance(remove_col, bool):
        raise type_error(remove_col, "remove_col", bool)
    # strict lookup: a missing key surfaces as a KeyError
    raw_index = dict.__getitem__(columns, key)
    index: Final[int] = check_int_range(raw_index, key, 0, 1_000_000)
    if remove_col:
        # only a column that passed validation is removed
        dict.__delitem__(columns, key)
    return index

1332 

1333 

1334def csv_column_or_none(columns: dict[str, int] | None = None, 

1335 key: str | None = None, 

1336 remove_col: bool = True) -> int | None: 

1337 """ 

1338 Get an optional CSV column index. 

1339 

1340 This function will extract the index of a column from a column description 

1341 map. The index will be checked whether it is in a valid range and 

1342 returned. If no column fitting to `key` exists, this function returns 

1343 `None`. If `remove_col` is `True` and a column fitting to `key` exists, 

1344 then this column will be deleted from `columns`. 

1345 

1346 :param columns: the columns 

1347 :param key: the key 

1348 :param remove_col: should we remove the column? 

1349 :returns: the column, or `None` if none was found 

1350 :raises TypeError: if any of the parameters is not of the prescribed type 

1351 :raises ValueError: if the column or key are invalid 

1352 

1353 >>> csv_column_or_none({"a": 5}, "a") 

1354 5 

1355 

1356 >>> cols = {"a": 5, "b": 7} 

1357 >>> csv_column_or_none(cols, "a", False) 

1358 5 

1359 >>> cols 

1360 {'a': 5, 'b': 7} 

1361 >>> csv_column_or_none(cols, "a", True) 

1362 5 

1363 >>> cols 

1364 {'b': 7} 

1365 

1366 >>> try: 

1367 ... csv_column_or_none({"a": 5}, "a", "3") 

1368 ... except TypeError as te: 

1369 ... print(te) 

1370 remove_col should be an instance of bool but is str, namely '3'. 

1371 

1372 >>> print(csv_column_or_none({"a": 5}, "b")) 

1373 None 

1374 

1375 >>> print(csv_column_or_none(None, "b")) 

1376 None 

1377 

1378 >>> print(csv_column_or_none({"a": 5}, None)) 

1379 None 

1380 

1381 >>> print(csv_column_or_none({"a": 5}, "")) 

1382 None 

1383 

1384 >>> try: 

1385 ... csv_column({"a": 5}, 1) 

1386 ... except TypeError as te: 

1387 ... print(te) 

1388 descriptor '__len__' requires a 'str' object but received a 'int' 

1389 

1390 >>> try: 

1391 ... csv_column({"a": -1}, "a") 

1392 ... except ValueError as ve: 

1393 ... print(ve) 

1394 a=-1 is invalid, must be in 0..1000000. 

1395 """ 

1396 if not isinstance(remove_col, bool): 

1397 raise type_error(remove_col, "remove_col", bool) 

1398 if (key is None) or (columns is None) or (str.__len__(key) <= 0): 

1399 return None 

1400 res: Final[int | None] = dict.get(columns, key) 

1401 if res is None: 

1402 return None 

1403 check_int_range(res, key, 0, 1_000_000) 

1404 if remove_col: 

1405 dict.__delitem__(columns, key) 

1406 return res 

1407 

1408 

def csv_select_scope(
        conv: Callable[[dict[str, int]], U],
        columns: dict[str, int],
        scope: str | None = None,
        additional: Iterable[tuple[str, int]] = (),
        skip_orig_key: Callable[[str], bool] = lambda _: False,
        skip_final_key: Callable[[str], bool] = lambda _: False,
        skip_col: Callable[[int], bool] = lambda _: False,
        include_scope: bool = True,
        remove_cols: bool = True) -> U:
    """
    Select the columns of a given scope and pass them to the function `conv`.

    This is the strict variant of :func:`csv_select_scope_or_none`: The
    selection work is delegated to that function with all arguments passed
    on unchanged. If `columns` is empty, the delegate is not invoked at all.
    If `columns` is empty or if the delegate returns `None`, a
    :class:`ValueError` is raised. Otherwise, the result of the delegate is
    returned.
    The function is intended for selecting some keys from a column set and
    passing them as parameters to a constructor of a CSV reader. The
    selection can be based on a `scope` prefix, which is removed from the
    column names before they are handed to `conv`.
    All columns that are passed on to `conv` are deleted from `columns` if
    `remove_cols == True`, which is the default.

    :param conv: the function to which the selected columns should be passed,
        and that creates the return value
    :param columns: the existing columns
    :param scope: the scope, or `None` or the empty string to select all
        columns
    :param additional: the additional columns to add *if* some keys/columns
        remain after all the transformation and selection
    :param skip_orig_key: a function that returns `True` for any original,
        unchanged key in `columns` that should be ignored and that
        returns `False` if the key can be processed normally (i.e., if we can
        check if it starts with the given scope and move on)
    :param skip_final_key: a function that returns `True` for any key in
        `columns` that would fall into the right scope but that should still
        be ignored. This function receives the key without the scope prefix.
    :param skip_col: a function that returns `True` for any column that
        should be ignored
    :param include_scope: if scope appears as a lone column, should we
        include it?
    :param remove_cols: should we remove all selected columns?
    :returns: The result of the function `conv` applied to all matching
        columns (and those in `additional` are appended to them)
    :raises ValueError: if no columns could be selected
    :raises TypeError: if any of the elements passed in is of the wrong type

    >>> csv_select_scope(lambda x: x, {
    ...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "")
    {'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}

    >>> try:
    ...     csv_select_scope(print, {"a.x": 1, "a.y": 2}, "v")
    ... except ValueError as ve:
    ...     print(ve)
    Did not find sufficient data of scope 'v' in {'a.x': 1, 'a.y': 2}.

    >>> try:
    ...     csv_select_scope(print, {}, "v")
    ... except ValueError as ve:
    ...     print(ve)
    Did not find sufficient data of scope 'v' in {}.
    """
    selected: U | None = None
    if dict.__len__(columns) > 0:
        # only a non-empty column map can contain anything to select
        selected = csv_select_scope_or_none(
            conv, columns, scope, additional, skip_orig_key, skip_final_key,
            skip_col, include_scope, remove_cols)
    if selected is not None:
        return selected
    raise ValueError("Did not find sufficient data of "
                     f"scope {scope!r} in {columns!r}.")

1477 

1478 

1479def csv_select_scope_or_none( 

1480 conv: Callable[[dict[str, int]], U], 

1481 columns: dict[str, int] | None, 

1482 scope: str | None = None, 

1483 additional: Iterable[tuple[str, int]] = (), 

1484 skip_orig_key: Callable[[str], bool] = lambda _: False, 

1485 skip_final_key: Callable[[str], bool] = lambda _: False, 

1486 skip_col: Callable[[int], bool] = lambda _: False, 

1487 include_scope: bool = True, 

1488 remove_cols: bool = True) -> U | None: 

1489 """ 

1490 Get all the columns of a given scope and pass them to the function `conv`. 

1491 

1492 This function is intended for selecting some keys from a column set and 

1493 pass them as parameters to a constructor of a CSV reader. It can do this 

1494 selection based on a `scope` prefix which is then removed from the column 

1495 names before passing them into the constructor. If no column matches, this 

1496 function returns `None`. 

1497 All columns that are passed on to `conv` are deleted from `columns` if 

1498 `remove_cols == True`, which is the default. 

1499 

1500 :param conv: the function to which the selected columns should be passed, 

1501 if any, and that - in this case, returns the return value of this 

1502 function 

1503 :param columns: the existing columns 

1504 :param scope: the scope, or `None` or the empty string to select all 

1505 columns 

1506 :param skip_orig_key: a function that returns `True` for any original, 

1507 unchanged key in `columns` that should be ignored and that 

1508 returns `False` if the key can be processed normally (i.e., if we can 

1509 check if it starts with the given scope and move on) 

1510 :param skip_final_key: a function that returns `True` for any key in 

1511 `columns` that would fall into the right scope but that should still 

1512 be ignored. This function receives the key without the scope prefix. 

1513 :param skip_col: any column that should be ignored 

1514 :param additional: the additional columns to add *if* some keys/columns 

1515 remain after all the transformation and selection 

1516 :param include_scope: if scope appears as a lone column, should we 

1517 include it? 

1518 :param remove_cols: should we remove all selected columns? 

1519 :returns: `None` if no keys fall into the provided scope does not have any 

1520 keys matching it in `columns`. The result of `conv` otherwise, i.e., 

1521 if there are matching columns, these are selected (and those in 

1522 `additional` are appended to them) and these are then passed to `conv` 

1523 and the result of `conv` is returned 

1524 

1525 >>> csv_select_scope_or_none(print, { 

1526 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a") 

1527 {'x': 1, 'y': 2, 'a': 3} 

1528 

1529 >>> exa1 = {"a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5} 

1530 >>> csv_select_scope_or_none(print, exa1, "a", remove_cols=False) 

1531 {'x': 1, 'y': 2, 'a': 3} 

1532 >>> exa1 

1533 {'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5} 

1534 >>> csv_select_scope_or_none(print, exa1, "a", remove_cols=True) 

1535 {'x': 1, 'y': 2, 'a': 3} 

1536 >>> exa1 

1537 {'b': 4, 'b.t': 5} 

1538 >>> csv_select_scope_or_none(print, exa1, "b", remove_cols=True) 

1539 {'b': 4, 't': 5} 

1540 >>> exa1 

1541 {} 

1542 

1543 >>> csv_select_scope_or_none(print, { 

1544 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "") 

1545 {'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5} 

1546 

1547 >>> csv_select_scope_or_none(print, { 

1548 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, None) 

1549 {'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5} 

1550 

1551 >>> csv_select_scope_or_none(print, { 

1552 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1553 ... include_scope=False) 

1554 {'x': 1, 'y': 2} 

1555 

1556 >>> csv_select_scope_or_none(print, { 

1557 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b") 

1558 {'b': 4, 't': 5} 

1559 

1560 >>> csv_select_scope_or_none(print, { 

1561 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b", 

1562 ... additional=(('z', 23), ('v', 45))) 

1563 {'b': 4, 't': 5, 'z': 23, 'v': 45} 

1564 

1565 >>> csv_select_scope_or_none(print, { 

1566 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b", 

1567 ... additional=(('t', 23), ('v', 45))) 

1568 {'b': 4, 't': 5, 'v': 45} 

1569 

1570 >>> csv_select_scope_or_none(print, { 

1571 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1572 ... additional=(('x', 44), ('v', 45))) 

1573 {'x': 1, 'y': 2, 'a': 3, 'v': 45} 

1574 

1575 >>> csv_select_scope_or_none(print, { 

1576 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b", 

1577 ... additional=(('z', 23), ('v', 45)), 

1578 ... skip_col=lambda c: c == 23) 

1579 {'b': 4, 't': 5, 'v': 45} 

1580 

1581 >>> csv_select_scope_or_none(print, { 

1582 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b", 

1583 ... additional=(('z', 23), ('v', 45)), 

1584 ... skip_orig_key=lambda ok: ok == "b.t") 

1585 {'b': 4, 'z': 23, 'v': 45} 

1586 

1587 >>> csv_select_scope_or_none(print, { 

1588 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b", 

1589 ... additional=(('z', 23), ('v', 45)), 

1590 ... skip_final_key=lambda fk: fk == "z") 

1591 {'b': 4, 't': 5, 'v': 45} 

1592 

1593 >>> print(csv_select_scope_or_none(print, {}, "a")) 

1594 None 

1595 

1596 >>> print(csv_select_scope_or_none(print, {}, None)) 

1597 None 

1598 

1599 >>> print(csv_select_scope_or_none(print, None, None)) 

1600 None 

1601 

1602 >>> print(csv_select_scope_or_none(print, {"a.x": 45}, "a", 

1603 ... skip_col=lambda c: c == 45)) 

1604 None 

1605 

1606 >>> try: 

1607 ... csv_select_scope_or_none(None, { 

1608 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a") 

1609 ... except TypeError as te: 

1610 ... print(te) 

1611 conv should be a callable but is None. 

1612 

1613 >>> try: 

1614 ... csv_select_scope_or_none(print, { 

1615 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1616 ... remove_cols=1) 

1617 ... except TypeError as te: 

1618 ... print(te) 

1619 remove_cols should be an instance of bool but is int, namely 1. 

1620 

1621 >>> try: 

1622 ... csv_select_scope_or_none("x", { 

1623 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a") 

1624 ... except TypeError as te: 

1625 ... print(te) 

1626 conv should be a callable but is str, namely 'x'. 

1627 

1628 >>> try: 

1629 ... csv_select_scope_or_none(print, "x", "a") 

1630 ... except TypeError as te: 

1631 ... print(te) 

1632 descriptor '__len__' requires a 'dict' object but received a 'str' 

1633 

1634 >>> try: 

1635 ... csv_select_scope_or_none(print, { 

1636 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, int) 

1637 ... except TypeError as te: 

1638 ... print(te) 

1639 descriptor '__len__' requires a 'str' object but received a 'type' 

1640 

1641 >>> try: 

1642 ... csv_select_scope_or_none(print, { 

1643 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1644 ... additional=2) 

1645 ... except TypeError as te: 

1646 ... print(str(te)[:-7]) 

1647 additional should be an instance of typing.Iterable but is int, na 

1648 

1649 >>> try: 

1650 ... csv_select_scope_or_none(print, { 

1651 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1652 ... additional=((1, 2), )) 

1653 ... except TypeError as te: 

1654 ... print(te) 

1655 descriptor '__len__' requires a 'str' object but received a 'int' 

1656 

1657 >>> try: 

1658 ... csv_select_scope_or_none(print, { 

1659 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1660 ... additional=(None, )) 

1661 ... except TypeError as te: 

1662 ... print(te) 

1663 cannot unpack non-iterable NoneType object 

1664 

1665 >>> try: 

1666 ... csv_select_scope_or_none(print, { 

1667 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1668 ... additional=(("yx", "a"), )) 

1669 ... except TypeError as te: 

1670 ... print(te) 

1671 yx should be an instance of int but is str, namely 'a'. 

1672 

1673 >>> try: 

1674 ... csv_select_scope_or_none(print, { 

1675 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1676 ... additional=(("yx", -2), )) 

1677 ... except ValueError as ve: 

1678 ... print(ve) 

1679 yx=-2 is invalid, must be in 0..1000000. 

1680 

1681 >>> try: 

1682 ... csv_select_scope_or_none(print, { 

1683 ... "a.x": 1, "a.y": 2, "a": 3, "a.b": -4, "b.t": 5}, "a") 

1684 ... except ValueError as ve: 

1685 ... print(ve) 

1686 a.b=-4 is invalid, must be in 0..1000000. 

1687 

1688 >>> try: 

1689 ... csv_select_scope_or_none(print, { 

1690 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1691 ... skip_col=None) 

1692 ... except TypeError as te: 

1693 ... print(te) 

1694 skip_col should be a callable but is None. 

1695 

1696 >>> try: 

1697 ... csv_select_scope_or_none(print, { 

1698 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1699 ... skip_orig_key=None) 

1700 ... except TypeError as te: 

1701 ... print(te) 

1702 skip_orig_key should be a callable but is None. 

1703 

1704 >>> try: 

1705 ... csv_select_scope_or_none(print, { 

1706 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1707 ... skip_final_key=None) 

1708 ... except TypeError as te: 

1709 ... print(te) 

1710 skip_final_key should be a callable but is None. 

1711 

1712 >>> try: 

1713 ... csv_select_scope(print, { 

1714 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1715 ... include_scope=3) 

1716 ... except TypeError as te: 

1717 ... print(te) 

1718 include_scope should be an instance of bool but is int, namely 3. 

1719 

1720 >>> try: 

1721 ... csv_select_scope_or_none(print, { 

1722 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, 4) 

1723 ... except TypeError as te: 

1724 ... print(te) 

1725 descriptor '__len__' requires a 'str' object but received a 'int' 

1726 

1727 >>> try: 

1728 ... csv_select_scope_or_none(print, 11) 

1729 ... except TypeError as te: 

1730 ... print(te) 

1731 descriptor '__len__' requires a 'dict' object but received a 'int' 

1732 

1733 >>> try: 

1734 ... csv_select_scope_or_none(print, { 

1735 ... "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a", 

1736 ... additional=(("", 2), )) 

1737 ... except ValueError as ve: 

1738 ... print(ve) 

1739 Invalid additional column ''. 

1740 """ 

1741 if not callable(conv): 

1742 raise type_error(conv, "conv", call=True) 

1743 if not callable(skip_orig_key): 

1744 raise type_error(skip_orig_key, "skip_orig_key", call=True) 

1745 if not callable(skip_final_key): 

1746 raise type_error(skip_final_key, "skip_final_key", call=True) 

1747 if not isinstance(additional, Iterable): 

1748 raise type_error(additional, "additional", Iterable) 

1749 if not isinstance(include_scope, bool): 

1750 raise type_error(include_scope, "include_scope", bool) 

1751 if not callable(skip_col): 

1752 raise type_error(skip_col, "skip_col", call=True) 

1753 if not isinstance(remove_cols, bool): 

1754 raise type_error(remove_cols, "remove_cols", bool) 

1755 

1756 if (columns is None) or (dict.__len__(columns) <= 0): 

1757 return None 

1758 selection: Final[list[tuple[str, str, int]]] = [ 

1759 (k, k, v) for k, v in columns.items() 

1760 if not (skip_orig_key(k) or skip_col(v))] 

1761 sel_len: Final[int] = list.__len__(selection) 

1762 if sel_len <= 0: 

1763 return None 

1764 

1765 if (scope is not None) and (str.__len__(scope) > 0): 

1766 use_scope: Final[str] = f"{scope}{SCOPE_SEPARATOR}" 

1767 usl: Final[int] = str.__len__(use_scope) 

1768 for i in range(sel_len - 1, -1, -1): 

1769 k, _, v = selection[i] 

1770 if str.startswith(k, use_scope): 

1771 use_key = k[usl:] 

1772 if not skip_final_key(use_key): 

1773 list.__setitem__(selection, i, (k, use_key, v)) 

1774 continue 

1775 elif include_scope and (k == scope): 

1776 if not skip_final_key(k): 

1777 continue 

1778 list.__delitem__(selection, i) 

1779 

1780 if list.__len__(selection) <= 0: 

1781 return None 

1782 

1783 if remove_cols: 

1784 for kv in selection: 

1785 dict.__delitem__(columns, kv[0]) 

1786 

1787 subset: Final[dict[str, int]] = { 

1788 kv[1]: check_int_range( 

1789 kv[2], kv[0], 0, 1_000_000) for kv in selection} 

1790 

1791 for kkk, vvv in additional: 

1792 if str.__len__(kkk) <= 0: 

1793 raise ValueError(f"Invalid additional column {kkk!r}.") 

1794 if skip_final_key(kkk) or skip_col(vvv): 

1795 continue 

1796 if kkk not in subset: 

1797 subset[kkk] = check_int_range(vvv, kkk, 0, 1_000_000) 

1798 return conv(subset) 

1799 

1800 

1801class CsvReader[T]: 

1802 """ 

1803 A base class for CSV readers. 

1804 

1805 Using this class and its :meth:`read` class method provides for a more 

1806 elegant way to construct nested and combined CSV formats compared to 

1807 creating classes and handing their methods to :func:`csv_read`. 

1808 

1809 >>> class R(CsvReader): 

1810 ... def __init__(self, columns: dict[str, int]) -> None: 

1811 ... super().__init__(columns) 

1812 ... self.cols = columns 

1813 ... def parse_row(self, row: list[str]) -> dict: 

1814 ... return {x: row[y] for x, y in self.cols.items()} 

1815 

1816 >>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9", 

1817 ... "", "10", "# 11;12"] 

1818 

1819 >>> for p in R.read(text): 

1820 ... print(p) 

1821 {'a': '1', 'b': '2', 'c': '3', 'd': '4'} 

1822 {'a': '5', 'b': '6', 'c': '', 'd': ''} 

1823 {'a': '', 'b': '8', 'c': '', 'd': '9'} 

1824 {'a': '10', 'b': '', 'c': '', 'd': ''} 

1825 

1826 >>> text = ["a,b,c,d", "v test", " 1, 2,3,4", " 5 ,6 ", ",8,,9", 

1827 ... "", "10", "v 11,12"] 

1828 

1829 >>> for p in R.read(text, separator=',', comment_start='v'): 

1830 ... print(p) 

1831 {'a': '1', 'b': '2', 'c': '3', 'd': '4'} 

1832 {'a': '5', 'b': '6', 'c': '', 'd': ''} 

1833 {'a': '', 'b': '8', 'c': '', 'd': '9'} 

1834 {'a': '10', 'b': '', 'c': '', 'd': ''} 

1835 

1836 >>> class S(CsvReader): 

1837 ... def __init__(self, columns: dict[str, int], add: str) -> None: 

1838 ... super().__init__(columns) 

1839 ... self.cols = columns 

1840 ... self.s = add 

1841 ... def parse_row(self, row: list[str]) -> dict: 

1842 ... return {x: self.s + row[y] for x, y in self.cols.items()} 

1843 

1844 >>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9", 

1845 ... "", "10", "# 11;12"] 

1846 

1847 >>> for p in S.read(text, add="b"): 

1848 ... print(p) 

1849 {'a': 'b1', 'b': 'b2', 'c': 'b3', 'd': 'b4'} 

1850 {'a': 'b5', 'b': 'b6', 'c': 'b', 'd': 'b'} 

1851 {'a': 'b', 'b': 'b8', 'c': 'b', 'd': 'b9'} 

1852 {'a': 'b10', 'b': 'b', 'c': 'b', 'd': 'b'} 

1853 

1854 >>> ccc = S({"a": 1}, add="x") 

1855 >>> print(ccc.parse_optional_row(None)) 

1856 None 

1857 >>> print(S.parse_optional_row(None, None)) 

1858 None 

1859 >>> print((ccc).parse_optional_row(["x", "y"])) 

1860 {'a': 'xy'} 

1861 

1862 >>> try: 

1863 ... CsvReader("x") 

1864 ... except TypeError as te: 

1865 ... print(te) 

1866 columns should be an instance of dict but is str, namely 'x'. 

1867 

1868 >>> try: 

1869 ... CsvReader({"a": 1}).parse_row(["a"]) 

1870 ... except NotImplementedError as nie: 

1871 ... print(type(nie)) 

1872 <class 'NotImplementedError'> 

1873 """ 

1874 

1875 def __init__(self, columns: dict[str, int]) -> None: 

1876 """ 

1877 Create the CSV reader. 

1878 

1879 :param columns: the columns 

1880 :raises TypeError: if `columns` is not a :class:`dict` 

1881 """ 

1882 super().__init__() 

1883 if not isinstance(columns, dict): 

1884 raise type_error(columns, "columns", dict) 

1885 

1886 def parse_row(self, data: list[str]) -> T: 

1887 """ 

1888 Parse a row of data. 

1889 

1890 :param data: the data row 

1891 :returns: the object representing the row 

1892 :raises NotImplementedError: because it must be overridden 

1893 :raises ValueError: should raise a :class:`ValueError` if the row is 

1894 incomplete or invalid 

1895 """ 

1896 raise NotImplementedError 

1897 

1898 def parse_optional_row(self, data: list[str] | None) -> T | None: 

1899 """ 

1900 Parse a row of data that may be incomplete or empty. 

1901 

1902 The default implementation of this method returns `None` if the data 

1903 row is `None`, or if `self` is `None`, which should never happen. 

1904 Otherwise, it calls :meth:`parse_row`, which will probably raise a 

1905 :class:`ValueError`. 

1906 

1907 :param data: the row of data that may be empty 

1908 :returns: an object constructed from the partial row, if possible, 

1909 or `None` 

1910 """ 

1911 if (self is None) or (data is None): 

1912 return None 

1913 return self.parse_row(data) 

1914 

1915 @classmethod 

1916 def read(cls: type["CsvReader"], rows: Iterable[str], 

1917 separator: str = CSV_SEPARATOR, 

1918 comment_start: str | None = COMMENT_START, 

1919 **kwargs) -> Generator[T, None, None]: 

1920 """ 

1921 Parse a stream of CSV data. 

1922 

1923 This class method creates a single new instance of `cls` and passes it 

1924 the column names/indices as well as any additional named arguments of 

1925 this method into the constructor. It then uses the method 

1926 :meth:`parse_row` of the class to parse the row data to generate the 

1927 output stream. 

1928 

1929 It offers a more convenient wrapper around :func:`csv_read` for cases 

1930 where it makes more sense to implement the parsing functionality in a 

1931 class. 

1932 

1933 :param rows: the rows of strings with CSV data 

1934 :param separator: the separator character 

1935 :param comment_start: the comment start character 

1936 """ 

1937 def __creator(y: dict[str, int], __c=cls, # pylint: disable=W0102 

1938 __x=kwargs) -> "CsvReader": # noqa # type: ignore 

1939 return cls(y, **__x) # noqa # type: ignore 

1940 

1941 yield from csv_read(rows=rows, 

1942 setup=__creator, 

1943 parse_row=cls.parse_row, # type: ignore 

1944 separator=separator, 

1945 comment_start=comment_start) 

1946 

1947 

1948class CsvWriter[T]: 

1949 """ 

1950 A base class for structured CSV writers. 

1951 

1952 >>> class W(CsvWriter): 

1953 ... def __init__(self, data: Iterable[dict[str, int]], 

1954 ... scope: str | None = None) -> None: 

1955 ... super().__init__(data, scope) 

1956 ... self.rows = sorted({dkey for datarow in data 

1957 ... for dkey in datarow}) 

1958 ... def get_column_titles(self) -> Iterable[str]: 

1959 ... return self.rows 

1960 ... def get_row(self, row: dict[str, int]) -> Iterable[str]: 

1961 ... return map(str, (row.get(key, "") for key in self.rows)) 

1962 ... def get_header_comments(self) -> list[str]: 

1963 ... return ["This is a header comment.", " We have two of it. "] 

1964 ... def get_footer_comments(self) -> list[str]: 

1965 ... return [" This is a footer comment."] 

1966 

1967 >>> dd = [{"a": 1, "c": 2}, {"b": 6, "c": 8}, 

1968 ... {"a": 4, "d": 12, "b": 3}, {}] 

1969 

1970 >>> for p in W.write(dd): 

1971 ... print(p[:-8] if "version" in p else p) 

1972 # This is a header comment. 

1973 # We have two of it. 

1974 a;b;c;d 

1975 1;;2 

1976 ;6;8 

1977 4;3;;12 

1978 ; 

1979 # This is a footer comment. 

1980 # 

1981 # This CSV output has been created using the versatile CSV API of \ 

1982pycommons.io.csv, version 

1983 # You can find pycommons at https://thomasweise.github.io/pycommons. 

1984 

1985 >>> class W2(CsvWriter): 

1986 ... def __init__(self, data: Iterable[dict[str, int]], 

1987 ... scope: str | None = None) -> None: 

1988 ... super().__init__(data, scope) 

1989 ... self.rows = sorted({dkey for datarow in data 

1990 ... for dkey in datarow}) 

1991 ... def get_column_titles(self) -> Iterable[str]: 

1992 ... return self.rows if self.scope is None else [ 

1993 ... f"{self.scope}.{r}" for r in self.rows] 

1994 ... def get_row(self, row: dict[str, int]) -> Iterable[str]: 

1995 ... return map(str, (row.get(key, "") for key in self.rows)) 

1996 ... def get_footer_bottom_comments(self) -> Iterable[str] | None: 

1997 ... return ["Bla!"] 

1998 

1999 >>> for p in W2.write(dd, separator="@", comment_start="B"): 

2000 ... print(p) 

2001 a@b@c@d 

2002 1@@2 

2003 @6@8 

2004 4@3@@12 

2005 @ 

2006 B Bla! 

2007 

2008 >>> for p in W2.write(dd, scope="k", separator="@", comment_start="B"): 

2009 ... print(p) 

2010 k.a@k.b@k.c@k.d 

2011 1@@2 

2012 @6@8 

2013 4@3@@12 

2014 @ 

2015 B Bla! 

2016 

2017 >>> ";".join(W2(dd).get_optional_row(None)) 

2018 ';;;' 

2019 >>> ";".join(W2(dd).get_optional_row(dd[0])) 

2020 '1;;2;' 

2021 

2022 >>> try: 

2023 ... CsvWriter(1, None) 

2024 ... except TypeError as te: 

2025 ... print(te) 

2026 data should be an instance of typing.Iterable but is int, namely 1. 

2027 

2028 >>> try: 

2029 ... CsvWriter([], 1) 

2030 ... except TypeError as te: 

2031 ... print(te) 

2032 descriptor 'strip' for 'str' objects doesn't apply to a 'int' object 

2033 

2034 >>> try: 

2035 ... CsvWriter([], "x x") 

2036 ... except ValueError as ve: 

2037 ... print(ve) 

2038 invalid scope 'x x' 

2039 

2040 >>> try: 

2041 ... CsvWriter([], " x") 

2042 ... except ValueError as ve: 

2043 ... print(ve) 

2044 invalid scope ' x' 

2045 

2046 >>> try: 

2047 ... CsvWriter([]).get_row("x") 

2048 ... except NotImplementedError as nie: 

2049 ... print(type(nie)) 

2050 <class 'NotImplementedError'> 

2051 

2052 >>> try: 

2053 ... CsvWriter([]).get_column_titles() 

2054 ... except NotImplementedError as nie: 

2055 ... print(type(nie)) 

2056 <class 'NotImplementedError'> 

2057 """ 

2058 

2059 def __init__(self, data: Iterable[T], 

2060 scope: str | None = None) -> None: 

2061 """ 

2062 Initialize the csv writer. 

2063 

2064 :param data: the data to be written 

2065 :param scope: the prefix to be pre-pended to all columns 

2066 :raises TypeError: if `data` is not an `Iterable` or if `scope` is 

2067 neither `None` nor a string 

2068 :raises ValueError: if `scope` is not `None` but: an empty string, 

2069 becomes an empty string after stripping, or contains any 

2070 whitespace or newline character 

2071 """ 

2072 super().__init__() 

2073 if not isinstance(data, Iterable): 

2074 raise type_error(data, "data", Iterable) 

2075 if (scope is not None) and ((str.strip(scope) != scope) or ( 

2076 str.__len__(scope) <= 0) or (any(map( 

2077 scope.__contains__, WHITESPACE_OR_NEWLINE)))): 

2078 raise ValueError(f"invalid scope {scope!r}") 

2079 #: the optional scope 

2080 self.scope: Final[str | None] = scope 

2081 

2082 def get_column_titles(self) -> Iterable[str]: 

2083 """ 

2084 Get the column titles. 

2085 

2086 :returns: the column titles 

2087 """ 

2088 raise NotImplementedError 

2089 

2090 def get_optional_row(self, data: T | None) -> Iterable[str]: 

2091 """ 

2092 Attach an empty row of the correct shape to the output. 

2093 

2094 :param data: the data item or `None` 

2095 :returns: the optional row data 

2096 """ 

2097 if data is None: # very crude and slow way to create an optional row 

2098 return [""] * list.__len__(list(self.get_column_titles())) 

2099 return self.get_row(data) 

2100 

2101 def get_row(self, data: T) -> Iterable[str]: 

2102 """ 

2103 Render a single sample statistics to a CSV row. 

2104 

2105 :param data: the data sample statistics 

2106 :returns: the row iterator 

2107 """ 

2108 raise NotImplementedError 

2109 

2110 def get_header_comments(self) -> Iterable[str]: 

2111 """ 

2112 Get any possible header comments. 

2113 

2114 :returns: the iterable of header comments 

2115 """ 

2116 return () 

2117 

2118 def get_footer_comments(self) -> Iterable[str]: 

2119 """ 

2120 Get any possible footer comments. 

2121 

2122 :returns: the footer comments 

2123 """ 

2124 return () 

2125 

2126 def get_footer_bottom_comments(self) -> Iterable[str] | None: 

2127 """ 

2128 Get the bottom footer comments. 

2129 

2130 :returns: an iterator with the bottom comments 

2131 """ 

2132 return pycommons_footer_bottom_comments(self) 

2133 

2134 @classmethod 

2135 def write( 

2136 cls: type["CsvWriter"], 

2137 data: Iterable[T], 

2138 scope: str | None = None, 

2139 separator: str = CSV_SEPARATOR, 

2140 comment_start: str | None = COMMENT_START, 

2141 **kwargs) -> Generator[str, None, None]: 

2142 """ 

2143 Write the CSV data based on the methods provided by the class `cls`. 

2144 

2145 :param data: the data 

2146 :param separator: the CSV separator 

2147 :param comment_start: the comment start character 

2148 :param scope: the scope, or `None` 

2149 :param kwargs: additional arguments to be passed to the constructor 

2150 

2151 :raises TypeError: if `kwargs` is not `None` but also not a 

2152 :class:`dict` 

2153 """ 

2154 def __creator(y: Iterable[T], __c=cls, # pylint: disable=W0102 

2155 __s=scope, # noqa # type: ignore 

2156 __x=kwargs) -> "CsvWriter": # noqa # type: ignore 

2157 return __c(data=y, scope=__s, **__x) # noqa # type: ignore 

2158 

2159 yield from csv_write( 

2160 data=data, 

2161 column_titles=cls.get_column_titles, # type: ignore 

2162 get_row=cls.get_row, # type: ignore 

2163 setup=__creator, 

2164 separator=separator, 

2165 comment_start=comment_start, 

2166 header_comments=cls.get_header_comments, # type: ignore 

2167 footer_comments=cls.get_footer_comments, # type: ignore 

2168 footer_bottom_comments=cls. # type: ignore 

2169 get_footer_bottom_comments) # type: ignore