Coverage for pycommons / math / streams.py: 100%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 03:04 +0000

1""" 

2Tools for numerical data aggregation over streams. 

3 

4This module provides tools that can be used to aggregate data over streams of 

5numerical data. 

6Such tools should extend the class 

7:class:`~pycommons.math.streams.StreamAggregate`, which provides the tools to 

8:meth:`~pycommons.math.streams.StreamAggregate.add` data to the aggregation 

9procedure as well as to include whole sequence of data (via 

10:meth:`~pycommons.math.streams.StreamAggregate.update`) or to 

11:meth:`~pycommons.math.streams.StreamAggregate.reset` the computation. 

12It is recommended that subclasses should implement a method `result` 

13returning the current result. 

14 

15The class :class:`~pycommons.math.streams.StreamSum` is such a subclass of 

16:class:`~pycommons.math.streams.StreamAggregate`. It provides a running sum of 

17values over a stream of data, using a Kahan summation algorithm of the second 

18order. Its method 

19:meth:`~pycommons.math.streams.StreamSum.result` returns the current running 

20sum value. 

21""" 

22 

23from math import isfinite 

24from typing import Callable, Final, Iterable 

25 

26from pycommons.math.int_math import __DBL_INT_LIMIT_N_F as _DILNF 

27from pycommons.math.int_math import __DBL_INT_LIMIT_P_F as _DILPF 

28from pycommons.math.int_math import try_int_add 

29from pycommons.types import type_error 

30 

31 

32class StreamAggregate: 

33 """ 

34 The base class for stream aggregates. 

35 

36 This class provides a basic API for stream data aggregation. 

37 It is implemented by :class:`StreamSum`. 

38 """ 

39 

40 def reset(self) -> None: 

41 """ 

42 Reset this stream aggregate. 

43 

44 :raises NotImplementedError: because it is an abstract method 

45 

46 >>> ag = StreamAggregate() 

47 >>> try: 

48 ... ag.reset() 

49 ... except NotImplementedError: 

50 ... print("not implemented") 

51 not implemented 

52 """ 

53 raise NotImplementedError 

54 

55 def add(self, value: int | float) -> None: 

56 """ 

57 Add a value to the aggregate. 

58 

59 :param value: the value to aggregate 

60 

61 :raises NotImplementedError: because it is an abstract method 

62 

63 >>> ag = StreamAggregate() 

64 >>> try: 

65 ... ag.add(1) 

66 ... except NotImplementedError: 

67 ... print("not implemented") 

68 not implemented 

69 """ 

70 raise NotImplementedError 

71 

72 def update(self, data: Iterable[int | float | None]) -> None: 

73 """ 

74 Perform a stream update. 

75 

76 This function adds all the data to the stream while skipping `None` 

77 values. 

78 

79 :param data: the data, i.e., a stream of values. `None` values are 

80 skipped 

81 

82 >>> ag = StreamAggregate() 

83 >>> ag.add = lambda d: print(str(d)) 

84 >>> ag.update((1, 2, 3)) 

85 1 

86 2 

87 3 

88 >>> ag.update((1, None, 3)) 

89 1 

90 3 

91 """ 

92 ad: Final[Callable[[int | float], None]] = self.add # fast calls! 

93 for v in data: 

94 if v is not None: 

95 ad(v) 

96 

97 

98class StreamSum(StreamAggregate): 

99 """ 

100 The second-order Kahan-Babuška-Neumaier-Summation by Klein. 

101 

102 [1] A. Klein. A Generalized Kahan-Babuška-Summation-Algorithm. 

103 Computing 76:279-293. 2006. doi:10.1007/s00607-005-0139-x 

104 

105 >>> stream_sum = StreamSum() 

106 >>> stream_sum.update([1e18, 1, 1e36, -1e36, -1e18]) 

107 >>> stream_sum.result() 

108 1 

109 >>> stream_sum.reset() 

110 >>> stream_sum.update([1e18, 1, 1e36, -1e36, -1e18]) 

111 >>> stream_sum.result() 

112 1 

113 >>> stream_sum.reset() 

114 >>> stream_sum.update([1e18, 1, 1e36, None, -1e36, -1e18]) 

115 >>> stream_sum.result() 

116 1 

117 """ 

118 

119 def __init__(self) -> None: 

120 """Create the summation object.""" 

121 #: the running integer sum 

122 self.__i_sum: int = 0 

123 #: the running sum, an internal variable invisible from outside 

124 self.__sum: float | int = 0 

125 #: the first correction term, another internal variable 

126 self.__cs: float | int = 0 

127 #: the second correction term, another internal variable 

128 self.__ccs: float | int = 0 

129 #: did we record a value? 

130 self.__has_value: bool = False 

131 

132 def reset(self) -> None: 

133 """Reset this sum.""" 

134 self.__sum = 0 

135 self.__cs = 0 

136 self.__ccs = 0 

137 self.__i_sum = 0 

138 self.__has_value = False 

139 

140 def add(self, value: int | float) -> None: 

141 """ 

142 Add a value to the sum. 

143 

144 :param value: the value to add 

145 

146 >>> ss = StreamSum() 

147 >>> print(ss.result()) 

148 None 

149 >>> ss.add(1) 

150 >>> ss.result() 

151 1 

152 >>> ss.add(2.0) 

153 >>> ss.result() 

154 3 

155 >>> ss.add(1e43) 

156 >>> ss.result() 

157 1e+43 

158 >>> ss.add(-1e43) 

159 >>> ss.result() 

160 3 

161 >>> ss.reset() 

162 >>> print(ss.result()) 

163 None 

164 

165 >>> try: 

166 ... ss.add("x") 

167 ... except TypeError as te: 

168 ... print(te) 

169 value should be an instance of any in {float, int} but is str, \ 

170namely 'x'. 

171 

172 >>> from math import inf 

173 >>> try: 

174 ... ss.add(inf) 

175 ... except ValueError as ve: 

176 ... print(ve) 

177 Value must be finite, but is inf. 

178 """ 

179 self.__has_value = True 

180 if isinstance(value, int): 

181 self.__i_sum += value 

182 return 

183 if not isinstance(value, float): 

184 raise type_error(value, "value", (float, int)) 

185 if not isfinite(value): 

186 raise ValueError(f"Value must be finite, but is {value}.") 

187 if _DILNF <= value <= _DILPF: 

188 iv: Final[int] = int(value) 

189 if value == iv: 

190 self.__i_sum += iv 

191 return 

192 s: int | float = self.__sum # Get the current running sum. 

193 t: int | float = s + value # Compute the new sum value. 

194 c: int | float = (((s - t) + value) if abs(s) >= abs(value) 

195 else ((value - t) + s)) # The Neumaier tweak. 

196 self.__sum = t # Store the new sum value. 

197 cs: int | float = self.__cs # the current 1st-order correction 

198 t = cs + c # Compute the new first-order correction term. 

199 cc: int | float = (((cs - t) + c) if abs(cs) >= abs(c) 

200 else ((c - t) + cs)) # 2nd Neumaier tweak. 

201 self.__cs = t # Store the updated first-order correction term. 

202 self.__ccs += cc # Update the second-order correction. 

203 

204 def add_sum(self, ss: "StreamSum") -> None: 

205 """ 

206 Add another stream sum to this one. 

207 

208 :param ss: the other stream sum 

209 

210 >>> ss1 = StreamSum() 

211 >>> ss1.update((1, 1e20)) 

212 >>> ss2 = StreamSum() 

213 >>> ss2.update((5, -1e20)) 

214 >>> ss1.add_sum(ss2) 

215 >>> ss1.result() 

216 6 

217 

218 >>> try: 

219 ... ss1.add_sum("x") 

220 ... except TypeError as te: 

221 ... print(str(te)[:31]) 

222 other sum should be an instance 

223 """ 

224 if not isinstance(ss, StreamSum): 

225 raise type_error(ss, "other sum", StreamSum) 

226 if ss.__has_value: 

227 self.add(ss.__i_sum) 

228 self.add(ss.__sum) 

229 self.add(ss.__cs) 

230 self.add(ss.__ccs) 

231 

232 def result(self) -> int | float | None: 

233 """ 

234 Get the current result of the summation. 

235 

236 :return: the current result of the summation, or `None` if no value 

237 was added yet 

238 """ 

239 return try_int_add( 

240 self.__i_sum, self.__sum + self.__cs + self.__ccs) \ 

241 if self.__has_value else None