Coverage for pycommons / ds / sequences.py: 100%

45 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-03 00:58 +0000

1"""Tools for working with sequences.""" 

2 

3from heapq import merge 

4from typing import Callable, Final, Generator, Iterable, Iterator, TypeVar 

5 

6from pycommons.types import type_error 

7 

8#: the type of the element of the sequences to process 

9T = TypeVar("T") 

10 

11 

12class __Reiterator(Iterable[T]): 

13 """The internal class for re-iteration.""" 

14 

15 def __init__(self, source: Iterator[T]) -> None: 

16 """ 

17 Create the re-iterator. 

18 

19 :param source: the source 

20 """ 

21 #: the original source 

22 self.__source: Final[Iterator[T]] = source 

23 #: we store all elements from source for re-iteration 

24 self.__more: Final[list[T]] = [] 

25 

26 def __iter__(self) -> Generator[T, None, None]: 

27 """ 

28 Generate the sequence of elements. 

29 

30 :returns: the generator 

31 """ 

32 get_length: Callable[[], int] = self.__more.__len__ 

33 get: Callable[[int], T] = self.__more.__getitem__ 

34 append: Final[Callable[[T], None]] = self.__more.append 

35 nexter: Final[Callable[[], T]] = self.__source.__next__ 

36 pos: int = 0 # The next position in __more. 

37 skip: int = -1 # The last element in __more we took from __source. 

38 try: # We always get to the StopIteration of __source. 

39 while True: # Until we reached the end of list and end of iter. 

40 ll: int = get_length() # Get length (may have changed). 

41 while pos < ll: # First, return all elements from __more. 

42 if pos != skip: # Skip = the element we returned last. 

43 yield get(pos) # Yield the __more list element. 

44 pos += 1 # Increment position. 

45 skip = get_length() # Get the length: May have changed! 

46 if skip > ll: # Maybe something changed since yield. 

47 skip = -1 # Invalidate skip, we instead return from list. 

48 continue 

49 other = nexter() # Get next actual iteration element. 

50 append(other) # Store the element in the internal list. 

51 yield other # Yield the element 

52 except StopIteration: 

53 pass 

54 

55 

56def reiterable(source: Iterable[T] | Iterator[T]) -> Iterable[T]: 

57 """ 

58 Ensure that an :class:`Iterable` can be iterated over multiple times. 

59 

60 This function will solidify an :class:`Iterator` into an 

61 :class:`Iterable`. In Python, :class:`Iterator` is a sub-class of 

62 :class:`Iterable`. This means that if your function accepts instances of 

63 :class:`Iterable` as input, it may expect to be able to iterate over them 

64 multiple times. However, if an :class:`Iterator` is passed in, which also 

65 is an instance of :class:`Iterable` and thus fulfills the function's type 

66 requirement, this is not the case. A typical example of this would be if 

67 a :class:`Generator` is passed in. A :class:`Generator` is an instance of 

68 :class:`Iterator`, which, in turn, is an instance of :class:`Iterable`. 

69 However, you can iterate over a :class:`Generator` only once. 

70 

71 For such single-use objects, a new :class:`Iterable` wrapper is created. 

72 This wrapper will iterate over the original sequence, but cache all 

73 elements in an internal :class:`list`. When you iterate over the sequence 

74 again, the elements in the :class:`list` will be used. This means that 

75 all elements of the original sequence will be stored in memory. However, 

76 they are only stored if/when they are actually accessed via the iteration 

77 sequence. If you do not iterate over them completely, they are not all 

78 stored. 

79 

80 This form of re-iterabling is useful if you maybe generate items from a 

81 slower sequence or do not plan to use all of them. If you want to use 

82 all elements several times anyway, it may be more efficient to just wrap 

83 the original source into a :class:`tuple`. But if, for example, your 

84 sequence is the result of iterating over a directory tree on the file 

85 system, or maybe if it comes from loading a file, then using 

86 :func:`reiterable` could be useful. 

87 

88 This is also true if you actually process the generated sequence in some 

89 way that may fail or terminate early. Then, first loading all data into 

90 a :class:`tuple` may be annoying if your first processed element after 

91 that causes a failure or early termination. The bit of overhead of 

92 :func:`reiterable` may then well be worth your while. 

93 

94 Of course, this can only work if the :class:`Iterator` is not otherwise 

95 used after calling this function. If you extract elements from the 

96 :class:`Iterator` by yourself otherwise, maybe via :func:`next`, then 

97 :func:`reiterable` cannot work. However, if you only apply :func:`next` 

98 or other looping paradigms to the :class:`Iterable` returned by 

99 :func:`reiterable`, then you can iterate as often as you want over a 

100 :class:`Generator`, for example. 

101 

102 :param source: the data source 

103 :returns: the resulting re-iterable iterator 

104 :raises TypeError: if `source` is neither an :class:`Iterable` nor an 

105 :class:`Iterator`. 

106 

107 >>> ri = reiterable(iter(range(4))) 

108 >>> for i in ri: 

109 ... print(i) 

110 0 

111 1 

112 2 

113 3 

114 >>> for i in ri: 

115 ... print(i) 

116 0 

117 1 

118 2 

119 3 

120 

121 >>> g = (i ** 2 for i in range(5)) 

122 >>> r = reiterable(g) 

123 >>> tuple(r) 

124 (0, 1, 4, 9, 16) 

125 >>> tuple(r) 

126 (0, 1, 4, 9, 16) 

127 >>> tuple(r) 

128 (0, 1, 4, 9, 16) 

129 >>> tuple(r) 

130 (0, 1, 4, 9, 16) 

131 >>> tuple(g) 

132 () 

133 

134 >>> g = (i ** 2 for i in range(5)) 

135 >>> r = reiterable(g) 

136 >>> i1 = iter(r) 

137 >>> i2 = iter(r) 

138 >>> next(i1) 

139 0 

140 >>> next(i2) 

141 0 

142 >>> next(i2) 

143 1 

144 >>> next(i1) 

145 1 

146 >>> next(i1) 

147 4 

148 >>> next(i1) 

149 9 

150 >>> next(i2) 

151 4 

152 >>> next(i2) 

153 9 

154 >>> i3 = iter(r) 

155 >>> next(i3) 

156 0 

157 >>> next(i3) 

158 1 

159 >>> next(i3) 

160 4 

161 >>> next(i3) 

162 9 

163 >>> next(i3) 

164 16 

165 >>> next(i2) 

166 16 

167 >>> try: 

168 ... next(i2) 

169 ... except StopIteration as si: 

170 ... print(type(si)) 

171 <class 'StopIteration'> 

172 >>> try: 

173 ... next(i3) 

174 ... except StopIteration as si: 

175 ... print(type(si)) 

176 <class 'StopIteration'> 

177 >>> next(i1) 

178 16 

179 >>> try: 

180 ... next(i1) 

181 ... except StopIteration as si: 

182 ... print(type(si)) 

183 <class 'StopIteration'> 

184 

185 >>> a = [1, 2, 3] 

186 >>> reiterable(a) is a 

187 True 

188 

189 >>> a = (1, 2, 3) 

190 >>> reiterable(a) is a 

191 True 

192 

193 >>> a = {1, 2, 3} 

194 >>> reiterable(a) is a 

195 True 

196 

197 >>> a = {1: 1, 2: 2, 3: 3} 

198 >>> reiterable(a) is a 

199 True 

200 

201 >>> k = a.keys() 

202 >>> reiterable(k) is k 

203 True 

204 

205 >>> k = a.values() 

206 >>> reiterable(k) is k 

207 True 

208 

209 >>> tuple(reiterable((x for x in range(5)))) 

210 (0, 1, 2, 3, 4) 

211 

212 >>> try: 

213 ... reiterable(None) 

214 ... except TypeError as te: 

215 ... print(str(te)[:60]) 

216 source should be an instance of any in {typing.Iterable, typ 

217 

218 >>> try: 

219 ... reiterable(1) 

220 ... except TypeError as te: 

221 ... print(str(te)[:60]) 

222 source should be an instance of any in {typing.Iterable, typ 

223 

224 >>> type(merge_sorted_and_return_unique([1, 2, 3,], [2, 2])) 

225 <class 'generator'> 

226 

227 >>> type(reiterable(merge_sorted_and_return_unique([1, 2, 3,], [2, 2]))) 

228 <class 'pycommons.ds.sequences.__Reiterator'> 

229 

230 >>> def myfunc(): 

231 ... if hasattr(myfunc, "a"): 

232 ... s = getattr(myfunc, "a") 

233 ... else: 

234 ... s = 0 

235 ... for i in range(s, s + 3): 

236 ... yield i 

237 ... setattr(myfunc, "a", s + 3) 

238 >>> ri = reiterable(myfunc()) 

239 >>> tuple(ri) 

240 (0, 1, 2) 

241 >>> tuple(ri) 

242 (0, 1, 2) 

243 >>> ri = reiterable(myfunc()) 

244 >>> tuple(ri) 

245 (3, 4, 5) 

246 >>> tuple(ri) 

247 (3, 4, 5) 

248 """ 

249 if isinstance(source, Iterator): 

250 return __Reiterator(source) # solidify iterators into tuples 

251 if not isinstance(source, Iterable): 

252 raise type_error(source, "source", (Iterable, Iterator)) 

253 return source # other iterables can be returned as-is 

254 

255 

256def merge_sorted_and_return_unique( 

257 *seqs: Iterable[T]) -> Generator[T, None, None]: 

258 """ 

259 Merge sorted sequences of integers and return only unique values. 

260 

261 You can provide multiple sequences, all of which must be sorted. 

262 This function then merges them into a single sorted sequence which 

263 contains each elemenet at most once. 

264 A typical use case would be to combine the result of 

265 :func:`pycommons.math.primes.primes` with some pre-defined values into 

266 a sorted sequence. 

267 

268 Notice that the elements of the sequence must support the less-than 

269 operator, i.e., have a `__lt__` dunder method. Otherwise this function 

270 will crash. 

271 

272 The returned sequence is guaranteed to provide strictly increasing values. 

273 

274 :param seqs: the sequences, i.e., some instances of :class:`Iterable` or 

275 :class:`Iterator` 

276 :returns: a merged sequence of integers 

277 :raises TypeError: if any of the provided iterators or any of their 

278 elements is `None`, or if any of the elements in `seqs`is not an 

279 :class:`Iterable`. 

280 

281 >>> list(merge_sorted_and_return_unique([1, 2, 3,], [2, 2])) 

282 [1, 2, 3] 

283 

284 >>> from pycommons.math.primes import primes 

285 >>> list(merge_sorted_and_return_unique(primes(14), [1, 10])) 

286 [1, 2, 3, 5, 7, 10, 11, 13] 

287 

288 >>> list(merge_sorted_and_return_unique( 

289 ... primes(14), primes(17), [1, 2, 10, 100])) 

290 [1, 2, 3, 5, 7, 10, 11, 13, 17, 100] 

291 

292 >>> try: 

293 ... for _ in merge_sorted_and_return_unique(1): 

294 ... pass 

295 ... except TypeError as te: 

296 ... print(te) 

297 'int' object is not iterable 

298 

299 >>> try: 

300 ... for j in merge_sorted_and_return_unique([3], 1): 

301 ... print(j) 

302 ... except TypeError as te: 

303 ... print(te) 

304 'int' object is not iterable 

305 

306 >>> try: 

307 ... for j in merge_sorted_and_return_unique([None], [None]): 

308 ... print(j) 

309 ... except TypeError as te: 

310 ... print(te) 

311 Element must not be None. 

312 

313 >>> try: 

314 ... for j in merge_sorted_and_return_unique([None], [1]): 

315 ... print(j) 

316 ... except TypeError as te: 

317 ... print(te) 

318 '<' not supported between instances of 'NoneType' and 'int' 

319 

320 >>> try: 

321 ... for j in merge_sorted_and_return_unique(None, [1]): 

322 ... print(j) 

323 ... except TypeError as te: 

324 ... print(te) 

325 'NoneType' object is not iterable 

326 

327 >>> try: 

328 ... for j in merge_sorted_and_return_unique([print, len], [repr]): 

329 ... print(j) 

330 ... except TypeError as te: 

331 ... print(te) 

332 '<' not supported between instances of 'builtin_function_or_method' \ 

333and 'builtin_function_or_method' 

334 """ 

335 last: T | None = None 

336 for item in merge(*seqs): 

337 if item is None: 

338 raise TypeError("Element must not be None.") 

339 if (last is None) or (last < item): # type: ignore # noqa 

340 yield item 

341 last = item