Coverage for pycommons / math / rank.py: 100%
43 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-15 10:00 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-15 10:00 +0000
1"""Some tools for ranking data."""
3from math import isfinite
4from operator import add, itemgetter
5from typing import Callable, Final, Iterable, TypeVar
7from pycommons.types import type_error
#: the type variable of the elements taken from the data source that is
#: ranked (see the `source` parameter of :func:`rank`)
T = TypeVar("T")

#: the type variable of the sort keys that the `key` function of
#: :func:`rank` computes for the elements
K = TypeVar("K")

#: the type variable of the ranks: constrained to `int`, `float`, or the
#: union `int | float`
R = TypeVar("R", int, float, int | float)

#: the type variable of the records created by the `output` function of
#: :func:`rank`, i.e., of the elements of the returned list
X = TypeVar("X")
def rank(source: Iterable[T],
         key: Callable[[T], K] = lambda x: x,  # type: ignore
         output: Callable[[R, T, K], X] =  # type: ignore
         lambda rr, tt, _: (rr, tt),  # type: ignore
         rank_join: Callable[[int, int], R] = add,  # type: ignore
         rank_offset: int = 0) -> list[X]:
    """
    Sort the elements of a data source by a key and assign ranks to them.

    Every element of `source` is paired with its key, the pairs are sorted
    by key, and one output record is created per element. With the default
    arguments, each record is a tuple `(rank, element)` and the returned
    list is ordered by the keys.

    Elements whose keys are tied share one rank. This rank is computed by
    `rank_join` from the first and the last index of the group of tied
    elements in the sorted data. The default `rank_join` adds these two
    indices, which means that the ranks of untied elements are `0`, `2`,
    `4`, and so on, and that the rank of a tied group equals twice the mean
    of its indices and thus is always an integer:

    >>> rank([3, 6, 6, 12])
    [(0, 3), (3, 6), (3, 6), (6, 12)]

    A different `rank_join` function and a `rank_offset`, which is added to
    each joined rank, can be supplied to change this scheme:

    >>> rank([3, 6, 6, 12], rank_join=lambda a, b: 0.5 * (a + b))
    [(0.0, 3), (1.5, 6), (1.5, 6), (3.0, 12)]

    >>> rank([3, 6, 6, 12], rank_join=min, rank_offset=1)
    [(1, 3), (2, 6), (2, 6), (4, 12)]

    >>> rank([3, 6, 6, 12], rank_join=max, rank_offset=1)
    [(1, 3), (3, 6), (3, 6), (4, 12)]

    The value returned by `rank_join` must be an `int` or a `float`, and
    after adding `rank_offset` it must be finite and not negative.
    Otherwise, an error is raised.

    The `key` function computes the sort key of each element of `source`.
    By default, the element itself is its key, but any other callable
    returning sortable keys can be provided.

    >>> rank((6, 5, 3, 4, 0, 7))
    [(0, 0), (2, 3), (4, 4), (6, 5), (8, 6), (10, 7)]

    >>> sorted(rank({"a", "c", "X", "y", "x", "xx", "L", "l"}, key=str.lower))
    [(0, 'a'), (2, 'c'), (5, 'L'), (5, 'l'), (9, 'X'), (9, 'x'), (12, 'xx'), \
(14, 'y')]

    The `output` function builds the records that are stored in the result
    list. It is called with the rank, the element, and the key of the
    element, in this order. By default, it returns a tuple of the rank and
    the element, but it can be replaced as well:

    >>> rank([5, 7, 4, 9, 2, 1])
    [(0, 1), (2, 2), (4, 4), (6, 5), (8, 7), (10, 9)]

    >>> rank([5, 7, 4, 9, 2, 1], output=lambda rr, oo, kk: f"{rr}:{oo}")
    ['0:1', '2:2', '4:4', '6:5', '8:7', '10:9']

    >>> rank([5, 7, 4, 19, 2, 1], key=str,
    ...      output=lambda rr, oo, kk: (rr, oo, kk))
    [(0, 1, '1'), (2, 19, '19'), (4, 2, '2'), (6, 4, '4'), (8, 5, '5'), \
(10, 7, '7')]

    :param source: the iterable providing the elements to be ranked
    :param key: the function computing the sort key of an element
    :param output: the function creating an output record from the rank,
        the element, and the key of the element
    :param rank_join: the function combining the first and the last index
        of a group of elements with tied keys into a rank; by default, the
        two indices are added
    :param rank_offset: a finite number that is added to every joined rank
    :returns: the list of the records created by the `output` function,
        which by default are tuples of rank and element
    :raises TypeError: if an argument or a joined rank has the wrong type
    :raises ValueError: if `rank_offset` is not finite or if a rank is not
        finite or is negative

    >>> rank({})
    []
    >>> rank([12])
    [(0, 12)]
    >>> rank([12, 3])
    [(0, 3), (2, 12)]
    >>> rank([3, 12], rank_offset=2)
    [(2, 3), (4, 12)]
    >>> rank([12, 12])
    [(1, 12), (1, 12)]
    >>> rank([12, 12], output=lambda rr, tt, kk: rr)
    [1, 1]
    >>> rank([-1, 0, 4, 3, 3, 5, 6, 1])
    [(0, -1), (2, 0), (4, 1), (7, 3), (7, 3), (10, 4), (12, 5), (14, 6)]
    >>> rank([-1, 0, 4, 3, 3, 5, 6, 1], rank_join=lambda a, b: 0.5 * (a + b))
    [(0.0, -1), (1.0, 0), (2.0, 1), (3.5, 3), (3.5, 3), (5.0, 4), (6.0, 5), \
(7.0, 6)]
    >>> sorted(rank(("a", "B", "c", "b", "A", "A", "cc"), key=str.casefold))
    [(2, 'A'), (2, 'A'), (2, 'a'), (7, 'B'), (7, 'b'), (10, 'c'), (12, 'cc')]

    >>> try:
    ...     rank(1)
    ... except TypeError as te:
    ...     print(te)
    source should be an instance of typing.Iterable but is int, namely 1.

    >>> try:
    ...     rank([], key=1)
    ... except TypeError as te:
    ...     print(te)
    key should be a callable but is int, namely 1.

    >>> try:
    ...     rank([], output=1)
    ... except TypeError as te:
    ...     print(te)
    output should be a callable but is int, namely 1.

    >>> try:
    ...     rank([], rank_join=1)
    ... except TypeError as te:
    ...     print(te)
    rank_join should be a callable but is int, namely 1.

    >>> try:
    ...     rank([], rank_offset="x")
    ... except TypeError as te:
    ...     print(te)
    rank_offset should be an instance of any in {float, int} \
but is str, namely 'x'.

    >>> from math import inf, nan
    >>> try:
    ...     rank([], rank_offset=inf)
    ... except ValueError as ve:
    ...     print(ve)
    rank_offset=inf should be finite

    >>> try:
    ...     rank([], rank_offset=nan)
    ... except ValueError as ve:
    ...     print(ve)
    rank_offset=nan should be finite

    >>> try:
    ...     rank([1, 2, 3], rank_join=lambda a, b: "x")
    ... except TypeError as te:
    ...     print(te)
    rank_join(0, 0) should be an instance of any in {float, int} \
but is str, namely 'x'.

    >>> try:
    ...     rank([1, 2, 3], rank_join=lambda a, b: inf)
    ... except ValueError as ve:
    ...     print(ve)
    rank inf=rank_join(0, 0) + 0 is not finite and non-negative.

    >>> try:
    ...     rank([1, 2, 3], rank_join=lambda a, b: nan)
    ... except ValueError as ve:
    ...     print(ve)
    rank nan=rank_join(0, 0) + 0 is not finite and non-negative.
    """
    # Validate all arguments before touching the data.
    if not isinstance(source, Iterable):
        raise type_error(source, "source", Iterable)
    for label, candidate in (("key", key),
                             ("output", output),
                             ("rank_join", rank_join)):
        if not callable(candidate):
            raise type_error(candidate, label, call=True)
    if not isinstance(rank_offset, (int, float)):
        raise type_error(rank_offset, "rank_offset", (int, float))
    if not isfinite(rank_offset):
        raise ValueError(
            f"rank_offset={rank_offset} should be finite")

    # Pair each element with its key; the pairs are later replaced in place
    # by the output records.
    records: list = [(key(element), element) for element in source]
    if not records:  # nothing to rank: hand back the empty list
        return records

    records.sort(key=itemgetter(0))  # order the pairs by their keys
    count: Final[int] = len(records)

    first: int = 0  # index of the first element of the current tie group
    while first < count:
        # Extend the half-open range [first, after) over all elements whose
        # key is not greater than the key of the first element of the group.
        head_key = records[first][0]
        after: int = first + 1
        while (after < count) and (not head_key < records[after][0]):
            after += 1
        last: int = after - 1  # index of the last element of the group

        # Join both end indices of the group into the rank and verify it.
        joint: R = rank_join(first, last)
        if not isinstance(joint, (int, float)):
            raise type_error(
                joint, f"rank_join({first}, {last})", (int, float))

        joint += rank_offset
        if not (isfinite(joint) and (joint >= 0)):
            raise ValueError(
                f"rank {joint}=rank_join({first}, {last}) + {rank_offset}"
                " is not finite and non-negative.")

        # Replace the (key, element) pairs of the group by output records.
        records[first:after] = [
            output(joint, element, element_key)
            for element_key, element in records[first:after]]

        first = after  # continue with the next group

    return records