Coverage for pycommons / math / rank.py: 100%
43 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:04 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 03:04 +0000
1"""Some tools for ranking data."""
3from math import isfinite
4from operator import add, itemgetter
5from typing import Callable, Final, Iterable, TypeVar
7from pycommons.types import type_error
#: the type of the elements provided by `source`, i.e., of the objects
#: that are sorted and ranked
T = TypeVar("T")

#: the type of the sort keys computed by the `key` function for the
#: elements of type `T`
K = TypeVar("K")

#: the rank type: must either be int, float, or a union of both
R = TypeVar("R", int, float, int | float)

#: the output type var: the type of the records created by the `output`
#: function and stored in the list returned by `rank`
X = TypeVar("X")
22def rank(source: Iterable[T],
23 key: Callable[[T], K] = lambda x: x, # type: ignore
24 output: Callable[[R, T, K], X] = # type: ignore
25 lambda rr, tt, _: (rr, tt), # type: ignore
26 rank_join: Callable[[int, int], R] = add, # type: ignore
27 rank_offset: int = 0) -> list[X]:
28 """
29 Rank the elements in a data source based on a given key function.
31 The default behavior of this function is to basically sort the data from
32 `source` and return tuples with the rank and the data element in a list.
33 The result list is sorted by the keys of the object.
35 By default, ranks start at `0` and increase in steps of `2`. The ranks of
36 objects that would have the same rank are resolved by averaging their
37 ranks. This is why we increment ranks in steps of `2`: This way, the mean
38 of two ranks is always an integer:
39 >>> rank([3, 6, 6, 12])
40 [(0, 3), (3, 6), (3, 6), (6, 12)]
42 This averaging can be modified by providing a `rank_join` function that
43 computes a joint rank for objects as well as a `rank_offset`:
44 >>> rank([3, 6, 6, 12], rank_join=lambda a, b: 0.5 * (a + b))
45 [(0.0, 3), (1.5, 6), (1.5, 6), (3.0, 12)]
47 >>> rank([3, 6, 6, 12], rank_join=min, rank_offset=1)
48 [(1, 3), (2, 6), (2, 6), (4, 12)]
50 >>> rank([3, 6, 6, 12], rank_join=max, rank_offset=1)
51 [(1, 3), (3, 6), (3, 6), (4, 12)]
53 However, the result of `rank_offset + rank_join(a, b)` must always be
54 either an `int` or a `float` and also always finite and never negative,
55 for any two non-negative integers `a` and `b`.
57 The `key` function must compute a key for each element of `source` which
58 can be used for sorting. By default, it returns the element itself.
59 But it can be customized.
61 >>> rank((6, 5, 3, 4, 0, 7))
62 [(0, 0), (2, 3), (4, 4), (6, 5), (8, 6), (10, 7)]
64 >>> sorted(rank({"a", "c", "X", "y", "x", "xx", "L", "l"}, key=str.lower))
65 [(0, 'a'), (2, 'c'), (5, 'L'), (5, 'l'), (9, 'X'), (9, 'x'), (12, 'xx'), \
66(14, 'y')]
68 The `output` function is used to create the records to be placed in the
69 list returned by this function. Its input are the rank, the object, and
70 its computed key. By default, it creates tuples of the rank and the object
71 obtained from `source`. You can customize this as well:
72 >>> rank([5, 7, 4, 9, 2, 1])
73 [(0, 1), (2, 2), (4, 4), (6, 5), (8, 7), (10, 9)]
75 >>> rank([5, 7, 4, 9, 2, 1], output=lambda rr, oo, kk: f"{rr}:{oo}")
76 ['0:1', '2:2', '4:4', '6:5', '8:7', '10:9']
78 >>> rank([5, 7, 4, 19, 2, 1], key=str,
79 ... output=lambda rr, oo, kk: (rr, oo, kk))
80 [(0, 1, '1'), (2, 19, '19'), (4, 2, '2'), (6, 4, '4'), (8, 5, '5'), \
81(10, 7, '7')]
83 :param source: the data source
84 :param key: a function returning the key for a given object
85 :param output: a function creating the output object, receiving the rank,
86 original object, and key as input
87 :param rank_join: a function for joining a maximum and minimum index of an
88 object to a rank; by default this returns the sum of both
89 :param rank_offset: an offset to be added to the ranks
90 :returns: a list with the objects generated by the `output` function,
91 which by default are tuples of rank and object
93 >>> rank({})
94 []
95 >>> rank([12])
96 [(0, 12)]
97 >>> rank([12, 3])
98 [(0, 3), (2, 12)]
99 >>> rank([3, 12], rank_offset=2)
100 [(2, 3), (4, 12)]
101 >>> rank([12, 12])
102 [(1, 12), (1, 12)]
103 >>> rank([12, 12], output=lambda rr, tt, kk: rr)
104 [1, 1]
105 >>> rank([-1, 0, 4, 3, 3, 5, 6, 1])
106 [(0, -1), (2, 0), (4, 1), (7, 3), (7, 3), (10, 4), (12, 5), (14, 6)]
107 >>> rank([-1, 0, 4, 3, 3, 5, 6, 1], rank_join=lambda a, b: 0.5 * (a + b))
108 [(0.0, -1), (1.0, 0), (2.0, 1), (3.5, 3), (3.5, 3), (5.0, 4), (6.0, 5), \
109(7.0, 6)]
110 >>> sorted(rank(("a", "B", "c", "b", "A", "A", "cc"), key=str.casefold))
111 [(2, 'A'), (2, 'A'), (2, 'a'), (7, 'B'), (7, 'b'), (10, 'c'), (12, 'cc')]
113 >>> try:
114 ... rank(1)
115 ... except TypeError as te:
116 ... print(te)
117 source should be an instance of typing.Iterable but is int, namely 1.
119 >>> try:
120 ... rank([], key=1)
121 ... except TypeError as te:
122 ... print(te)
123 key should be a callable but is int, namely 1.
125 >>> try:
126 ... rank([], output=1)
127 ... except TypeError as te:
128 ... print(te)
129 output should be a callable but is int, namely 1.
131 >>> try:
132 ... rank([], rank_join=1)
133 ... except TypeError as te:
134 ... print(te)
135 rank_join should be a callable but is int, namely 1.
137 >>> try:
138 ... rank([], rank_offset="x")
139 ... except TypeError as te:
140 ... print(te)
141 rank_offset should be an instance of any in {float, int} \
142but is str, namely 'x'.
144 >>> from math import inf, nan
145 >>> try:
146 ... rank([], rank_offset=inf)
147 ... except ValueError as ve:
148 ... print(ve)
149 rank_offset=inf should be finite
151 >>> try:
152 ... rank([], rank_offset=nan)
153 ... except ValueError as ve:
154 ... print(ve)
155 rank_offset=nan should be finite
157 >>> try:
158 ... rank([1, 2, 3], rank_join=lambda a, b: "x")
159 ... except TypeError as te:
160 ... print(te)
161 rank_join(0, 0) should be an instance of any in {float, int} \
162but is str, namely 'x'.
164 >>> try:
165 ... rank([1, 2, 3], rank_join=lambda a, b: inf)
166 ... except ValueError as ve:
167 ... print(ve)
168 rank inf=rank_join(0, 0) + 0 is not finite and non-negative.
170 >>> try:
171 ... rank([1, 2, 3], rank_join=lambda a, b: nan)
172 ... except ValueError as ve:
173 ... print(ve)
174 rank nan=rank_join(0, 0) + 0 is not finite and non-negative.
175 """
176 if not isinstance(source, Iterable):
177 raise type_error(source, "source", Iterable)
178 if not callable(key):
179 raise type_error(key, "key", call=True)
180 if not callable(output):
181 raise type_error(output, "output", call=True)
182 if not callable(rank_join):
183 raise type_error(rank_join, "rank_join", call=True)
184 if not isinstance(rank_offset, int | float):
185 raise type_error(rank_offset, "rank_offset", (int, float))
186 if not isfinite(rank_offset):
187 raise ValueError(
188 f"rank_offset={rank_offset} should be finite")
190 data: list = [(key(t), t) for t in source] # convert data to list
191 max_hi: Final[int] = list.__len__(data) - 1 # maximum index
192 if max_hi < 0: # data is empty, can return it as-is
193 return data
195 data.sort(key=itemgetter(0)) # sort the data by the key
197 lo: int = 0
198 while lo <= max_hi: # iterate through all the data
199 # first, we obtain the index range of objects with the same rank
200 lo_key = data[lo][0]
201 hi: int = lo
202 while (hi < max_hi) and (not lo_key < data[hi + 1][0]):
203 hi += 1
205 r: R = rank_join(lo, hi) # compute the joint rank
206 if not isinstance(r, int | float): # sanity check of rank, part 1
207 raise type_error(r, f"rank_join({lo}, {hi})", (int, float))
209 r += rank_offset
210 if (not isfinite(r)) or (r < 0): # sanity check of rank, part 2
211 raise ValueError(f"rank {r}=rank_join({lo}, {hi}) + {rank_offset}"
212 " is not finite and non-negative.")
214 for i in range(lo, hi + 1): # assign rank and create output objects
215 dk, dt = data[i]
216 data[i] = output(r, dt, dk)
218 lo = hi + 1 # move on to next object
220 return data # return finalized list