Coverage for jstark / features / feature.py: 100%

154 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-30 09:29 +0000

1"""Feature abstract base class 

2 

3All feature classes are derived from here 

4""" 

5 

6from abc import ABCMeta, abstractmethod 

7from datetime import date, timedelta, datetime 

8from typing import Callable 

9from pyspark.sql import Column 

10import pyspark.sql.functions as f 

11import pendulum 

12 

13from jstark.feature_period import FeaturePeriod, PeriodUnitOfMeasure 

14from jstark.features.first_and_last_date_of_period import FirstAndLastDateOfPeriod 

15from jstark.exceptions import AsAtIsNotADate 

16 

17 

18class Feature(metaclass=ABCMeta): 

19 def __init__( 

20 self, 

21 as_at: date, 

22 feature_period: FeaturePeriod, 

23 first_day_of_week: str | None = None, 

24 use_absolute_periods: bool = False, 

25 ) -> None: 

26 self.feature_period = feature_period 

27 if isinstance(as_at, datetime): 

28 import warnings 

29 

30 warnings.warn(f"as_at={as_at!r} was converted to a date") 

31 as_at = as_at.date() 

32 if not isinstance(as_at, date): 

33 raise AsAtIsNotADate 

34 self._as_at = as_at 

35 self._first_day_of_week = first_day_of_week 

36 self._use_absolute_periods = use_absolute_periods 

37 

38 @property 

39 def feature_period(self) -> FeaturePeriod: 

40 return self._feature_period 

41 

42 @feature_period.setter 

43 def feature_period(self, value) -> None: 

44 self._feature_period = value 

45 

46 @property 

47 def as_at(self) -> date: 

48 return self._as_at 

49 

50 @property 

51 def feature_name(self) -> str: 

52 suffix = ( 

53 self.column_metadata["period-absolute"] 

54 if self._use_absolute_periods 

55 else self.feature_period.mnemonic 

56 ) 

57 return f"{type(self).__name__}_{suffix}" 

58 

59 @property 

60 @abstractmethod 

61 def column(self) -> Column: 

62 """Complete definition of the column returned by this feature, 

63 replete with feature period filtering, metadata, default value 

64 and alias""" 

65 

66 @property 

67 @abstractmethod 

68 def description_subject(self) -> str: 

69 """Desciption of the feature that will be concatenated 

70 with an explanation of the feature period. 

71 """ 

72 

73 @property 

74 def commentary(self) -> str: 

75 return "No commentary supplied" 

76 

77 @abstractmethod 

78 def default_value(self) -> Column: 

79 """Default value of the feature, typically used when zero rows match 

80 the feature's feature_period 

81 """ 

82 

83 @abstractmethod 

84 def column_expression(self) -> Column: 

85 """The expression that defines the feature""" 

86 

87 @property 

88 def start_date(self) -> date: 

89 p_as_at = pendulum.date(self.as_at.year, self.as_at.month, self.as_at.day) 

90 n_days_ago = self.as_at - timedelta(days=self.feature_period.start) 

91 n_weeks_ago = self.as_at - timedelta(weeks=self.feature_period.start) 

92 n_months_ago = p_as_at.subtract(months=self.feature_period.start) 

93 n_quarters_ago = p_as_at.subtract(months=self.feature_period.start * 3) 

94 n_years_ago = p_as_at.subtract(years=self.feature_period.start) 

95 match self.feature_period.period_unit_of_measure: 

96 case PeriodUnitOfMeasure.DAY: 

97 return n_days_ago 

98 case PeriodUnitOfMeasure.WEEK: 

99 return FirstAndLastDateOfPeriod( 

100 n_weeks_ago, self._first_day_of_week 

101 ).first_date_in_week 

102 case PeriodUnitOfMeasure.MONTH: 

103 return FirstAndLastDateOfPeriod(n_months_ago).first_date_in_month 

104 case PeriodUnitOfMeasure.QUARTER: 

105 return FirstAndLastDateOfPeriod(n_quarters_ago).first_date_in_quarter 

106 case _: # PeriodUnitOfMeasure.YEAR: 

107 return FirstAndLastDateOfPeriod(n_years_ago).first_date_in_year 

108 

109 @property 

110 def end_date(self) -> date: 

111 p_as_at = pendulum.date(self.as_at.year, self.as_at.month, self.as_at.day) 

112 n_days_ago = self.as_at - timedelta(days=self.feature_period.end) 

113 n_weeks_ago = self.as_at - timedelta(weeks=self.feature_period.end) 

114 n_months_ago = p_as_at.subtract(months=self.feature_period.end) 

115 n_quarters_ago = p_as_at.subtract(months=self.feature_period.end * 3) 

116 n_years_ago = p_as_at.subtract(years=self.feature_period.end) 

117 match self.feature_period.period_unit_of_measure: 

118 case PeriodUnitOfMeasure.DAY: 

119 last_day_of_period = n_days_ago 

120 case PeriodUnitOfMeasure.WEEK: 

121 last_day_of_period = FirstAndLastDateOfPeriod( 

122 n_weeks_ago, self._first_day_of_week 

123 ).last_date_in_week 

124 case PeriodUnitOfMeasure.MONTH: 

125 last_day_of_period = FirstAndLastDateOfPeriod( 

126 n_months_ago 

127 ).last_date_in_month 

128 case PeriodUnitOfMeasure.QUARTER: 

129 last_day_of_period = FirstAndLastDateOfPeriod( 

130 n_quarters_ago 

131 ).last_date_in_quarter 

132 case _: # PeriodUnitOfMeasure.YEAR: 

133 last_day_of_period = FirstAndLastDateOfPeriod( 

134 n_years_ago 

135 ).last_date_in_year 

136 # min() is used to ensure we don't return a date later than self.as_at 

137 return min(last_day_of_period, self.as_at) 

138 

139 @property 

140 def column_metadata(self) -> dict[str, str]: 

141 period_absolute_start_period: str = "" 

142 period_absolute_end_period: str = "" 

143 match self.feature_period.period_unit_of_measure: 

144 case PeriodUnitOfMeasure.DAY: 

145 period_absolute_start_period = ( 

146 pendulum.instance(self.as_at) 

147 .subtract(days=self.feature_period.start) 

148 .format("YYYYMMDD") 

149 ) 

150 period_absolute_end_period = ( 

151 pendulum.instance(self.as_at) 

152 .subtract(days=self.feature_period.end) 

153 .format("YYYYMMDD") 

154 ) 

155 case PeriodUnitOfMeasure.WEEK: 

156 period_absolute_start_period = self._week_label(self.start_date) 

157 end_week_start = FirstAndLastDateOfPeriod( 

158 pendulum.instance(self.as_at).subtract( 

159 weeks=self.feature_period.end 

160 ), 

161 self._first_day_of_week, 

162 ).first_date_in_week 

163 period_absolute_end_period = self._week_label(end_week_start) 

164 case PeriodUnitOfMeasure.MONTH: 

165 period_absolute_start_period = ( 

166 pendulum.instance(self.as_at) 

167 .subtract(months=self.feature_period.start) 

168 .format("YYYYMMM") 

169 ) 

170 period_absolute_end_period = ( 

171 pendulum.instance(self.as_at) 

172 .subtract(months=self.feature_period.end) 

173 .format("YYYYMMM") 

174 ) 

175 case PeriodUnitOfMeasure.QUARTER: 

176 dt_start = pendulum.instance(self.as_at).subtract( 

177 months=self.feature_period.start * 3 

178 ) 

179 period_absolute_start_period = f"{dt_start.year}Q{dt_start.quarter}" 

180 dt_end = pendulum.instance(self.as_at).subtract( 

181 months=self.feature_period.end * 3 

182 ) 

183 period_absolute_end_period = f"{dt_end.year}Q{dt_end.quarter}" 

184 case _: # PeriodUnitOfMeasure.YEAR: 

185 period_absolute_start_period = str( 

186 pendulum.instance(self.as_at) 

187 .subtract(years=self.feature_period.start) 

188 .year 

189 ) 

190 period_absolute_end_period = str( 

191 pendulum.instance(self.as_at) 

192 .subtract(years=self.feature_period.end) 

193 .year 

194 ) 

195 return { 

196 "created-with-love-by": "https://github.com/jamiekt/jstark", 

197 "start-date": self.start_date.strftime("%Y-%m-%d"), 

198 "end-date": self.end_date.strftime("%Y-%m-%d"), 

199 "description": ( 

200 f"{self.description_subject} between " 

201 + f"{self.start_date.strftime('%Y-%m-%d')} and " 

202 + f"{self.end_date.strftime('%Y-%m-%d')}" 

203 ), 

204 "generated-at": datetime.now().strftime("%Y-%m-%d"), 

205 "commentary": self.commentary, 

206 "name-stem": str(type(self).__name__), 

207 "period-absolute-start": period_absolute_start_period, 

208 "period-absolute-end": period_absolute_end_period, 

209 "period-absolute": period_absolute_start_period 

210 if period_absolute_start_period == period_absolute_end_period 

211 else f"{period_absolute_start_period}-{period_absolute_end_period}", 

212 } 

213 

214 def _week_label(self, week_start_date: date) -> str: 

215 """Convert the first day of a week to a label like 2026W13. 

216 

217 W01 of a year starts on the first occurrence of first_day_of_week 

218 on or before Jan 1 of that year. 

219 """ 

220 weekdays = [ 

221 "Monday", 

222 "Tuesday", 

223 "Wednesday", 

224 "Thursday", 

225 "Friday", 

226 "Saturday", 

227 "Sunday", 

228 ] 

229 first_day_of_week = self._first_day_of_week or "Monday" 

230 target_weekday = weekdays.index(first_day_of_week) 

231 year = week_start_date.year 

232 

233 jan1 = date(year, 1, 1) 

234 days_back = (jan1.weekday() - target_weekday) % 7 

235 w01_start = pendulum.instance(jan1).subtract(days=days_back) 

236 

237 jan1_next = date(year + 1, 1, 1) 

238 days_back_next = (jan1_next.weekday() - target_weekday) % 7 

239 w01_start_next = pendulum.instance(jan1_next).subtract(days=days_back_next) 

240 

241 if week_start_date >= w01_start_next: 

242 w01_start = w01_start_next 

243 year = year + 1 

244 

245 week_number = (week_start_date - w01_start).days // 7 + 1 

246 return f"{year}W{week_number:02d}" 

247 

248 def __repr__(self) -> str: 

249 return ( 

250 f"{self.__class__.__name__}" 

251 f"(as_at={self.as_at}" 

252 f", feature_period='{self.feature_period.mnemonic}'" 

253 f", first_day_of_week={self._first_day_of_week!r})" 

254 ) 

255 

256 

class DerivedFeature(Feature, metaclass=ABCMeta):
    """Feature computed from data that has already been aggregated.

    For example, 'Average Gross Spend Per Basket' is derived by dividing
    the total GrossSpend by the number of baskets (BasketCount).
    """

    @property
    def column(self) -> Column:
        """The column expression, falling back to the default value when
        it is null, aliased to the feature name with the column metadata
        attached."""
        expression_or_default = f.coalesce(
            self.column_expression(), self.default_value()
        )
        return expression_or_default.alias(
            self.feature_name, metadata=self.column_metadata
        )

269 

270 

class BaseFeature(Feature, metaclass=ABCMeta):
    """Feature computed by aggregating raw source data.

    The data may have been cleaned and transformed in some way, but its
    grain is typically real occurrences of some activity, such as
    grocery transactions, phone calls or journeys.
    """

    def sum_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.sum``."""
        return f.sum(column)

    def count_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.count``."""
        return f.count(column)

    def count_if_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.count_if``."""
        return f.count_if(column)

    def count_distinct_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.countDistinct``."""
        return f.countDistinct(column)

    def approx_count_distinct_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.approx_count_distinct``."""
        return f.approx_count_distinct(column)

    def max_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.max``."""
        return f.max(column)

    def min_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.min``."""
        return f.min(column)

    def collect_set_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.collect_set``."""
        return f.collect_set(column)

    @abstractmethod
    def aggregator(self) -> Callable[[Column], Column]:
        """Aggregator function"""

    @property
    def column(self) -> Column:
        """The aggregated feature column.

        The column expression is aggregated only where the date of the
        ``Timestamp`` column lies between ``start_date`` and ``end_date``
        inclusive. A null aggregate is replaced by the default value, and
        the result is aliased to the feature name with the column
        metadata attached.
        """
        aggregate = self.aggregator()
        on_or_after_start = f.to_date(f.col("Timestamp")) >= f.lit(self.start_date)
        on_or_before_end = f.to_date(f.col("Timestamp")) <= f.lit(self.end_date)
        within_period = on_or_after_start & on_or_before_end
        aggregated = aggregate(f.when(within_period, self.column_expression()))
        with_default = f.coalesce(aggregated, self.default_value())
        return with_default.alias(self.feature_name, metadata=self.column_metadata)

311 f.when( 

312 (f.to_date(f.col("Timestamp")) >= f.lit(self.start_date)) 

313 & (f.to_date(f.col("Timestamp")) <= f.lit(self.end_date)), 

314 self.column_expression(), 

315 ) 

316 ), 

317 self.default_value(), 

318 ).alias(self.feature_name, metadata=self.column_metadata)