Coverage for pycommons / ds / sequences.py: 100%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 03:04 +0000

1"""Tools for working with sequences.""" 

2 

3from heapq import merge 

4from typing import Callable, Final, Generator, Iterable, Iterator, TypeVar 

5 

6from pycommons.types import type_error 

7 

8#: the type of the element of the sequences to process 

9T = TypeVar("T") 

10 

11 

12class __Reiterator(Iterable[T]): 

13 """The internal class for re-iteration.""" 

14 

15 def __init__(self, source: Iterator[T]) -> None: 

16 """ 

17 Create the re-iterator. 

18 

19 :param source: the source 

20 """ 

21 #: the original source 

22 self.__source: Final[Iterator[T]] = source 

23 #: we store all elements from source for re-iteration 

24 self.__more: Final[list[T]] = [] 

25 

26 def __iter__(self) -> Generator[T, None, None]: 

27 """ 

28 Generate the sequence of elements. 

29 

30 :returns: the generator 

31 """ 

32 get_length: Callable[[], int] = self.__more.__len__ 

33 get: Callable[[int], T] = self.__more.__getitem__ 

34 append: Final[Callable[[T], None]] = self.__more.append 

35 nexter: Final[Callable[[], T]] = self.__source.__next__ 

36 pos: int = 0 # The next position in __more. 

37 skip: int = -1 # The last element in __more we took from __source. 

38 try: # We always get to the StopITeration of __source. 

39 while True: # Until we reached the end of list and end of iter. 

40 ll: int = get_length() # Get length (may have changed). 

41 while pos < ll: # First, return all elements from __more. 

42 if pos != skip: # Skip = the element we returned last. 

43 yield get(pos) # Yield the __more list element. 

44 pos += 1 # Increment position. 

45 skip = get_length() # Get the length: May have changed! 

46 if skip > ll: # Maybe something changed since yield. 

47 skip = -1 # Invalidate skip, we instead return from list. 

48 continue 

49 other = nexter() # Get next actual iteration element. 

50 append(other) # Store the element in the internal list. 

51 yield other # Yield the element 

52 except StopIteration: 

53 pass 

54 

55 

56def reiterable(source: Iterable[T] | Iterator[T]) -> Iterable[T]: 

57 """ 

58 Ensure that an :class:`Iterable` can be iterated over multiple times. 

59 

60 This function will solidify an :class:`Iterator` into an 

61 :class:`Iterable`. In Python, :class:`Iterator` is a sub-class of 

62 :class:`Iterable`. This means that if your function accepts instances of 

63 :class:`Iterable` as input, it may expect to be able to iterate over them 

64 multiple times. However, if an :class:`Iterator` is passed in, which also 

65 is an instance of :class:`Iterable` and thus fulfills the function's type 

66 requirement, this is not the case. A typical example of this would be if 

67 a :class:`Generator` is passed in. A :class:`Generator` is an instance of 

68 :class:`Iterator`, which, in turn, is an instance of :class:`Iterable`. 

69 However, you can iterate over a :class:`Generator` only once. 

70 

71 For such single-use objects, a new :class:`Iterable` wrapper is created. 

72 This wrapper will iterate over the original sequence, but cache all 

73 elements in an internal :class:`list`. When you iterate over the sequence 

74 again, the elements in the :class:`list` will be used. This means that 

75 all elements of the original sequence will be stored in memory. However, 

76 they are only stored if/when they are actually accessed via the iteration 

77 sequence. If you do not iterate over them completely, they are not all 

78 stored. 

79 

80 This form of re-iterabling is useful if you maybe generate items from a 

81 slower sequence or do not plan to use all of them. If you want to use 

82 all elements several times anyway, it may be more efficient to just wrap 

83 the original source into a :class:`tuple`. But if, for example, your 

84 sequence is the result of iterating over a directory tree on the file 

85 system, or maybe if it comes from loading a file, then using 

86 :func:`reiterable` could be useful. 

87 

88 This is also true if you actually process the generated sequence in some 

89 way that may fail or terminate early. Then, first loading all data into 

90 a :class:`tuple` may be annoying if your first processed element after 

91 that causes a failure or early termination. The bit of overhead of 

92 :func:`reiterable` may then well be worth your while. 

93 

94 Of course, this can only work if the :class:`Iterator` is not otherwise 

95 used after calling this function. If you extract elements from the 

96 :class:`Iterator` by yourself otherwise, maybe via :func:`next`, then 

97 :func:`reiterable` cannot work. However, if you only apply :func:`next` 

98 or other looping paradigms to the :class:`Iterable` returned by 

99 :func:`reiterable`, then you can iterate as often as you want over a 

100 :class:`Generator`, for example. 

101 

102 :param source: the data source 

103 :returns: the resulting re-iterable iterator 

104 :raises TypeError: if `source` is neither an :class:`Iterable` nor an 

105 :class:`Iterator`. 

106 

107 >>> g = (i ** 2 for i in range(5)) 

108 >>> r = reiterable(g) 

109 >>> tuple(r) 

110 (0, 1, 4, 9, 16) 

111 >>> tuple(r) 

112 (0, 1, 4, 9, 16) 

113 >>> tuple(r) 

114 (0, 1, 4, 9, 16) 

115 >>> tuple(r) 

116 (0, 1, 4, 9, 16) 

117 >>> tuple(g) 

118 () 

119 

120 >>> g = (i ** 2 for i in range(5)) 

121 >>> r = reiterable(g) 

122 >>> i1 = iter(r) 

123 >>> i2 = iter(r) 

124 >>> next(i1) 

125 0 

126 >>> next(i2) 

127 0 

128 >>> next(i2) 

129 1 

130 >>> next(i1) 

131 1 

132 >>> next(i1) 

133 4 

134 >>> next(i1) 

135 9 

136 >>> next(i2) 

137 4 

138 >>> next(i2) 

139 9 

140 >>> i3 = iter(r) 

141 >>> next(i3) 

142 0 

143 >>> next(i3) 

144 1 

145 >>> next(i3) 

146 4 

147 >>> next(i3) 

148 9 

149 >>> next(i3) 

150 16 

151 >>> next(i2) 

152 16 

153 >>> try: 

154 ... next(i2) 

155 ... except StopIteration as si: 

156 ... print(type(si)) 

157 <class 'StopIteration'> 

158 >>> try: 

159 ... next(i3) 

160 ... except StopIteration as si: 

161 ... print(type(si)) 

162 <class 'StopIteration'> 

163 >>> next(i1) 

164 16 

165 >>> try: 

166 ... next(i1) 

167 ... except StopIteration as si: 

168 ... print(type(si)) 

169 <class 'StopIteration'> 

170 

171 >>> a = [1, 2, 3] 

172 >>> reiterable(a) is a 

173 True 

174 

175 >>> a = (1, 2, 3) 

176 >>> reiterable(a) is a 

177 True 

178 

179 >>> a = {1, 2, 3} 

180 >>> reiterable(a) is a 

181 True 

182 

183 >>> a = {1: 1, 2: 2, 3: 3} 

184 >>> reiterable(a) is a 

185 True 

186 

187 >>> k = a.keys() 

188 >>> reiterable(k) is k 

189 True 

190 

191 >>> k = a.values() 

192 >>> reiterable(k) is k 

193 True 

194 

195 >>> tuple(reiterable((x for x in range(5)))) 

196 (0, 1, 2, 3, 4) 

197 

198 >>> try: 

199 ... reiterable(None) 

200 ... except TypeError as te: 

201 ... print(str(te)[:60]) 

202 source should be an instance of any in {typing.Iterable, typ 

203 

204 >>> try: 

205 ... reiterable(1) 

206 ... except TypeError as te: 

207 ... print(str(te)[:60]) 

208 source should be an instance of any in {typing.Iterable, typ 

209 

210 >>> type(merge_sorted_and_return_unique([1, 2, 3,], [2, 2])) 

211 <class 'generator'> 

212 

213 >>> type(reiterable(merge_sorted_and_return_unique([1, 2, 3,], [2, 2]))) 

214 <class 'pycommons.ds.sequences.__Reiterator'> 

215 """ 

216 if isinstance(source, Iterator): 

217 return __Reiterator(source) # solidify iterators into tuples 

218 if not isinstance(source, Iterable): 

219 raise type_error(source, "source", (Iterable, Iterator)) 

220 return source # iterables can be returned as-is 

221 

222 

223def merge_sorted_and_return_unique( 

224 *seqs: Iterable[T]) -> Generator[T, None, None]: 

225 """ 

226 Merge sorted sequences of integers and return only unique values. 

227 

228 You can provide multiple sequences, all of which must be sorted. 

229 This function then merges them into a single sorted sequence which 

230 contains each elemenet at most once. 

231 A typical use case would be to combine the result of 

232 :func:`pycommons.math.primes.primes` with some pre-defined values into 

233 a sorted sequence. 

234 

235 Notice that the elements of the sequence must support the less-than 

236 operator, i.e., have a `__lt__` dunder method. Otherwise this function 

237 will crash. 

238 

239 The returned sequence is guaranteed to provide strictly increasing values. 

240 

241 :param seqs: the sequences, i.e., some instances of :class:`Iterable` or 

242 :class:`Iterator` 

243 :returns: a merged sequence of integers 

244 :raises TypeError: if any of the provided iterators or any of their 

245 elements is `None`, or if any of the elements in `seqs`is not an 

246 :class:`Iterable`. 

247 

248 >>> list(merge_sorted_and_return_unique([1, 2, 3,], [2, 2])) 

249 [1, 2, 3] 

250 

251 >>> from pycommons.math.primes import primes 

252 >>> list(merge_sorted_and_return_unique(primes(14), [1, 10])) 

253 [1, 2, 3, 5, 7, 10, 11, 13] 

254 

255 >>> list(merge_sorted_and_return_unique( 

256 ... primes(14), primes(17), [1, 2, 10, 100])) 

257 [1, 2, 3, 5, 7, 10, 11, 13, 17, 100] 

258 

259 >>> try: 

260 ... for _ in merge_sorted_and_return_unique(1): 

261 ... pass 

262 ... except TypeError as te: 

263 ... print(te) 

264 'int' object is not iterable 

265 

266 >>> try: 

267 ... for j in merge_sorted_and_return_unique([3], 1): 

268 ... print(j) 

269 ... except TypeError as te: 

270 ... print(te) 

271 'int' object is not iterable 

272 

273 >>> try: 

274 ... for j in merge_sorted_and_return_unique([None], [None]): 

275 ... print(j) 

276 ... except TypeError as te: 

277 ... print(te) 

278 Element must not be None. 

279 

280 >>> try: 

281 ... for j in merge_sorted_and_return_unique([None], [1]): 

282 ... print(j) 

283 ... except TypeError as te: 

284 ... print(te) 

285 '<' not supported between instances of 'NoneType' and 'int' 

286 

287 >>> try: 

288 ... for j in merge_sorted_and_return_unique(None, [1]): 

289 ... print(j) 

290 ... except TypeError as te: 

291 ... print(te) 

292 'NoneType' object is not iterable 

293 

294 >>> try: 

295 ... for j in merge_sorted_and_return_unique([print, len], [repr]): 

296 ... print(j) 

297 ... except TypeError as te: 

298 ... print(te) 

299 '<' not supported between instances of 'builtin_function_or_method' \ 

300and 'builtin_function_or_method' 

301 """ 

302 last: T | None = None 

303 for item in merge(*seqs): 

304 if item is None: 

305 raise TypeError("Element must not be None.") 

306 if (last is None) or (last < item): # type: ignore # noqa 

307 yield item 

308 last = item