Coverage for pycommons / ds / sequences.py: 100%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-02 06:36 +0000

1"""Tools for working with sequences.""" 

2 

3from heapq import merge 

4from typing import Callable, Final, Generator, Iterable, Iterator, TypeVar 

5 

6from pycommons.types import type_error 

7 

8#: the type of the element of the sequences to process 

9T = TypeVar("T") 

10 

11 

12class __Reiterator(Iterable[T]): 

13 """The internal class for re-iteration.""" 

14 

15 def __init__(self, source: Iterator[T]) -> None: 

16 """ 

17 Create the re-iterator. 

18 

19 :param source: the source 

20 """ 

21 #: the original source 

22 self.__source: Final[Iterator[T]] = source 

23 #: we store all elements from source for re-iteration 

24 self.__more: Final[list[T]] = [] 

25 

26 def __iter__(self) -> Generator[T, None, None]: 

27 """ 

28 Generate the sequence of elements. 

29 

30 :returns: the generator 

31 """ 

32 get_length: Callable[[], int] = self.__more.__len__ 

33 get: Callable[[int], T] = self.__more.__getitem__ 

34 append: Final[Callable[[T], None]] = self.__more.append 

35 nexter: Final[Callable[[], T]] = self.__source.__next__ 

36 pos: int = 0 # The next position in __more. 

37 skip: int = -1 # The last element in __more we took from __source. 

38 try: # We always get to the StopIteration of __source. 

39 while True: # Until we reached the end of list and end of iter. 

40 ll: int = get_length() # Get length (may have changed). 

41 while pos < ll: # First, return all elements from __more. 

42 if pos != skip: # Skip = the element we returned last. 

43 yield get(pos) # Yield the __more list element. 

44 pos += 1 # Increment position. 

45 skip = get_length() # Get the length: May have changed! 

46 if skip > ll: # Maybe something changed since yield. 

47 skip = -1 # Invalidate skip, we instead return from list. 

48 continue 

49 other = nexter() # Get next actual iteration element. 

50 append(other) # Store the element in the internal list. 

51 yield other # Yield the element 

52 except StopIteration: 

53 pass 

54 

55 

56def reiterable(source: Iterable[T] | Iterator[T]) -> Iterable[T]: 

57 """ 

58 Ensure that an :class:`Iterable` can be iterated over multiple times. 

59 

60 This function will solidify an :class:`Iterator` into an 

61 :class:`Iterable`. In Python, :class:`Iterator` is a sub-class of 

62 :class:`Iterable`. This means that if your function accepts instances of 

63 :class:`Iterable` as input, it may expect to be able to iterate over them 

64 multiple times. However, if an :class:`Iterator` is passed in, which also 

65 is an instance of :class:`Iterable` and thus fulfills the function's type 

66 requirement, this is not the case. A typical example of this would be if 

67 a :class:`Generator` is passed in. A :class:`Generator` is an instance of 

68 :class:`Iterator`, which, in turn, is an instance of :class:`Iterable`. 

69 However, you can iterate over a :class:`Generator` only once. 

70 

71 For such single-use objects, a new :class:`Iterable` wrapper is created. 

72 This wrapper will iterate over the original sequence, but cache all 

73 elements in an internal :class:`list`. When you iterate over the sequence 

74 again, the elements in the :class:`list` will be used. This means that 

75 all elements of the original sequence will be stored in memory. However, 

76 they are only stored if/when they are actually accessed via the iteration 

77 sequence. If you do not iterate over them completely, they are not all 

78 stored. 

79 

80 This form of re-iterabling is useful if you maybe generate items from a 

81 slower sequence or do not plan to use all of them. If you want to use 

82 all elements several times anyway, it may be more efficient to just wrap 

83 the original source into a :class:`tuple`. But if, for example, your 

84 sequence is the result of iterating over a directory tree on the file 

85 system, or maybe if it comes from loading a file, then using 

86 :func:`reiterable` could be useful. 

87 

88 This is also true if you actually process the generated sequence in some 

89 way that may fail or terminate early. Then, first loading all data into 

90 a :class:`tuple` may be annoying if your first processed element after 

91 that causes a failure or early termination. The bit of overhead of 

92 :func:`reiterable` may then well be worth your while. 

93 

94 Of course, this can only work if the :class:`Iterator` is not otherwise 

95 used after calling this function. If you extract elements from the 

96 :class:`Iterator` by yourself otherwise, maybe via :func:`next`, then 

97 :func:`reiterable` cannot work. However, if you only apply :func:`next` 

98 or other looping paradigms to the :class:`Iterable` returned by 

99 :func:`reiterable`, then you can iterate as often as you want over a 

100 :class:`Generator`, for example. 

101 

102 :param source: the data source 

103 :returns: the resulting re-iterable iterator 

104 :raises TypeError: if `source` is neither an :class:`Iterable` nor an 

105 :class:`Iterator`. 

106 

107 >>> ri = reiterable(iter(range(4))) 

108 >>> for i in ri: 

109 ... print(i) 

110 0 

111 1 

112 2 

113 3 

114 >>> for i in ri: 

115 ... print(i) 

116 0 

117 1 

118 2 

119 3 

120 

121 >>> g = (i ** 2 for i in range(5)) 

122 >>> r = reiterable(g) 

123 >>> tuple(r) 

124 (0, 1, 4, 9, 16) 

125 >>> tuple(r) 

126 (0, 1, 4, 9, 16) 

127 >>> tuple(r) 

128 (0, 1, 4, 9, 16) 

129 >>> tuple(r) 

130 (0, 1, 4, 9, 16) 

131 >>> tuple(g) 

132 () 

133 

134 >>> g = (i ** 2 for i in range(5)) 

135 >>> r = reiterable(g) 

136 >>> i1 = iter(r) 

137 >>> i2 = iter(r) 

138 >>> next(i1) 

139 0 

140 >>> next(i2) 

141 0 

142 >>> next(i2) 

143 1 

144 >>> next(i1) 

145 1 

146 >>> next(i1) 

147 4 

148 >>> next(i1) 

149 9 

150 >>> next(i2) 

151 4 

152 >>> next(i2) 

153 9 

154 >>> i3 = iter(r) 

155 >>> next(i3) 

156 0 

157 >>> next(i3) 

158 1 

159 >>> next(i3) 

160 4 

161 >>> next(i3) 

162 9 

163 >>> next(i3) 

164 16 

165 >>> next(i2) 

166 16 

167 >>> try: 

168 ... next(i2) 

169 ... except StopIteration as si: 

170 ... print(type(si)) 

171 <class 'StopIteration'> 

172 >>> try: 

173 ... next(i3) 

174 ... except StopIteration as si: 

175 ... print(type(si)) 

176 <class 'StopIteration'> 

177 >>> next(i1) 

178 16 

179 >>> try: 

180 ... next(i1) 

181 ... except StopIteration as si: 

182 ... print(type(si)) 

183 <class 'StopIteration'> 

184 

185 >>> a = [1, 2, 3] 

186 >>> reiterable(a) is a 

187 True 

188 

189 >>> a = (1, 2, 3) 

190 >>> reiterable(a) is a 

191 True 

192 

193 >>> a = {1, 2, 3} 

194 >>> reiterable(a) is a 

195 True 

196 

197 >>> a = {1: 1, 2: 2, 3: 3} 

198 >>> reiterable(a) is a 

199 True 

200 

201 >>> k = a.keys() 

202 >>> reiterable(k) is k 

203 True 

204 

205 >>> k = a.values() 

206 >>> reiterable(k) is k 

207 True 

208 

209 >>> tuple(reiterable((x for x in range(5)))) 

210 (0, 1, 2, 3, 4) 

211 

212 >>> try: 

213 ... reiterable(None) 

214 ... except TypeError as te: 

215 ... print(str(te)[:60]) 

216 source should be an instance of any in {typing.Iterable, typ 

217 

218 >>> try: 

219 ... reiterable(1) 

220 ... except TypeError as te: 

221 ... print(str(te)[:60]) 

222 source should be an instance of any in {typing.Iterable, typ 

223 

224 >>> type(merge_sorted_and_return_unique([1, 2, 3,], [2, 2])) 

225 <class 'generator'> 

226 

227 >>> type(reiterable(merge_sorted_and_return_unique([1, 2, 3,], [2, 2]))) 

228 <class 'pycommons.ds.sequences.__Reiterator'> 

229 """ 

230 if isinstance(source, Iterator): 

231 return __Reiterator(source) # solidify iterators into tuples 

232 if not isinstance(source, Iterable): 

233 raise type_error(source, "source", (Iterable, Iterator)) 

234 return source # other iterables can be returned as-is 

235 

236 

237def merge_sorted_and_return_unique( 

238 *seqs: Iterable[T]) -> Generator[T, None, None]: 

239 """ 

240 Merge sorted sequences of integers and return only unique values. 

241 

242 You can provide multiple sequences, all of which must be sorted. 

243 This function then merges them into a single sorted sequence which 

244 contains each elemenet at most once. 

245 A typical use case would be to combine the result of 

246 :func:`pycommons.math.primes.primes` with some pre-defined values into 

247 a sorted sequence. 

248 

249 Notice that the elements of the sequence must support the less-than 

250 operator, i.e., have a `__lt__` dunder method. Otherwise this function 

251 will crash. 

252 

253 The returned sequence is guaranteed to provide strictly increasing values. 

254 

255 :param seqs: the sequences, i.e., some instances of :class:`Iterable` or 

256 :class:`Iterator` 

257 :returns: a merged sequence of integers 

258 :raises TypeError: if any of the provided iterators or any of their 

259 elements is `None`, or if any of the elements in `seqs`is not an 

260 :class:`Iterable`. 

261 

262 >>> list(merge_sorted_and_return_unique([1, 2, 3,], [2, 2])) 

263 [1, 2, 3] 

264 

265 >>> from pycommons.math.primes import primes 

266 >>> list(merge_sorted_and_return_unique(primes(14), [1, 10])) 

267 [1, 2, 3, 5, 7, 10, 11, 13] 

268 

269 >>> list(merge_sorted_and_return_unique( 

270 ... primes(14), primes(17), [1, 2, 10, 100])) 

271 [1, 2, 3, 5, 7, 10, 11, 13, 17, 100] 

272 

273 >>> try: 

274 ... for _ in merge_sorted_and_return_unique(1): 

275 ... pass 

276 ... except TypeError as te: 

277 ... print(te) 

278 'int' object is not iterable 

279 

280 >>> try: 

281 ... for j in merge_sorted_and_return_unique([3], 1): 

282 ... print(j) 

283 ... except TypeError as te: 

284 ... print(te) 

285 'int' object is not iterable 

286 

287 >>> try: 

288 ... for j in merge_sorted_and_return_unique([None], [None]): 

289 ... print(j) 

290 ... except TypeError as te: 

291 ... print(te) 

292 Element must not be None. 

293 

294 >>> try: 

295 ... for j in merge_sorted_and_return_unique([None], [1]): 

296 ... print(j) 

297 ... except TypeError as te: 

298 ... print(te) 

299 '<' not supported between instances of 'NoneType' and 'int' 

300 

301 >>> try: 

302 ... for j in merge_sorted_and_return_unique(None, [1]): 

303 ... print(j) 

304 ... except TypeError as te: 

305 ... print(te) 

306 'NoneType' object is not iterable 

307 

308 >>> try: 

309 ... for j in merge_sorted_and_return_unique([print, len], [repr]): 

310 ... print(j) 

311 ... except TypeError as te: 

312 ... print(te) 

313 '<' not supported between instances of 'builtin_function_or_method' \ 

314and 'builtin_function_or_method' 

315 """ 

316 last: T | None = None 

317 for item in merge(*seqs): 

318 if item is None: 

319 raise TypeError("Element must not be None.") 

320 if (last is None) or (last < item): # type: ignore # noqa 

321 yield item 

322 last = item