pycommons.io package

Common utilities for input and output.

Submodules

pycommons.io.arguments module

The parser for command line arguments.

pycommons.io.arguments.make_argparser(file, description, epilog, version=None)[source]

Create an argument parser with default settings.

Parameters:
  • file (str) – the __file__ special variable of the calling script

  • description (str) – the description string

  • epilog (str) – the epilogue string

  • version (Optional[str], default: None) – an optional version string

Return type:

ArgumentParser

Returns:

the argument parser

>>> ap = make_argparser(__file__, "This is a test program.",
...                     "This is a test.")
>>> isinstance(ap, ArgumentParser)
True
>>> from contextlib import redirect_stdout
>>> from io import StringIO
>>> s = StringIO()
>>> with redirect_stdout(s):
...     ap.print_usage()
>>> print(s.getvalue())
usage: python3 -m pycommons.io.arguments [-h]
>>> s = StringIO()
>>> with redirect_stdout(s):
...     ap.print_help()
>>> print(s.getvalue())
usage: python3 -m pycommons.io.arguments [-h]

This is a test program.

options:
  -h, --help  show this help message and exit

This is a test.
>>> ap = make_argparser(__file__, "This is a test program.",
...                     "This is a test.", "0.2")
>>> isinstance(ap, ArgumentParser)
True
>>> from contextlib import redirect_stdout
>>> from io import StringIO
>>> s = StringIO()
>>> with redirect_stdout(s):
...     ap.print_usage()
>>> print(s.getvalue())
usage: python3 -m pycommons.io.arguments [-h] [--version]
>>> s = StringIO()
>>> with redirect_stdout(s):
...     ap.print_help()
>>> print(s.getvalue())
usage: python3 -m pycommons.io.arguments [-h] [--version]

This is a test program.

options:
  -h, --help  show this help message and exit
  --version   show program's version number and exit

This is a test.
>>> ap = make_argparser(__file__, "This is a test program.",
...     make_epilog("This program computes something",
...                 2022, 2023, "Thomas Weise",
...                 url="https://github.com/thomasWeise/pycommons",
...                 email="tweise@hfuu.edu.cn"))
>>> s = StringIO()
>>> with redirect_stdout(s):
...     ap.print_help()
>>> v = ('usage: python3 -m pycommons.io.arguments [-h]\n\nThis is '
...      'a test program.\n\noptions:\n  -h, --help  show this help '
...      'message and exit\n\nThis program computes something Copyright'
...      '\xa0©\xa02022\u20112023,\xa0Thomas\xa0Weise,\nGNU\xa0GENERAL'
...      '\xa0PUBLIC\xa0LICENSE\xa0Version\xa03,\xa029\xa0June'
...      '\xa02007,\nhttps://github.com/thomasWeise/pycommons, '
...      'tweise@hfuu.edu.cn\n')
>>> s.getvalue() == v
True
>>> try:
...     make_argparser(1, "", "")
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     make_argparser(None, "", "")
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'NoneType'
>>> try:
...     make_argparser("te", "", "")
... except ValueError as ve:
...     print(ve)
invalid file='te'.
>>> try:
...     make_argparser("test", 1, "")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     make_argparser("Test", None, "")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object
>>> try:
...     make_argparser("Test", "Bla", "")
... except ValueError as ve:
...     print(ve)
invalid description='Bla'.
>>> try:
...     make_argparser("Test", "This is a long test", 1)
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     make_argparser("Test", "This is a long test", None)
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object
>>> try:
...     make_argparser("Test", "This is a long test", "epi")
... except ValueError as ve:
...     print(ve)
invalid epilog='epi'.
>>> try:
...     make_argparser(__file__, "This is a long test",
...         "long long long epilog", 1)
... except TypeError as te:
...     print(str(te)[:60])
descriptor 'strip' for 'str' objects doesn't apply to a 'int
>>> try:
...     make_argparser(__file__, "This is a long test",
...         "long long long epilog", " ")
... except ValueError as ve:
...     print(ve)
Invalid version string ' '.
pycommons.io.arguments.make_epilog(text, copyright_start=None, copyright_end=None, author=None, the_license='GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007', url=None, email=None)[source]

Build an epilogue from the given components.

Parameters:
  • text (str) – the epilog text

  • copyright_start (Optional[int], default: None) – the start year of the copyright, or None for no copyright duration

  • copyright_end (Optional[int], default: None) – the end year of the copyright, or None for using the current year (unless copyright_start is None, in which case, no copyright information is generated).

  • author (Optional[str], default: None) – the author name, or None for no author

  • the_license (str | None, default: 'GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007') – the license, or None for no license

  • url (Optional[str], default: None) – the URL, or None for no URL

  • email (Optional[str], default: None) – the email address(es) of the author, or None for no email address information

Return type:

str

Returns:

the copyright information

>>> cy = datetime.now(tz=timezone.utc).year
>>> ex = (f"This is a test.\n\nGNU\xa0GENERAL\xa0PUBLIC\xa0LICENSE"
...       "\xa0Version\xa03,\xa029\xa0June\xa02007")
>>> make_epilog("This is a test.") == ex
True
>>> make_epilog("This is a test.", 2011, 2030, "Test User",
...             "Test License", "http://testurl", "test@test.com")[:50]
'This is a test.\n\nCopyright\xa0©\xa02011\u20112030,\xa0Test\xa0User,'
>>> ex = (f"This is a test.\n\nCopyright\xa0©\xa02011\u2011{cy},"
...        "\xa0Test\xa0User, Test\xa0License, http://testurl, "
...        "test@test.com")
>>> make_epilog("This is a test.", 2011, None, "Test User",
...             "Test License", "http://testurl", "test@test.com") == ex
True
>>> make_epilog("This is a test.", 2011, 2030, "Test User",
...             "Test License", "http://testurl", "test@test.com")[50:]
' Test\xa0License, http://testurl, test@test.com'
>>> make_epilog("This is a test.", 2030, 2030, "Test User",
...             "Test License", "http://testurl", "test@test.com")[:50]
'This is a test.\n\nCopyright\xa0©\xa02030,\xa0Test\xa0User, Test'
>>> make_epilog("This is a test.", 2030, 2030, "Test User",
...             "Test License", "http://testurl", "test@test.com")[50:]
'\xa0License, http://testurl, test@test.com'
>>> make_epilog("This is a test.", None, None, "Test User",
...             "Test License", "http://testurl", "test@test.com")[:50]
'This is a test.\n\nTest\xa0User, Test\xa0License, http://t'
>>> make_epilog("This is a test.", None, None, "Test User",
...             "Test License", "http://testurl", "test@test.com")[50:]
'esturl, test@test.com'
>>> try:
...     make_epilog(1, None, None, "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     make_epilog(None, None, None, "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object
>>> try:
...     make_epilog("1", None, None, "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except ValueError as ve:
...     print(ve)
Epilog text too short: '1'.
>>> try:
...     make_epilog("This is a test.", "v", None, "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except TypeError as te:
...     print(te)
copyright_start should be an instance of int but is str, namely 'v'.
>>> try:
...     make_epilog("This is a test.", -2, None, "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except ValueError as ve:
...     print(ve)
copyright_start=-2 is invalid, must be in 1970..2500.
>>> try:
...     make_epilog("This is a test.", 3455334, None, "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except ValueError as ve:
...     print(ve)
copyright_start=3455334 is invalid, must be in 1970..2500.
>>> try:
...     make_epilog("This is a test.", 2002, "v", "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except TypeError as te:
...     print(te)
copyright_end should be an instance of int but is str, namely 'v'.
>>> try:
...     make_epilog("This is a test.", 2002, 12, "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except ValueError as ve:
...     print(ve)
copyright_end=12 is invalid, must be in 2002..2500.
>>> try:
...     make_epilog("This is a test.", 2023, 3455334, "Test User",
...                 "Test License", "http://testurl", "test@test.com")
... except ValueError as ve:
...     print(ve)
copyright_end=3455334 is invalid, must be in 2023..2500.
>>> try:
...     make_epilog("This is a test.", None, None, 2,
...                 "Test License", "http://testurl", "test@test.com")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     make_epilog("This is a test.", None, None, "",
...                 "Test License", "http://testurl", "test@test.com")
... except ValueError as ve:
...     print(ve)
Author too short: ''.
>>> try:
...     make_epilog("This is a test.", None, None, "Tester",
...                 23, "http://testurl", "test@test.com")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     make_epilog("This is a test.", None, None, "Tester",
...                 "Te", "http://testurl", "test@test.com")
... except ValueError as ve:
...     print(ve)
License too short: 'Te'.
>>> try:
...     make_epilog("This is a test.", None, None, "Tester",
...                 "GPL", 2, "test@test.com")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     make_epilog("This is a test.", None, None, "Tester",
...                 "GPL", "http", "test@test.com")
... except ValueError as ve:
...     print(ve)
Url too short: 'http'.
>>> try:
...     make_epilog("This is a test.", None, None, "Tester",
...                 "GPL", "http://www.test.com", 1)
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     make_epilog("This is a test.", None, None, "Tester",
...                 "GPL", "http://www.test.com", "a@b")
... except ValueError as ve:
...     print(ve)
Email too short: 'a@b'.
pycommons.io.arguments.pycommons_argparser(file, description, epilog)[source]

Create an argument parser with default settings for pycommons.

Parameters:
  • file (str) – the __file__ special variable of the calling script

  • description (str) – the description string

  • epilog (str) – the epilogue string

Return type:

ArgumentParser

Returns:

the argument parser

>>> ap = pycommons_argparser(
...     __file__, "This is a test program.", "This is a test.")
>>> isinstance(ap, ArgumentParser)
True
>>> "Copyright" in ap.epilog
True

pycommons.io.console module

The logger routine for writing a log string to stdout.

pycommons.io.console.logger(message, note='', lock=<contextlib.nullcontext object>)[source]

Write a message to the console log.

The line starts with the current date and time, includes the note, and then the message string after a “: ”. This function can use a lock context to prevent multiple processes or threads from writing to the console at the same time.

Parameters:
  • message (str) – the message

  • note (str, default: '') – a note to put between the time and the message

  • lock (AbstractContextManager, default: <contextlib.nullcontext object at 0x7f4774bf6da0>) – the lock to prevent multiple threads from writing log output at the same time

>>> from io import StringIO
>>> from contextlib import redirect_stdout
>>> sio = StringIO()
>>> dt1 = datetime.datetime.now()
>>> with redirect_stdout(sio):
...     logger("hello world!")
>>> line = sio.getvalue().strip()
>>> print(line[26:])
: hello world!
>>> dt2 = datetime.datetime.now()
Return type: None
>>> dtx = datetime.datetime.strptime(line[:26], "%Y-%m-%d %H:%M:%S.%f")
>>> dt1 <= dtx <= dt2
True
>>> sio = StringIO()
>>> with redirect_stdout(sio):
...     logger("hello world!", "note")
>>> line = sio.getvalue().strip()
>>> print(line[26:])
note: hello world!
>>> from contextlib import AbstractContextManager
>>> class T:
...     def __enter__(self):
...         print("x")
...     def __exit__(self, exc_type, exc_val, exc_tb):
...         print("y")
>>> sio = StringIO()
>>> with redirect_stdout(sio):
...     logger("hello world!", "", T())
>>> sio.seek(0)
0
>>> lines = sio.readlines()
>>> print(lines[0].rstrip())
x
>>> print(lines[1][26:].rstrip())
: hello world!
>>> print(lines[2].rstrip())
y
>>> sio = StringIO()
>>> with redirect_stdout(sio):
...     logger("hello world!", "note", T())
>>> sio.seek(0)
0
>>> lines = sio.readlines()
>>> print(lines[0].rstrip())
x
>>> print(lines[1][26:].rstrip())
note: hello world!
>>> print(lines[2].rstrip())
y

pycommons.io.csv module

Tools for CSV output and input.

Our CSV format tools are intended to read and write structured objects from and to a comma-separated-values format. This format consists of one header, where the column titles are included (separated by a CSV_SEPARATOR) and one row per data object, with one value per column.

Different from other CSV processing tools, we want to

  1. Permit that data is extracted from / parsed in form of hierarchically structured objects.

  2. Columns have fixed types based on the object definition.

  3. The data read and written is strictly validated during the process.

  4. Data can be processed in form of a stream and is not necessarily all loaded into memory at once.

  5. The order of the columns is unimportant.

  6. Useless white space is automatically stripped and ignored.

  7. Multiple objects may be written per row, maybe even nested objects, and this is signified by “scope” column titles, e.g., something like “weight.min”, “weight.median”, …, “age.min”, “age.median”, …

  8. Comments may be added to the header or footer of the CSV file that describe the contents of the columns.

The separator is configurable, but by default set to CSV_SEPARATOR. Comments start with COMMENT_START by default.

pycommons.io.csv.COMMENT_START: Final[str] = '#'

everything after this character is considered a comment

pycommons.io.csv.CSV_SEPARATOR: Final[str] = ';'

the default CSV separator

class pycommons.io.csv.S

the type variable for the CSV output setup

alias of TypeVar(‘S’)

pycommons.io.csv.SCOPE_SEPARATOR: Final[str] = '.'

the separator to be used between scopes for nested column prefixes

class pycommons.io.csv.T

the type variable for data to be written to CSV or to be read from CSV

alias of TypeVar(‘T’)

class pycommons.io.csv.U

a type variable for csv_val_or_none().

alias of TypeVar(‘U’)

pycommons.io.csv.csv_column(columns, key, remove_col=True)[source]

Get the index of a CSV column.

This function will extract the index of a column from a column description map. The index is checked to be within the valid range and is then returned. If no column matching key exists, this function will throw a KeyError. If remove_col is True and a column matching key exists, then this column will be deleted from columns.

Parameters:
  • columns (dict[str, int]) – the columns set

  • key (str) – the key

  • remove_col (bool, default: True) – should we remove the column?

Return type:

int

Returns:

the column

Raises:
  • TypeError – if any of the parameters is not of the prescribed type

  • ValueError – if the column or key are invalid

  • KeyError – if no column of the name key exists

>>> csv_column({"a": 5}, "a")
5
>>> cols = {"a": 5, "b": 7}
>>> csv_column(cols, "a", False)
5
>>> cols
{'a': 5, 'b': 7}
>>> csv_column(cols, "a", True)
5
>>> cols
{'b': 7}
>>> try:
...     csv_column({"a": 5}, "b")
... except KeyError as ke:
...     print(ke)
'b'
>>> try:
...     csv_column({"a": 5}, "a", "3")
... except TypeError as te:
...     print(te)
remove_col should be an instance of bool but is str, namely '3'.
>>> try:
...     csv_column(None, "b")
... except TypeError as te:
...     print(str(te)[:50])
descriptor '__getitem__' for 'dict' objects doesn'
>>> try:
...     csv_column({"a": 5}, 1)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_column({"a": -1}, "a")
... except ValueError as ve:
...     print(ve)
a=-1 is invalid, must be in 0..1000000.
>>> try:
...     csv_column({"a": -1}, "")
... except ValueError as ve:
...     print(ve)
Invalid key ''.
pycommons.io.csv.csv_column_or_none(columns=None, key=None, remove_col=True)[source]

Get an optional CSV column index.

This function will extract the index of a column from a column description map. The index is checked to be within the valid range and is then returned. If no column matching key exists, this function returns None. If remove_col is True and a column matching key exists, then this column will be deleted from columns.

Parameters:
  • columns (Optional[dict[str, int]], default: None) – the columns

  • key (Optional[str], default: None) – the key

  • remove_col (bool, default: True) – should we remove the column?

Return type:

int | None

Returns:

the column, or None if none was found

Raises:
  • TypeError – if any of the parameters is not of the prescribed type

  • ValueError – if the column or key are invalid

>>> csv_column_or_none({"a": 5}, "a")
5
>>> cols = {"a": 5, "b": 7}
>>> csv_column_or_none(cols, "a", False)
5
>>> cols
{'a': 5, 'b': 7}
>>> csv_column_or_none(cols, "a", True)
5
>>> cols
{'b': 7}
>>> try:
...     csv_column_or_none({"a": 5}, "a", "3")
... except TypeError as te:
...     print(te)
remove_col should be an instance of bool but is str, namely '3'.
>>> print(csv_column_or_none({"a": 5}, "b"))
None
>>> print(csv_column_or_none(None, "b"))
None
>>> print(csv_column_or_none({"a": 5}, None))
None
>>> print(csv_column_or_none({"a": 5}, ""))
None
>>> try:
...     csv_column({"a": 5}, 1)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_column({"a": -1}, "a")
... except ValueError as ve:
...     print(ve)
a=-1 is invalid, must be in 0..1000000.
pycommons.io.csv.csv_read(rows, setup, parse_row, consumer=<function <lambda>>, separator=';', comment_start='#')[source]

Read (parse) a sequence of strings as CSV data.

All lines are split via str.split() based on the separator string, and each of the resulting strings is stripped via strip(). The first non-empty line of the data is interpreted as the header line.

This header is passed to the setup function in form of a dict that maps column titles to column indices. This function then returns an object of setup data. To each of the rows of CSV data, the function parse_row is applied. This function receives the object returned by setup as first argument and the row as list of strings as second argument. Each line is therefore split() (by the CSV separator) and its components are strip()-ped. It is permitted that a line in the CSV file contains fewer columns than declared in the header. In this case, the missing columns are set to empty strings. Lines that are entirely empty are skipped.

If comment_start is not None, then all text in a line starting at the first occurrence of comment_start is discarded before the line is processed.

Parameters:
  • rows (Iterable[str]) – the rows of text

  • setup (Callable[[dict[str, int]], TypeVar(S)]) – a function which creates an object holding the necessary information for row parsing

  • parse_row (Callable[[TypeVar(S), list[str]], TypeVar(T)]) – the function parsing the rows

  • consumer (Callable[[TypeVar(T)], Any], default: <function <lambda> at 0x7f47727bbac0>) – the consumer function receiving the parsed results

  • separator (str, default: ';') – the string used to separate columns

  • comment_start (str | None, default: '#') – the string starting comments

Raises:
  • TypeError – if any of the parameters has the wrong type

  • ValueError – if the separator or comment start character are incompatible or if the data has some internal error

Return type:

None

>>> def _setup(colidx: dict[str, int]) -> dict[str, int]:
...     return colidx
>>> def _parse_row(colidx: dict[str, int], row: list[str]) -> None:
...         return {x: row[y] for x, y in colidx.items()}
>>> def _consumer(d: dict[str, str]):
...     print(d)
>>> text = ["a;b;c;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9",
...         "", "10", "# 11;12"]
>>> csv_read(text, _setup, _parse_row, _consumer)
{'a': '1', 'b': '2', 'c': '3', 'd': '4'}
{'a': '5', 'b': '6', 'c': '', 'd': ''}
{'a': '', 'b': '8', 'c': '', 'd': '9'}
{'a': '10', 'b': '', 'c': '', 'd': ''}
>>> csv_read((t.replace(";", ",") for t in text), _setup, _parse_row,
...            _consumer, ",")
{'a': '1', 'b': '2', 'c': '3', 'd': '4'}
{'a': '5', 'b': '6', 'c': '', 'd': ''}
{'a': '', 'b': '8', 'c': '', 'd': '9'}
{'a': '10', 'b': '', 'c': '', 'd': ''}
>>> csv_read((t.replace(";", "\t") for t in text), _setup, _parse_row,
...           _consumer, "\t")
{'a': '1', 'b': '2', 'c': '3', 'd': '4'}
{'a': '5', 'b': '6', 'c': '', 'd': ''}
{'a': '', 'b': '8', 'c': '', 'd': '9'}
{'a': '10', 'b': '', 'c': '', 'd': ''}
>>> csv_read(text, _setup, _parse_row, _consumer, comment_start=None)
{'a': '# test', 'b': '', 'c': '', 'd': ''}
{'a': '1', 'b': '2', 'c': '3', 'd': '4'}
{'a': '5', 'b': '6', 'c': '', 'd': ''}
{'a': '', 'b': '8', 'c': '', 'd': '9'}
{'a': '10', 'b': '', 'c': '', 'd': ''}
{'a': '# 11', 'b': '12', 'c': '', 'd': ''}
>>> try:
...     csv_read(None, _setup, _parse_row, _consumer)
... except TypeError as te:
...     print(te)
rows should be an instance of typing.Iterable but is None.
>>> try:
...     csv_read(1, _setup, _parse_row, _consumer)
... except TypeError as te:
...     print(te)
rows should be an instance of typing.Iterable but is int, namely '1'.
>>> try:
...     csv_read(text, None, _parse_row, _consumer)
... except TypeError as te:
...     print(te)
setup should be a callable but is None.
>>> try:
...     csv_read(text, 1, _parse_row, _consumer)
... except TypeError as te:
...     print(te)
setup should be a callable but is int, namely '1'.
>>> try:
...     csv_read(text, _setup, None, _consumer)
... except TypeError as te:
...     print(te)
parse_row should be a callable but is None.
>>> try:
...     csv_read(text, _setup, 1, _consumer)
... except TypeError as te:
...     print(te)
parse_row should be a callable but is int, namely '1'.
>>> try:
...     csv_read(text, _setup, _parse_row, None)
... except TypeError as te:
...     print(te)
consumer should be a callable but is None.
>>> try:
...     csv_read(text, _setup, _parse_row, 1)
... except TypeError as te:
...     print(te)
consumer should be a callable but is int, namely '1'.
>>> try:
...     csv_read(text, _setup, _parse_row, _consumer, None)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'NoneType'
>>> try:
...     csv_read(text, _setup, _parse_row, _consumer, 1)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_read(text, _setup, _parse_row, _consumer, "")
... except ValueError as ve:
...     print(ve)
Invalid separator ''.
>>> try:
...     csv_read(text, _setup, _parse_row, _consumer, "-", 1)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_read(text, _setup, _parse_row, _consumer, "-", "")
... except ValueError as ve:
...     print(ve)
Invalid comment start: ''.
>>> try:
...     csv_read(text, _setup, _parse_row, _consumer, "-", " ")
... except ValueError as ve:
...     print(ve)
Invalid comment start: ' '.
>>> try:
...     csv_read(text, _setup, _parse_row, _consumer, ";", ";")
... except ValueError as ve:
...     print(ve)
Invalid comment start: ';'.
>>> text2 = ["a;b;a;d", "# test", " 1; 2;3;4", " 5 ;6 ", ";8;;9"]
>>> try:
...     csv_read(text2, _setup, _parse_row, _consumer)
... except ValueError as ve:
...     print(ve)
Invalid column headers: ['a', 'b', 'a', 'd'].
pycommons.io.csv.csv_scope(scope, key)[source]

Combine a scope and a key.

Parameters:
  • scope (str | None) – the scope, or None

  • key (str | None) – the key, or None

Return type:

str

Returns:

the scope joined with the key

>>> csv_scope("a", "b")
'a.b'
>>> csv_scope("a", None)
'a'
>>> csv_scope(None, "b")
'b'
>>> try:
...     csv_scope(1, "b")
... except TypeError as te:
...     print(str(te))
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_scope("a", 1)
... except TypeError as te:
...     print(str(te))
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_scope("a ", "b")
... except ValueError as ve:
...     print(str(ve))
Invalid csv scope 'a '.
>>> try:
...     csv_scope("", "b")
... except ValueError as ve:
...     print(ve)
Invalid csv scope ''.
>>> try:
...     csv_scope("a", " b")
... except ValueError as ve:
...     print(str(ve))
Invalid csv key ' b'.
>>> try:
...     csv_scope("a", "")
... except ValueError as ve:
...     print(str(ve))
Invalid csv key ''.
>>> try:
...     csv_scope(None, None)
... except ValueError as ve:
...     print(str(ve))
Csv scope and key cannot both be None.
pycommons.io.csv.csv_select_scope(conv, columns, scope=None, additional=(), skip_orig_key=<function <lambda>>, skip_final_key=<function <lambda>>, skip_col=<function <lambda>>, include_scope=True, remove_cols=True)[source]

Get all the columns of a given scope and pass them to the function conv.

This function is intended for selecting some keys from a column set and passing them as parameters to a constructor of a CSV reader. It can do this selection based on a scope prefix which is then removed from the column names before passing them into the constructor. If no column matches, this function throws a ValueError. All columns that are passed on to conv are deleted from columns if remove_cols == True, which is the default.

Parameters:
  • conv (Callable[[dict[str, int]], TypeVar(U)]) – the function to which the selected columns should be passed, and that creates the return value

  • columns (dict[str, int]) – the existing columns

  • scope (Optional[str], default: None) – the scope, or None or the empty string to select all columns

  • skip_orig_key (Callable[[str], bool], default: <function <lambda> at 0x7f47727bbeb0>) – a function that returns True for any original, unchanged key in columns that should be ignored and that returns False if the key can be processed normally (i.e., if we can check if it starts with the given scope and move on)

  • skip_final_key (Callable[[str], bool], default: <function <lambda> at 0x7f47727bbf40>) – a function that returns True for any key in columns that would fall into the right scope but that should still be ignored. This function receives the key without the scope prefix.

  • skip_col (Callable[[int], bool], default: <function <lambda> at 0x7f4772480040>) – any column that should be ignored

  • additional (Iterable[tuple[str, int]], default: ()) – the additional columns to add if some keys/columns remain after all the transformation and selection

  • include_scope (bool, default: True) – if scope appears as a lone column, should we include it?

  • remove_cols (bool, default: True) – should we remove all selected columns?

Return type:

TypeVar(U)

Returns:

The result of the function conv applied to all matching columns (and those in additional are appended to them)

Raises:
  • ValueError – if no columns could be selected

  • TypeError – if any of the elements passed in is of the wrong type

>>> csv_select_scope(lambda x: x, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "")
{'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}
>>> try:
...     csv_select_scope(print, {"a.x": 1, "a.y": 2}, "v")
... except ValueError as ve:
...     print(ve)
Did not find sufficient data of scope 'v' in {'a.x': 1, 'a.y': 2}.
>>> try:
...     csv_select_scope(print, {}, "v")
... except ValueError as ve:
...     print(ve)
Did not find sufficient data of scope 'v' in {}.
pycommons.io.csv.csv_select_scope_or_none(conv, columns, scope=None, additional=(), skip_orig_key=<function <lambda>>, skip_final_key=<function <lambda>>, skip_col=<function <lambda>>, include_scope=True, remove_cols=True)[source]

Get all the columns of a given scope and pass them to the function conv.

This function is intended for selecting some keys from a column set and passing them as parameters to a constructor of a CSV reader. It can do this selection based on a scope prefix which is then removed from the column names before passing them into the constructor. If no column matches, this function returns None. All columns that are passed on to conv are deleted from columns if remove_cols == True, which is the default.

Parameters:
  • conv (Callable[[dict[str, int]], TypeVar(U)]) – the function to which the selected columns, if any, should be passed and whose result, in this case, is the return value of this function

  • columns (dict[str, int] | None) – the existing columns

  • scope (Optional[str], default: None) – the scope, or None or the empty string to select all columns

  • skip_orig_key (Callable[[str], bool], default: <function <lambda> at 0x7f4772480160>) – a function that returns True for any original, unchanged key in columns that should be ignored and that returns False if the key can be processed normally (i.e., if we can check if it starts with the given scope and move on)

  • skip_final_key (Callable[[str], bool], default: <function <lambda> at 0x7f47724801f0>) – a function that returns True for any key in columns that would fall into the right scope but that should still be ignored. This function receives the key without the scope prefix.

  • skip_col (Callable[[int], bool], default: <function <lambda> at 0x7f4772480280>) – any column that should be ignored

  • additional (Iterable[tuple[str, int]], default: ()) – the additional columns to add if some keys/columns remain after all the transformation and selection

  • include_scope (bool, default: True) – if scope appears as a lone column, should we include it?

  • remove_cols (bool, default: True) – should we remove all selected columns?

Return type:

Optional[TypeVar(U)]

Returns:

None if no keys in columns fall into the provided scope. The result of conv otherwise, i.e., if there are matching columns, these are selected (and those in additional are appended to them) and these are then passed to conv and the result of conv is returned

>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a")
{'x': 1, 'y': 2, 'a': 3}
>>> exa1 = {"a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}
>>> csv_select_scope_or_none(print, exa1, "a", remove_cols=False)
{'x': 1, 'y': 2, 'a': 3}
>>> exa1
{'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}
>>> csv_select_scope_or_none(print, exa1, "a", remove_cols=True)
{'x': 1, 'y': 2, 'a': 3}
>>> exa1
{'b': 4, 'b.t': 5}
>>> csv_select_scope_or_none(print, exa1, "b", remove_cols=True)
{'b': 4, 't': 5}
>>> exa1
{}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "")
{'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, None)
{'a.x': 1, 'a.y': 2, 'a': 3, 'b': 4, 'b.t': 5}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...     include_scope=False)
{'x': 1, 'y': 2}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b")
{'b': 4, 't': 5}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
...     additional=(('z', 23), ('v', 45)))
{'b': 4, 't': 5, 'z': 23, 'v': 45}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
...     additional=(('t', 23), ('v', 45)))
{'b': 4, 't': 5, 'v': 45}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...     additional=(('x', 44), ('v', 45)))
{'x': 1, 'y': 2, 'a': 3, 'v': 45}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
...     additional=(('z', 23), ('v', 45)),
...     skip_col=lambda c: c == 23)
{'b': 4, 't': 5, 'v': 45}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
...     additional=(('z', 23), ('v', 45)),
...     skip_orig_key=lambda ok: ok == "b.t")
{'b': 4, 'z': 23, 'v': 45}
>>> csv_select_scope_or_none(print, {
...     "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "b",
...     additional=(('z', 23), ('v', 45)),
...     skip_final_key=lambda fk: fk == "z")
{'b': 4, 't': 5, 'v': 45}
>>> print(csv_select_scope_or_none(print, {}, "a"))
None
>>> print(csv_select_scope_or_none(print, {}, None))
None
>>> print(csv_select_scope_or_none(print, None, None))
None
>>> print(csv_select_scope_or_none(print, {"a.x": 45}, "a",
...         skip_col=lambda c: c == 45))
None
>>> try:
...     csv_select_scope_or_none(None, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a")
... except TypeError as te:
...     print(te)
conv should be a callable but is None.
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         remove_cols=1)
... except TypeError as te:
...     print(te)
remove_cols should be an instance of bool but is int, namely '1'.
>>> try:
...     csv_select_scope_or_none("x", {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a")
... except TypeError as te:
...     print(te)
conv should be a callable but is str, namely 'x'.
>>> try:
...     csv_select_scope_or_none(print, "x", "a")
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'dict' object but received a 'str'
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, int)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'type'
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         additional=2)
... except TypeError as te:
...     print(str(te)[:-7])
additional should be an instance of typing.Iterable but is int, name
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         additional=((1, 2), ))
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         additional=(None, ))
... except TypeError as te:
...     print(te)
cannot unpack non-iterable NoneType object
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         additional=(("yx", "a"), ))
... except TypeError as te:
...     print(te)
yx should be an instance of int but is str, namely 'a'.
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         additional=(("yx", -2), ))
... except ValueError as ve:
...     print(ve)
yx=-2 is invalid, must be in 0..1000000.
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "a.b": -4, "b.t": 5}, "a")
... except ValueError as ve:
...     print(ve)
a.b=-4 is invalid, must be in 0..1000000.
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         skip_col=None)
... except TypeError as te:
...     print(te)
skip_col should be a callable but is None.
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         skip_orig_key=None)
... except TypeError as te:
...     print(te)
skip_orig_key should be a callable but is None.
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         skip_final_key=None)
... except TypeError as te:
...     print(te)
skip_final_key should be a callable but is None.
>>> try:
...     csv_select_scope(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         include_scope=3)
... except TypeError as te:
...     print(te)
include_scope should be an instance of bool but is int, namely '3'.
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, 4)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_select_scope_or_none(print, 11)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'dict' object but received a 'int'
>>> try:
...     csv_select_scope_or_none(print, {
...         "a.x": 1, "a.y": 2, "a": 3, "b": 4, "b.t": 5}, "a",
...         additional=(("", 2), ))
... except ValueError as ve:
...     print(ve)
Invalid additional column ''.
pycommons.io.csv.csv_str_or_none(data, index)[source]

Get a string or None from a data row.

This function is a shortcut for when data elements or columns are optional. If index is None or outside of the valid index range of the list data, then None is returned. If data itself is None or the element at index index is the empty string, then None is returned. Only if data and index are both not None and index is a valid index into data and the element at index index in data is not the empty string, then this element is returned. In other words, this is a very tolerant function to handle optional data and to return None if the data is not present. The function csv_val_or_none() further extends this function by converting the data to another data type if it is present.

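Parameters:
  • data (list[str] | None) – the data row, or None

  • index (int | None) – the index of the element to get from data, or None

Return type: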

str | None

Returns:

the string or nothing

>>> ddd = ["a", "b", "", "d"]
>>> print(csv_str_or_none(ddd, 0))
a
>>> print(csv_str_or_none(ddd, 1))
b
>>> print(csv_str_or_none(ddd, 2))
None
>>> print(csv_str_or_none(ddd, 3))
d
>>> print(csv_str_or_none(ddd, None))
None
>>> print(csv_str_or_none(ddd, 10))
None
>>> print(csv_str_or_none(ddd, -1))
None
>>> print(csv_str_or_none(None, 0))
None
pycommons.io.csv.csv_val_or_none(data, index, conv)[source]

Get a value or None.

The function csv_str_or_none() allows us to extract an optional data element from a CSV row and get None if the element is not present or if the index is None or outside of the valid range. This function extends it: In case the data is present and not the empty string, the function conv is invoked to convert it to another value. Otherwise, None is returned.

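Parameters:
  • data (list[str] | None) – the data row, or None

  • index (int | None) – the index of the element to get from data, or None

  • conv (Callable[[str], TypeVar(U)]) – the function used to convert the element to the result value if it is present and not the empty string

Return type: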

Optional[TypeVar(U)]

Returns:

the object

>>> ddd = ["11", "22", "", "33"]
>>> print(csv_val_or_none(ddd, 0, int))
11
>>> print(csv_val_or_none(ddd, 1, int))
22
>>> print(csv_val_or_none(ddd, 2, int))
None
>>> print(csv_val_or_none(ddd, 3, int))
33
>>> print(csv_val_or_none(ddd, None, int))
None
pycommons.io.csv.csv_write(data, consumer, get_column_titles, get_row, setup=<function <lambda>>, separator=';', comment_start='#', get_header_comments=<function <lambda>>, get_footer_comments=<function <lambda>>)[source]

Write data in CSV format to a text destination.

The data is provided in the form of an Iterable. In a first step, the function setup is invoked and applied to the data Iterable. It can return an object that sort of stores the structure of the data, e.g., which columns should be generated and how they should be formatted.

This returned object is passed to get_column_titles, which should pass the titles of the columns to a Callable. These titles are strip()-ped and concatenated using the column separator string and the resulting header string is passed to consumer.

Then, for each element e in the data Iterable, the function get_row is invoked. This function receives the setup information object (previously returned by setup), the element e, and a Callable to which one string per column should be passed. These strings are then each strip()-ped and concatenated using the column separator string. All trailing separators are removed, but if all strings are empty, at least a single separator is retained. The resulting string (per row) is again passed to consumer.

Additionally, get_header_comments and get_footer_comments can be provided to pass row comments as str to a Callable which then prepends or appends them as comment rows before or after all of the above, respectively. In that case, comment_start is prepended to each line.

If you create nested CSV formats, i.e., formats where the setup function invokes the setup function of other data, and the data that you receive could come from a Generator (or some other one-shot Iterator), then you need to make sure to solidify the iterable data with reiterable(). The structure of our CSV output is that setup is first invoked and then get_row. If setup already consumes the data away, then get_row may print nothing. Alternatively, if you apply multiple setup routines to the same data that extract different information, then the first setup run may consume all the data, leaving nothing for the second one.

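Parameters:
  • data (Iterable) – the iterable of data elements to be written

  • consumer (Callable) – the destination to which the header line, each row line, and each comment line are passed as strings

  • get_column_titles (Callable) – a function that receives the object returned by setup and a Callable to which it should pass the column titles

  • get_row (Callable) – a function that receives the object returned by setup, one element of data, and a Callable to which it should pass one string per column

  • setup (Callable, default: a lambda function) – a function that is applied to data first and whose return value is passed on to the other functions

  • separator (str, default: ';') – the column separator string

  • comment_start (Optional[str], default: '#') – the string prepended to each comment line; it may only be None if no comments are produced

  • get_header_comments (Callable, default: a lambda function) – a function that receives the object returned by setup and a Callable to which it should pass the comment lines to be placed before the data

  • get_footer_comments (Callable, default: a lambda function) – a function that receives the object returned by setup and a Callable to which it should pass the comment lines to be placed after the data

Raises: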
  • TypeError – if any of the parameters has the wrong type

  • ValueError – if the separator or comment start character are incompatible or if the data has some internal error

Return type:

None

>>> dd = [{"a": 1, "c": 2}, {"b": 6, "c": 8},
...       {"a": 4, "d": 12, "b": 3}, {}]
>>> def __setup(datarows) -> list[str]:
...     return sorted({dkey for datarow in datarows for dkey in datarow})
>>> def __get_column_titles(keyd: list[str], app: Callable[[str], None]):
...     for kx in keyd:
...         app(kx)
>>> def __get_row(keyd: list[str], row: dict[str, int],
...               app: Callable[[str], None]):
...     for kx in map(str, (row.get(key, "") for key in keyd)):
...         app(kx)
>>> def __get_header_cmt(keyd: list[str], app: Callable[[str], None]):
...     app("This is a header comment.")
...     app(" We have two of it. ")
>>> def __get_footer_cmt(keyd: list[str], app: Callable[[str], None]):
...     app(" This is a footer comment.")
>>> csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
# This is a header comment.
# We have two of it.
a;b;c;d
1;;2
;6;8
4;3;;12
;
# This is a footer comment.
>>> csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ",", "@@", __get_header_cmt, __get_footer_cmt)
@@ This is a header comment.
@@ We have two of it.
a,b,c,d
1,,2
,6,8
4,3,,12
,
@@ This is a footer comment.
>>> try:
...     csv_write(None, print, __get_column_titles, __get_row, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(str(te)[:60])
source should be an instance of any in {typing.Iterable, typ
>>> try:
...     csv_write(1, print, __get_column_titles, __get_row, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(str(te)[:60])
source should be an instance of any in {typing.Iterable, typ
>>> try:
...     csv_write(dd, None, __get_column_titles, __get_row, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
consumer should be a callable but is None.
>>> try:
...     csv_write(dd, 1, __get_column_titles, __get_row, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
consumer should be a callable but is int, namely '1'.
>>> try:
...     csv_write(dd, print, None, __get_row, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
get_column_titles should be a callable but is None.
>>> try:
...     csv_write(dd, print, 1, __get_row, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
get_column_titles should be a callable but is int, namely '1'.
>>> try:
...     csv_write(dd, print, __get_column_titles, None, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
get_row should be a callable but is None.
>>> try:
...     csv_write(dd, print, __get_column_titles, 1, __setup,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
get_row should be a callable but is int, namely '1'.
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, None,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
setup should be a callable but is None.
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, 1,
...           ";", "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
setup should be a callable but is int, namely '1'.
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           None, "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'NoneType'
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           1, "#", __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", 1, __get_header_cmt, __get_footer_cmt)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", None, __get_footer_cmt)
... except TypeError as te:
...     print(te)
get_header_comments should be a callable but is None.
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", 1, __get_footer_cmt)
... except TypeError as te:
...     print(te)
get_header_comments should be a callable but is int, namely '1'.
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "", __get_header_cmt, __get_footer_cmt)
... except ValueError as ve:
...     print(ve)
Invalid comment start: ''.
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", " ", __get_header_cmt, __get_footer_cmt)
... except ValueError as ve:
...     print(ve)
Invalid comment start: ' '.
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "# ", __get_header_cmt, __get_footer_cmt)
... except ValueError as ve:
...     print(ve)
Invalid comment start: '# '.
>>> csv_write(dd, print, __get_column_titles, __get_row, __setup, ";",
...             None)
a;b;c;d
1;;2
;6;8
4;3;;12
;
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", None, __get_header_cmt)
... except ValueError as ve:
...     print(str(ve)[:60])
Cannot place header comment 'This is a header comment.' if c
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", None, get_footer_comments=__get_footer_cmt)
... except ValueError as ve:
...     print(str(ve)[:59])
a;b;c;d
1;;2
;6;8
4;3;;12
;
Cannot place footer comment ' This is a footer comment.' if
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", __get_header_cmt, None)
... except TypeError as te:
...     print(te)
get_footer_comments should be a callable but is None.
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", __get_header_cmt, 1)
... except TypeError as te:
...     print(te)
get_footer_comments should be a callable but is int, namely '1'.
>>> def __err_cmt_1(keyd: list[str], app: Callable[[str], None]):
...     app("This is\n a comment with error.")
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", __err_cmt_1)
... except ValueError as ve:
...     print(str(ve)[:59])
Header comment must not contain newline, but 'This is\n a c
>>> try:
...     csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", get_footer_comments=__err_cmt_1)
... except ValueError as ve:
...     print(str(ve)[:59])
a;b;c;d
1;;2
;6;8
4;3;;12
;
Footer comment must not contain newline, but 'This is\n a c
>>> def __empty_cmt(keyd: list[str], app: Callable[[str], None]):
...     app(" ")
>>> csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", __empty_cmt)
a;b;c;d
1;;2
;6;8
4;3;;12
;
>>> csv_write(dd, print, __get_column_titles, __get_row, __setup,
...           ";", "#", get_footer_comments=__empty_cmt)
a;b;c;d
1;;2
;6;8
4;3;;12
;
>>> def __error_column_titles_1(keyd: list[str],
...                             app: Callable[[str], None]):
...     pass
>>> try:
...     csv_write(dd, print, __error_column_titles_1, __get_row, __setup,
...           ";", "#")
... except ValueError as ve:
...     print(ve)
Cannot have zero columns.
>>> def __error_column_titles_2(keyd: list[str],
...                             app: Callable[[str], None]):
...     app(" ")
>>> try:
...     csv_write(dd, print, __error_column_titles_2, __get_row, __setup,
...           ";", "#")
... except ValueError as ve:
...     print(str(ve)[:50])
Invalid column title ' ', must neither be empty no
>>> def __error_column_titles_3(keyd: list[str],
...                             app: Callable[[str], None]):
...     app("bla\nblugg")
>>> try:
...     csv_write(dd, print, __error_column_titles_3, __get_row, __setup,
...           ";", "#")
... except ValueError as ve:
...     print(str(ve)[:50])
Invalid column title 'bla\nblugg', must neither be
>>> def __error_column_titles_4(keyd: list[str],
...                             app: Callable[[str], None]):
...     app(None)
>>> try:
...     csv_write(dd, print, __error_column_titles_4, __get_row, __setup,
...           ";", "#")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object
>>> def __error_column_titles_5(keyd: list[str],
...                             app: Callable[[str], None]):
...     app(1)
>>> try:
...     csv_write(dd, print, __error_column_titles_5, __get_row, __setup,
...           ";", "#")
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> def __error_column_titles_6(keyd: list[str],
...                             app: Callable[[str], None]):
...     for xa in ("a", "b", "c", "a"):
...         app(xa)
>>> try:
...     csv_write(dd, print, __error_column_titles_6, __get_row, __setup,
...           ";", "#")
... except ValueError as ve:
...     print(ve)
Cannot have duplicated columns: ['a', 'b', 'c', 'a'].
>>> def __error_column_titles_7(keyd: list[str],
...                             app: Callable[[str], None]):
...     for xa in ("a", "b", "c;4"):
...         app(xa)
>>> try:
...     csv_write(dd, print, __error_column_titles_7, __get_row, __setup,
...           ";", "#")
... except ValueError as ve:
...     print(str(ve)[:49])
Invalid column title 'c;4', must neither be empty
>>> def __error_column_titles_8(keyd: list[str],
...                             app: Callable[[str], None]):
...     for xa in ("a", "b#x", "c"):
...         app(xa)
>>> try:
...     csv_write(dd, print, __error_column_titles_8, __get_row, __setup,
...           ";", "#")
... except ValueError as ve:
...     print(str(ve)[:49])
Invalid column title 'b#x', must neither be empty
>>> def __error_row_1(keyd: list[str], row: dict[str, int],
...                   app: Callable[[str], None]):
...     for xa in ("bla", None, "blubb"):
...         app(xa)
>>> try:
...     csv_write(dd, print, __get_column_titles, __error_row_1,
...               __setup, ";", "#")
... except TypeError as te:
...     print(te)
a;b;c;d
descriptor 'strip' for 'str' objects doesn't apply to a 'NoneType' object
>>> def __error_row_2(keyd: list[str], row: dict[str, int],
...                   app: Callable[[str], None]):
...     for xa in ("bla", 2.3, "blubb"):
...         app(xa)
>>> try:
...     csv_write(dd, print, __get_column_titles, __error_row_2,
...               __setup, ";", "#")
... except TypeError as te:
...     print(te)
a;b;c;d
descriptor 'strip' for 'str' objects doesn't apply to a 'float' object
>>> def __error_row_3(keyd: list[str], row: dict[str, int],
...                   app: Callable[[str], None]):
...     for xa in ("bla", "x\ny", "blubb"):
...         app(xa)
>>> try:
...     csv_write(dd, print, __get_column_titles, __error_row_3,
...               __setup, ";", "#")
... except ValueError as ve:
...     print(str(ve)[:50])
a;b;c;d
Invalid column value 'x\ny', cannot contain any of
>>> def __error_row_4(keyd: list[str], row: dict[str, int],
...                   app: Callable[[str], None]):
...     for xa in ("bla", "x#", "blubb"):
...         app(xa)
>>> try:
...     csv_write(dd, print, __get_column_titles, __error_row_4,
...               __setup, ";", "#")
... except ValueError as ve:
...     print(str(ve)[:50])
a;b;c;d
Invalid column value 'x#', cannot contain any of [
>>> def __error_row_5(keyd: list[str], row: dict[str, int],
...                   app: Callable[[str], None]):
...     for xa in ("bla", "x;#", "blubb"):
...         app(xa)
>>> try:
...     csv_write(dd, print, __get_column_titles, __error_row_5,
...               __setup, ";", "#")
... except ValueError as ve:
...     print(str(ve)[:49])
a;b;c;d
Invalid column value 'x;#', cannot contain any of
>>> def __error_column_titles_9(keyd: list[str],
...                             app: Callable[[str], None]):
...     app("a")
>>> def __error_row_6(keyd: list[str], row: dict[str, int],
...                   app: Callable[[str], None]):
...     app("")
>>> try:
...     csv_write(dd, print, __error_column_titles_9, __error_row_6,
...               __setup, ";", "#")
... except ValueError as ve:
...     print(ve)
a
Cannot have empty row in a single-column format, but got [''].
>>> def __error_row_7(keyd: list[str], row: dict[str, int],
...                   app: Callable[[str], None]):
...     app("x")
...     app("y")
>>> try:
...     csv_write(dd, print, __error_column_titles_9, __error_row_7,
...               __setup, ";", "#")
... except ValueError as ve:
...     print(ve)
a
Too many columns in ['x', 'y'], should be 1.

pycommons.io.path module

The class Path for handling paths to files and directories.

The instances of Path identify file system paths. They are always fully canonicalized with all relative components resolved. They thus allow the clear and unique identification of files and directories. They also offer support for opening streams, creating paths to sub-folders, and so on.

The first goal is to encapsulate the functionality of the os.path module into a single class. The second goal is to make sure that we do not run into any dodgy situation with paths pointing to security-sensitive locations or something due to strange . and .. trickery. If you try to resolve a path inside a directory and the resulting canonical path is outside that directory, you get an error raised, for example.

class pycommons.io.path.Path(value: Any)[source]

Bases: str

An immutable representation of a canonical path.

All instances of this class identify a fully-qualified path which does not contain any relative parts (“.” or “..”), is fully expanded, and, if the file system is case-insensitive, has the case normalized. A path is also an instance of str, so it can be used wherever strings are required and functions can be designed to accept str and receive Path instances instead.

>>> try:
...     Path(1)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     Path(None)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'NoneType'
>>> try:
...     Path("")
... except ValueError as ve:
...     print(ve)
Path must not be empty.
>>> try:
...     Path(" ")
... except ValueError as ve:
...     print(ve)
Path must not start or end with white space, but ' ' does.
>>> from os.path import dirname
>>> Path(dirname(realpath(__file__)) + '/..') == dirname(dirname(realpath(__file__)))
True
>>> Path(dirname(realpath(__file__)) + "/.") == dirname(realpath(__file__))
True
>>> Path(__file__) == realpath(__file__)
True
>>> from os import getcwd
>>> Path(".") == realpath(getcwd())
True
>>> from os import getcwd
>>> Path("..") == dirname(realpath(getcwd()))
True
>>> from os import getcwd
>>> Path("../.") == dirname(realpath(getcwd()))
True
>>> from os import getcwd
>>> Path("../1.txt") == join(dirname(realpath(getcwd())), "1.txt")
True
>>> from os import getcwd
>>> Path("./1.txt") == join(realpath(getcwd()), "1.txt")
True
>>> from os.path import isabs
>>> isabs(Path(".."))
True
basename()[source]

Get the name of the file or directory identified by this path.

Return type:

str

Returns:

the name of the file or directory

>>> file_path(__file__).basename()
'path.py'
>>> file_path(__file__).up(2).basename()
'pycommons'
>>> try:
...     Path("/").basename()
... except ValueError as ve:
...     print(ve)
Invalid basename '' of path '/'.
contains(other)[source]

Check whether this path is a directory and contains another path.

A file can never contain anything else. A directory contains itself as well as any sub-directories, i.e., a/b/ contains a/b/ and a/b/c. The function enforce_contains() throws an exception if the path does not contain other.

Parameters:

other (str) – the other path

Return type:

bool

Returns:

True if this path contains the other path, False if not

>>> from os.path import dirname
>>> Path(dirname(__file__)).contains(__file__)
True
>>> Path(__file__).contains(__file__)
False
>>> Path(dirname(__file__)).contains(dirname(__file__))
True
>>> Path(__file__).contains(dirname(__file__))
False
>>> Path(join(dirname(__file__), "a")).contains(join(dirname(__file__), "b"))
False
>>> try:
...     Path(dirname(__file__)).contains(1)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     Path(dirname(__file__)).contains(None)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'NoneType'
>>> try:
...     Path(dirname(__file__)).contains("")
... except ValueError as ve:
...     print(ve)
Path must not be empty.
create_file_or_truncate()[source]

Create the file identified by this path and truncate it if it exists.

Raises:

ValueError – if anything goes wrong during the file creation

Return type:

None

>>> from tempfile import mkstemp
>>> from os import close as osxclose
>>> from os import remove as osremove
>>> (h, tf) = mkstemp()
>>> osxclose(h)
>>> pth = Path(tf)
>>> pth.write_all_str("test")
>>> print(pth.read_all_str())
test
>>> pth.create_file_or_truncate()
>>> pth.is_file()
True
>>> try:
...     pth.read_all_str()
... except ValueError as ve:
...     print(str(ve)[-17:])
contains no text.
>>> osremove(pth)
>>> pth.is_file()
False
>>> pth.create_file_or_truncate()
>>> pth.is_file()
True
>>> osremove(pth)
>>> from os import makedirs as osmkdir
>>> from os import rmdir as osrmdir
>>> osmkdir(pth)
>>> try:
...     pth.create_file_or_truncate()
... except ValueError as ve:
...     print(str(ve)[:35])
Error when truncating/creating file
>>> osrmdir(pth)
enforce_contains(other)[source]

Raise an exception if this is not a directory containing another path.

The method contains() checks whether this path is a directory and contains the other path and returns the result of this check as a bool. This function here raises an exception if that check fails.

Parameters:

other (str) – the other path

Raises:

ValueError – if other is not a sub-path of this path.

Return type:

None

>>> try:
...     Path(__file__).enforce_contains(__file__)
... except ValueError as ve:
...     print(str(ve)[-25:])
not identify a directory.
>>> from os.path import dirname
>>> Path(dirname(__file__)).enforce_contains(__file__)  # nothing
>>> try:
...     Path(join(dirname(__file__), "a")).enforce_contains(Path(join(dirname(__file__), "b")))
... except ValueError as ve:
...     print(str(ve)[-25:])
not identify a directory.
>>> Path(dirname(__file__)).enforce_contains(Path(join(dirname(__file__), "b")))  # nothing happens
>>> try:
...     Path(dirname(__file__)).enforce_contains(dirname(dirname(__file__)))
... except ValueError as ve:
...     print(str(ve)[:4])
...     print("does not contain" in str(ve))
Path
True
enforce_dir()[source]

Raise an error if the path does not reference an existing directory.

This function uses is_dir() internally and raises a ValueError if it returns False. It is therefore a shorthand for situations where you want to have an error if a path does not identify a directory.

Raises:

ValueError – if this path does not reference an existing directory

Return type:

None

>>> try:
...     Path(__file__).enforce_dir()
... except ValueError as ve:
...     print(str(ve)[-30:])
does not identify a directory.

>>> from os import getcwd
>>> Path(getcwd()).enforce_dir()   # nothing happens
enforce_file()[source]

Raise an error if the path does not reference an existing file.

This function uses is_file() internally and raises a ValueError if it returns False. It is therefore a shorthand for situations where you want to have an error if a path does not identify a file.

Raises:

ValueError – if this path does not reference an existing file

Return type:

None

>>> Path(__file__).enforce_file()   # nothing happens
>>> from os import getcwd
>>> try:
...     Path(getcwd()).enforce_file()
... except ValueError as ve:
...     print(str(ve)[-25:])
does not identify a file.

ensure_dir_exists()[source]

Make sure that the directory exists, create it otherwise.

Method is_dir() checks whether the path identifies an existing directory, method enforce_dir() raises an error if not, and this method creates the directory if it does not exist.

Raises:

ValueError – if the directory did not exist and creation failed

Return type:

None

>>> from os.path import dirname
>>> Path(dirname(__file__)).ensure_dir_exists()  # nothing happens
>>> try:
...     Path(__file__).ensure_dir_exists()
... except ValueError as ve:
...     print("does not identify a directory" in str(ve))
True
>>> try:
...     Path(join(__file__, "a")).ensure_dir_exists()
... except ValueError as ve:
...     print("Error when trying to create directory" in str(ve))
True
>>> from tempfile import mkdtemp
>>> from os import rmdir as osrmdirx
>>> td = mkdtemp()
>>> Path(td).ensure_dir_exists()
>>> osrmdirx(td)
>>> Path(td).ensure_dir_exists()
>>> p = Path(td).resolve_inside("a")
>>> p.ensure_dir_exists()
>>> p2 = p.resolve_inside("b")
>>> p2.ensure_dir_exists()
>>> osrmdirx(p2)
>>> osrmdirx(p)
>>> osrmdirx(td)
>>> p2.ensure_dir_exists()
>>> osrmdirx(p2)
>>> osrmdirx(p)
>>> osrmdirx(td)
ensure_file_exists()[source]

Atomically ensure that the file exists and create it otherwise.

While is_file() checks if the path identifies an existing file and enforce_file() raises an error if it does not, this method here creates the file if it does not exist. The method can only create the file if the directory already exists.

Return type:

bool

Returns:

True if the file already existed and False if it was newly and atomically created.

Raises:

ValueError – if anything goes wrong during the file creation

>>> print(Path(__file__).ensure_file_exists())
True
>>> from os.path import dirname
>>> try:
...     Path(dirname(__file__)).ensure_file_exists()
...     print("??")
... except ValueError as ve:
...     print("does not identify a file." in str(ve))
True
>>> try:
...     Path(join(join(dirname(__file__), "a"), "b")).ensure_file_exists()
...     print("??")
... except ValueError as ve:
...     print("Error when trying to create file" in str(ve))
True
ensure_parent_dir_exists()[source]

Make sure that the parent directory exists, create it otherwise.

This path may identify a file or directory to be created that does not yet exist. The parent directory of this path is ensured to exist, i.e., if it already exists, nothing happens, but if it does not yet exist, it is created. If the parent directory cannot be created, a ValueError is raised.

Raises:

ValueError – if the directory did not exist and creation failed

Return type:

None

>>> from os.path import dirname
>>> Path(__file__).ensure_parent_dir_exists()  # nothing happens
>>> try:
...     Path(join(__file__, "a")).ensure_parent_dir_exists()
... except ValueError as ve:
...     print("is not a directory" in str(ve))
True
>>> from tempfile import mkdtemp
>>> from os import rmdir as osrmdirx
>>> td = mkdtemp()
>>> tf = Path(join(td, "xxx"))
>>> tf.ensure_parent_dir_exists()
>>> osrmdirx(td)
>>> isdir(dirname(tf))
False
>>> tf.ensure_parent_dir_exists()
>>> isdir(dirname(tf))
True
>>> osrmdirx(td)
>>> td = mkdtemp()
>>> isdir(td)
True
>>> td2 = join(td, "xxx")
>>> isdir(td2)
False
>>> tf = join(td2, "xxx")
>>> Path(tf).ensure_parent_dir_exists()
>>> isdir(td2)
True
>>> osrmdirx(td2)
>>> osrmdirx(td)
>>> td = mkdtemp()
>>> isdir(td)
True
>>> td2 = join(td, "xxx")
>>> isdir(td2)
False
>>> td3 = join(td2, "xxx")
>>> isdir(td3)
False
>>> tf = join(td3, "xxx")
>>> Path(tf).ensure_parent_dir_exists()
>>> isdir(td3)
True
>>> isdir(td2)
True
>>> osrmdirx(td3)
>>> osrmdirx(td2)
>>> osrmdirx(td)
exists()[source]

Check if this path identifies an existing file or directory.

See also is_file() and is_dir().

Return type:

bool

Returns:

True if this path identifies an existing file or directory, False otherwise.

>>> Path(__file__).exists()
True
>>> from os.path import dirname
>>> Path(dirname(__file__)).exists()
True
>>> from tempfile import mkstemp
>>> from os import close as osxclose
>>> from os import remove as osremove
>>> (h, tf) = mkstemp()
>>> osxclose(h)
>>> p = Path(tf)
>>> p.exists()
True
>>> osremove(p)
>>> p.exists()
False
is_dir()[source]

Check if this path identifies an existing directory.

The method enforce_dir() also checks this, but raises an exception if it is not True.

Return type:

bool

Returns:

True if this path identifies an existing directory, False otherwise.

>>> Path(__file__).is_dir()
False
>>> from os.path import dirname
>>> Path(dirname(__file__)).is_dir()
True
is_file()[source]

Check if this path identifies an existing file.

See also enforce_file(), which raises an error if is_file() is not True.

Return type:

bool

Returns:

True if this path identifies an existing file, False otherwise.

>>> Path(__file__).is_file()
True
>>> from os.path import dirname
>>> Path(dirname(__file__)).is_file()
False
list_dir(files=True, directories=True)[source]

List the files and/or sub-directories in this directory.

Return type:

Iterator[Path]

Returns:

an iterable with the fully-qualified paths

>>> from tempfile import mkstemp, mkdtemp
>>> from os import close as osxclose
>>> dir1 = Path(mkdtemp())
>>> dir2 = Path(mkdtemp(dir=dir1))
>>> dir3 = Path(mkdtemp(dir=dir1))
>>> (h, tf1) = mkstemp(dir=dir1)
>>> osclose(h)
>>> (h, tf2) = mkstemp(dir=dir1)
>>> osclose(h)
>>> file1 = Path(tf1)
>>> file2 = Path(tf2)
>>> set(dir1.list_dir()) == {dir2, dir3, file1, file2}
True
>>> set(dir1.list_dir(files=False)) == {dir2, dir3}
True
>>> set(dir1.list_dir(directories=False)) == {file1, file2}
True
>>> try:
...     dir1.list_dir(None)
... except TypeError as te:
...     print(te)
files should be an instance of bool but is None.
>>> try:
...     dir1.list_dir(1)
... except TypeError as te:
...     print(te)
files should be an instance of bool but is int, namely '1'.
>>> try:
...     dir1.list_dir(True, None)
... except TypeError as te:
...     print(te)
directories should be an instance of bool but is None.
>>> try:
...     dir1.list_dir(True, 1)
... except TypeError as te:
...     print(te)
directories should be an instance of bool but is int, namely '1'.
>>> try:
...     dir1.list_dir(False, False)
... except ValueError as ve:
...     print(ve)
files and directories cannot both be False.
>>> delete_path(dir1)
open_for_read()[source]

Open this file for reading text.

The resulting text stream will automatically use the right encoding and take any encoding error seriously. If the path does not identify an existing file, an exception is thrown.

Return type:

TextIOBase

Returns:

the file open for reading

Raises:

ValueError – if the path does not identify a file

>>> with Path(__file__).open_for_read() as rd:
...     print(f"{len(rd.readline())}")
...     print(f"{rd.readline()!r}")
4
'The class `Path` for handling paths to files and directories.\n'
>>> from os.path import dirname
>>> try:
...     with Path(dirname(__file__)).open_for_read():
...         pass
... except ValueError as ve:
...     print(str(ve)[-25:])
does not identify a file.
open_for_write()[source]

Open the file for writing UTF-8 encoded text.

If the path cannot be opened for writing, some error will be raised.

Return type:

TextIOBase

Returns:

the text io wrapper for writing

Raises:

ValueError – if the path does not identify a file or such a file cannot be created

>>> from tempfile import mkstemp
>>> from os import remove as osremovex
>>> h, p = mkstemp(text=True)
>>> osclose(h)
>>> with Path(p).open_for_write() as wd:
...     wd.write("1234")
4
>>> Path(p).read_all_str()
'1234'
>>> osremovex(p)
>>> from os.path import dirname
>>> try:
...     with Path(dirname(__file__)).open_for_write() as wd:
...         pass
... except ValueError as ve:
...     print("does not identify a file." in str(ve))
True
read_all_str()[source]

Read a file as a single string.

Read the complete contents of a file as a single string. If the file is empty, an exception will be raised. No modification is applied to the text that is read.

Return type:

str

Returns:

the single string of text

Raises:

ValueError – if the path does not identify a file or if the file it identifies is empty

>>> Path(__file__).read_all_str()[4:30]
'The class `Path` for handl'
>>> from os.path import dirname
>>> try:
...     Path(dirname(__file__)).read_all_str()
... except ValueError as ve:
...     print(str(ve)[-25:])
does not identify a file.
>>> from tempfile import mkstemp
>>> from os import remove as osremovex
>>> h, p = mkstemp(text=True)
>>> osclose(h)
>>> try:
...     Path(p).read_all_str()
... except ValueError as ve:
...     print(str(ve)[-19:])
' contains no text.
>>> with open(p, "wt") as tx:
...     tx.write("aa\n")
...     tx.write(" bb   ")
3
6
>>> Path(p).read_all_str()
'aa\n bb   '
>>> osremovex(p)
relative_to(base_path)[source]

Compute a relative path of this path towards the given base path.

Parameters:

base_path (str) – the base path relative to which this path is expressed

Return type:

str

Returns:

a relative path

Raises:

ValueError – if this path is not inside base_path or the relativization result is otherwise invalid

>>> from os.path import dirname
>>> f = file_path(__file__)
>>> d1 = directory_path(dirname(f))
>>> d2 = directory_path(dirname(d1))
>>> d3 = directory_path(dirname(d2))
>>> f.relative_to(d1)
'path.py'
>>> f.relative_to(d2)
'io/path.py'
>>> f.relative_to(d3)
'pycommons/io/path.py'
>>> d1.relative_to(d3)
'pycommons/io'
>>> d1.relative_to(d1)
'.'
>>> try:
...     d1.relative_to(f)
... except ValueError as ve:
...     print(str(ve)[-30:])
does not identify a directory.
>>> try:
...     d2.relative_to(d1)
... except ValueError as ve:
...     print(str(ve)[-21:])
pycommons/pycommons'.
resolve_inside(relative_path)[source]

Resolve a relative path to an absolute path inside this path.

Resolve the relative path inside this path. This path must identify a directory. The relative path cannot contain anything that makes it leave the directory, e.g., any “..”. The paths are joined and then it is enforced that this path must contain the result via enforce_contains() and otherwise an error is raised.

Parameters:

relative_path (str) – the path to resolve

Return type:

Path

Returns:

the resolved child path

Raises:
  • TypeError – If the relative_path is not a string.

  • ValueError – If the relative_path would resolve to something outside of this path and/or if it is empty.

>>> from os.path import dirname
>>> Path(dirname(__file__)).resolve_inside("a.txt")[-5:]
'a.txt'
>>> from os.path import basename
>>> Path(dirname(__file__)).resolve_inside(basename(__file__)) == Path(__file__)
True
>>> try:
...     Path(dirname(__file__)).resolve_inside("..")
... except ValueError as ve:
...     print("does not contain" in str(ve))
True
>>> try:
...     Path(__file__).resolve_inside("..")
... except ValueError as ve:
...     print("does not identify a directory" in str(ve))
True
>>> try:
...     Path(dirname(__file__)).resolve_inside(None)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'NoneType'
>>> try:
...     Path(dirname(__file__)).resolve_inside(2)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     Path(__file__).resolve_inside("")
... except ValueError as ve:
...     print(ve)
Relative path must not be empty.
>>> try:
...     Path(__file__).resolve_inside(" ")
... except ValueError as ve:
...     print(ve)
Relative path must not start or end with white space, but ' ' does.
up(levels=1)[source]

Go up the directory tree for a given number of times.

Get a Path identifying the containing directory, or its containing directory, depending on the number of levels specified.

Parameters:

levels (int, default: 1) – the number of levels to go up: 1 for getting the directly containing directory, 2 for the next higher directory, and so on.

Return type:

Path

Returns:

the resulting path

>>> f = file_path(__file__)
>>> print(f.up()[-13:])
/pycommons/io
>>> print(f.up(1)[-13:])
/pycommons/io
>>> print(f.up(2)[-10:])
/pycommons
>>> try:
...     f.up(0)
... except ValueError as ve:
...     print(ve)
levels=0 is invalid, must be in 1..255.
>>> try:
...     f.up(None)
... except TypeError as te:
...     print(te)
levels should be an instance of int but is None.
>>> try:
...     f.up('x')
... except TypeError as te:
...     print(te)
levels should be an instance of int but is str, namely 'x'.
>>> try:
...     f.up(255)
... except ValueError as ve:
...     print(str(ve)[:70])
Cannot go up from directory '/' anymore when going up for 255 levels f
write_all_str(contents)[source]

Write the given string to the file.

The string contents is written to a file. If it does not end with “\n”, then “\n” will automatically be appended. No other changes are applied to contents. contents must be a str and it must not be empty.

Parameters:

contents (str) – the contents to write

Raises:
  • TypeError – if the contents are not a string

  • ValueError – if the path is not a file or it cannot be opened as a file or the contents are an empty string

Return type:

None

>>> from tempfile import mkstemp
>>> from os import remove as osremovex
>>> h, p = mkstemp(text=True)
>>> osclose(h)
>>> try:
...     Path(p).write_all_str(None)
... except TypeError as te:
...     print(str(te))
descriptor '__len__' requires a 'str' object but received a 'NoneType'
>>> try:
...     Path(p).write_all_str(["a"])
... except TypeError as te:
...     print(str(te))
descriptor '__len__' requires a 'str' object but received a 'list'
>>> Path(p).write_all_str("\na\nb")
>>> Path(p).read_all_str()
'\na\nb\n'
>>> Path(p).write_all_str(" \na\n b ")
>>> Path(p).read_all_str()
' \na\n b \n'
>>> try:
...     Path(p).write_all_str("")
... except ValueError as ve:
...     print(str(ve)[:34])
Cannot write empty content to file
>>> osremovex(p)
>>> from os.path import dirname
>>> try:
...     Path(dirname(__file__)).write_all_str("a")
... except ValueError as ve:
...     print("does not identify a file." in str(ve))
True
pycommons.io.path.UTF8: Final[str] = 'utf-8-sig'

the UTF-8 encoding

pycommons.io.path.delete_path(path)[source]

Delete a path, completely, and recursively.

This is intentionally inserted as an additional function and not a member of the Path in order to make the deletion more explicit and to avoid any form of accidental deleting. This function will not raise an error if the file deletion fails.

Parameters:

path (str) – The path to be deleted

Raises:
  • ValueError – if path does not refer to an existing file or directory

  • TypeError – if path is not a string

Return type:

None

>>> from tempfile import mkstemp, mkdtemp
>>> from os import close as osxclose
>>> (h, tf) = mkstemp()
>>> isfile(tf)
True
>>> delete_path(tf)
>>> isfile(tf)
False
>>> try:
...     delete_path(tf)
... except ValueError as ve:
...     print(str(ve).endswith("is neither file nor directory."))
True
>>> td = mkdtemp()
>>> isdir(td)
True
>>> delete_path(td)
>>> isdir(td)
False
>>> try:
...     delete_path(tf)
... except ValueError as ve:
...     print(str(ve).endswith("is neither file nor directory."))
True
pycommons.io.path.directory_path(pathstr)[source]

Get a path identifying an existing directory.

This is a shorthand for creating a Path and then invoking enforce_dir().

Parameters:

pathstr (str) – the path

Return type:

Path

Returns:

the path to the existing directory

>>> from os.path import dirname
>>> directory_path(dirname(__file__))[-12:]
'pycommons/io'
>>> try:
...     directory_path(__file__)
... except ValueError as ve:
...     print("does not identify a directory." in str(ve))
True
pycommons.io.path.file_path(pathstr)[source]

Get a path identifying an existing file.

This is a shorthand for creating a Path and then invoking enforce_file().

Parameters:

pathstr (str) – the path

Return type:

Path

Returns:

the file

>>> file_path(__file__)[-20:]
'pycommons/io/path.py'
>>> from os.path import dirname
>>> try:
...     file_path(dirname(__file__))
... except ValueError as ve:
...     print("does not identify a file." in str(ve))
True
pycommons.io.path.line_writer(output)[source]

Create a line-writing typing.Callable from an output stream.

This function takes any string passed to it and writes it to the typing.TextIO instance. If the string does not end in “\n”, it then writes “\n” as well to terminate the line. If something that is not a str is passed in, it will throw a TypeError.

Notice that write() and writelines() of class io.TextIOBase do not terminate lines that are written with a “\n”. This means that, unless you manually make sure that all lines are terminated by “\n”, they get written as a single line instead of multiple lines. To solve this issue conveniently, we provide the functions line_writer(), which wraps the write() into another function, which automatically terminates all strings passed to it with “\n” unless they already end in “\n”, and write_lines(), which iterates over a sequence of strings and writes each of them to a given typing.TextIO and automatically adds the “\n” terminator to each of them if necessary.

Parameters:

output (TextIO | TextIOBase) – the output stream

Return type:

Callable[[str], None]

Returns:

an instance of typing.Callable that will write each string it receives as a properly terminated line to the output stream.

Raises:

TypeError – if output is not an instance of io.TextIOBase.

>>> from tempfile import mkstemp
>>> from os import close as osclose
>>> from os import remove as osremove
>>> (h, tf) = mkstemp()
>>> osclose(h)
>>> with open(tf, "wt") as out:
...     w = line_writer(out)
...     w("123")
>>> with open(tf, "rt") as inp:
...     print(list(inp))
['123\n']
>>> with open(tf, "wt") as out:
...     w = line_writer(out)
...     w("")
>>> with open(tf, "rt") as inp:
...     print(list(inp))
['\n']
>>> with open(tf, "wt") as out:
...     w = line_writer(out)
...     w("123\n")
>>> with open(tf, "rt") as inp:
...     print(list(inp))
['123\n']
>>> with open(tf, "wt") as out:
...     w = line_writer(out)
...     w("\n")
>>> with open(tf, "rt") as inp:
...     print(list(inp))
['\n']
>>> with open(tf, "wt") as out:
...     w = line_writer(out)
...     w("123")
...     w("456")
>>> with open(tf, "rt") as inp:
...     print(list(inp))
['123\n', '456\n']
>>> with open(tf, "wt") as out:
...     w = line_writer(out)
...     w("123  ")
...     w("")
...     w("  456")
>>> with open(tf, "rt") as inp:
...     print(list(inp))
['123  \n', '\n', '  456\n']
>>> with open(tf, "wt") as out:
...     w = line_writer(out)
...     w("123  \n")
...     w("\n")
...     w("  456")
>>> with open(tf, "rt") as inp:
...     print(list(inp))
['123  \n', '\n', '  456\n']
>>> try:
...     with open(tf, "wt") as out:
...         w = line_writer(out)
...         w("123  ")
...         w(None)
... except TypeError as te:
...     print(str(te)[:-10])
descriptor 'endswith' for 'str' objects doesn't apply to a 'NoneTy
>>> try:
...     with open(tf, "wt") as out:
...         w = line_writer(out)
...         w("123  ")
...         w(2)
... except TypeError as te:
...     print(te)
descriptor 'endswith' for 'str' objects doesn't apply to a 'int' object
>>> osremove(tf)
>>> try:
...     line_writer(1)
... except TypeError as te:
...     print(te)
output should be an instance of io.TextIOBase but is int, namely '1'.
>>> try:
...     line_writer(None)
... except TypeError as te:
...     print(te)
output should be an instance of io.TextIOBase but is None.
pycommons.io.path.write_lines(lines, output)[source]

Write all the lines in the given typing.Iterable to the output.

This function takes care of properly terminating lines using “\n” when writing them to an output and also performs type-checking.

Notice that write() and writelines() of class io.TextIOBase do not terminate lines that are written with a “\n”. This means that, unless you manually make sure that all lines are terminated by “\n”, they get written as a single line instead of multiple lines. To solve this issue conveniently, we provide the functions line_writer(), which wraps the write() into another function, which automatically terminates all strings passed to it with “\n” unless they already end in “\n”, and write_lines(), which iterates over a sequence of strings and writes each of them to a given typing.TextIO and automatically adds the “\n” terminator to each of them if necessary.

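Parameters:
  • lines (Iterable[str]) – the lines of text to write

  • output (TextIO | TextIOBase) – the output stream to write to

Raises: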

TypeError – If anything is of the wrong type.

Return type:

None

>>> from io import StringIO
>>> with StringIO() as sio:
...     write_lines(("123", "456"), sio)
...     print(sio.getvalue())
123
456
>>> from io import StringIO
>>> with StringIO() as sio:
...     write_lines(("123\n", "456"), sio)
...     print(sio.getvalue())
123
456
>>> from io import StringIO
>>> with StringIO() as sio:
...     write_lines(("123\n", "456\n"), sio)
...     print(sio.getvalue())
123
456
>>> with StringIO() as sio:
...     write_lines(["123"], sio)
...     print(sio.getvalue())
123
>>> with StringIO() as sio:
...     write_lines(["123\n"], sio)
...     print(sio.getvalue())
123
>>> with StringIO() as sio:
...     write_lines("123", sio)
...     print(sio.getvalue())
1
2
3
>>> with StringIO() as sio:
...     write_lines((sss for sss in ["123", "abc"]), sio)
...     print(sio.getvalue())
123
abc
>>> with StringIO() as sio:
...     write_lines("", sio)
...     print(sio.getvalue())
>>> from tempfile import mkstemp
>>> from os import close as osclose
>>> from os import remove as osremove
>>> (h, tf) = mkstemp()
>>> osclose(h)
>>> with open(tf, "wt") as out:
...     write_lines(["123"], out)
>>> with open(tf, "rt") as inp:
...     print(list(inp))
['123\n']
>>> with open(tf, "wt") as out:
...     write_lines([""], out)
>>> with open(tf, "rt") as inp:
...     print(repr(inp.read()))
'\n'
>>> with open(tf, "wt") as out:
...     write_lines(["\n"], out)
>>> with open(tf, "rt") as inp:
...     print(repr(inp.read()))
'\n'
>>> with open(tf, "wt") as out:
...     write_lines([" \n"], out)
>>> with open(tf, "rt") as inp:
...     print(repr(inp.read()))
' \n'
>>> osremove(tf)
>>> with StringIO() as sio:
...     write_lines(["\n"], sio)
...     print(repr(sio.getvalue()))
'\n'
>>> with StringIO() as sio:
...     write_lines([""], sio)
...     print(repr(sio.getvalue()))
'\n'
>>> sio = StringIO()
>>> try:
...     write_lines(None, sio)
... except TypeError as te:
...     print(te)
lines should be an instance of typing.Iterable but is None.
>>> sio = StringIO()
>>> try:
...     write_lines(123, sio)
... except TypeError as te:
...     print(te)
lines should be an instance of typing.Iterable but is int, namely '123'.
>>> sio = StringIO()
>>> try:
...     write_lines([1, "sdf"], sio)
... except TypeError as te:
...     print(te)
descriptor 'endswith' for 'str' objects doesn't apply to a 'int' object
>>> sio = StringIO()
>>> try:
...     write_lines(["sdf", 1], sio)
... except TypeError as te:
...     print(te)
descriptor 'endswith' for 'str' objects doesn't apply to a 'int' object
>>> print(repr(sio.getvalue()))
'sdf\n'
>>> try:
...     write_lines("x", None)
... except TypeError as te:
...     print(te)
output should be an instance of io.TextIOBase but is None.
>>> try:
...     write_lines("x", 1)
... except TypeError as te:
...     print(te)
output should be an instance of io.TextIOBase but is int, namely '1'.
>>> try:
...     write_lines(2, 1)
... except TypeError as te:
...     print(te)
lines should be an instance of typing.Iterable but is int, namely '2'.

pycommons.io.temp module

Automatically deleted temporary files and directories.

This module provides two functions, temp_dir() for temporary directories and temp_file() for temporary files. Both of them return a TempPath, which can be used in a with statement and will be deleted when going out of scope.

class pycommons.io.temp.TempPath(value: str)[source]

Bases: Path

A path to a temp file or directory for use in a with statement.

pycommons.io.temp.temp_dir(directory=None)[source]

Create the temporary directory.

Parameters:

directory (Optional[str], default: None) – an optional root directory

Raises:

TypeError – if directory is not None but also no str

Return type:

TempPath

>>> with temp_dir() as td:
...     pass
>>> try:
...     with temp_dir(1):
...         pass
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> from os.path import dirname
>>> with temp_dir(dirname(__file__)) as td:
...     pass
pycommons.io.temp.temp_file(directory=None, prefix=None, suffix=None)[source]

Create a temporary file that will be deleted when going out of scope.

Parameters:
  • directory (Optional[str], default: None) – a root directory or TempDir instance

  • prefix (Optional[str], default: None) – an optional prefix

  • suffix (Optional[str], default: None) – an optional suffix, e.g., .txt

Raises:
  • TypeError – if any of the parameters does not fulfill the type contract

  • ValueError – if the prefix or suffix are specified, but are empty strings, or if directory does not identify an existing directory although not being None

Return type:

TempPath

>>> with temp_file() as tf:
...     tf.is_file()
...     p = Path(tf)
...     p.is_file()
True
True
>>> p.is_file()
False
>>> try:
...     temp_file(1)
... except TypeError as te:
...     print(te)
descriptor '__len__' requires a 'str' object but received a 'int'
>>> try:
...     temp_file("")
... except ValueError as ve:
...     print(ve)
Path must not be empty.
>>> try:
...     temp_file(None, 1)
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     temp_file(None, None, 1)
... except TypeError as te:
...     print(te)
descriptor 'strip' for 'str' objects doesn't apply to a 'int' object
>>> try:
...     temp_file(None, "")
... except ValueError as ve:
...     print(ve)
Stripped prefix cannot be empty if specified.
>>> try:
...     temp_file(None, None, "")
... except ValueError as ve:
...     print(ve)
Stripped suffix cannot be empty if specified.
>>> try:
...     temp_file(None, None, "bla.")
... except ValueError as ve:
...     print(ve)
Stripped suffix must not end with '.', but 'bla.' does.
>>> try:
...     temp_file(None, None, "bl/a")
... except ValueError as ve:
...     print(ve)
Suffix must contain neither '/' nor '\', but 'bl/a' does.
>>> try:
...     temp_file(None, None, "b\\la")
... except ValueError as ve:
...     print(ve)
Suffix must contain neither '/' nor '\', but 'b\\la' does.
>>> try:
...     temp_file(None, "bl/a", None)
... except ValueError as ve:
...     print(ve)
Prefix must contain neither '/' nor '\', but 'bl/a' does.
>>> try:
...     temp_file(None, "b\\la", None)
... except ValueError as ve:
...     print(ve)
Prefix must contain neither '/' nor '\', but 'b\\la' does.
>>> from os.path import dirname
>>> from pycommons.io.path import file_path
>>> bd = directory_path(dirname(__file__))
>>> with temp_file(bd) as tf:
...     bd.enforce_contains(tf)
...     bd in tf
...     p = file_path(str(f"{tf}"))
True
>>> p.is_file()
False
>>> from os.path import basename
>>> with temp_file(None, "pre") as tf:
...     "pre" in tf
...     bd.contains(tf)
...     basename(tf).startswith("pre")
...     p = file_path(str(f"{tf}"))
True
False
True
>>> p.is_file()
False
>>> with temp_file(bd, "pre") as tf:
...     "pre" in tf
...     bd.contains(tf)
...     basename(tf).startswith("pre")
...     p = file_path(str(f"{tf}"))
True
True
True
>>> p.is_file()
False
>>> with temp_file(bd, None, "suf") as tf:
...     "suf" in tf
...     bd.contains(tf)
...     tf.endswith("suf")
...     p = file_path(str(f"{tf}"))
True
True
True
>>> p.is_file()
False
>>> with temp_file(None, None, "suf") as tf:
...     "suf" in tf
...     tf.endswith("suf")
...     bd.contains(tf)
...     p = file_path(str(f"{tf}"))
True
True
False
>>> p.is_file()
False
>>> with temp_file(None, "pref", "suf") as tf:
...     tf.index("pref") < tf.index("suf")
...     tf.endswith("suf")
...     basename(tf).startswith("pref")
...     bd.contains(tf)
...     p = file_path(str(f"{tf}"))
True
True
True
False
>>> p.is_file()
False
>>> with temp_file(bd, "pref", "suf") as tf:
...     tf.index("pref") < tf.index("suf")
...     tf.endswith("suf")
...     basename(tf).startswith("pref")
...     bd.contains(tf)
...     p = file_path(str(f"{tf}"))
True
True
True
True
>>> p.is_file()
False