Coverage for jstark / features / feature.py: 100%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-23 22:34 +0000

1"""Feature abstract base class 

2 

3All feature classes are derived from here 

4""" 

5 

6from abc import ABCMeta, abstractmethod 

7from datetime import date, timedelta, datetime 

8from typing import Callable 

9from dateutil.relativedelta import relativedelta 

10 

11 

12from pyspark.sql import Column 

13import pyspark.sql.functions as f 

14import pendulum 

15 

16from jstark.feature_period import FeaturePeriod, PeriodUnitOfMeasure 

17from jstark.features.first_and_last_date_of_period import FirstAndLastDateOfPeriod 

18from jstark.exceptions import AsAtIsNotADate 

19 

20 

21class Feature(metaclass=ABCMeta): 

22 def __init__( 

23 self, 

24 as_at: date, 

25 feature_period: FeaturePeriod, 

26 first_day_of_week: str | None = None, 

27 use_absolute_periods: bool = False, 

28 ) -> None: 

29 self.feature_period = feature_period 

30 if isinstance(as_at, datetime): 

31 import warnings 

32 

33 warnings.warn(f"as_at={as_at!r} was converted to a date") 

34 as_at = as_at.date() 

35 if not isinstance(as_at, date): 

36 raise AsAtIsNotADate 

37 self._as_at = as_at 

38 self._first_day_of_week = first_day_of_week 

39 self._use_absolute_periods = use_absolute_periods 

40 

41 @property 

42 def feature_period(self) -> FeaturePeriod: 

43 return self._feature_period 

44 

45 @feature_period.setter 

46 def feature_period(self, value) -> None: 

47 self._feature_period = value 

48 

49 @property 

50 def as_at(self) -> date: 

51 return self._as_at 

52 

53 @property 

54 def feature_name(self) -> str: 

55 suffix = ( 

56 self.column_metadata["period-absolute"] 

57 if self._use_absolute_periods 

58 else self.feature_period.mnemonic 

59 ) 

60 return f"{type(self).__name__}_{suffix}" 

61 

62 @property 

63 @abstractmethod 

64 def column(self) -> Column: 

65 """Complete definition of the column returned by this feature, 

66 replete with feature period filtering, metadata, default value 

67 and alias""" 

68 

69 @property 

70 @abstractmethod 

71 def description_subject(self) -> str: 

72 """Desciption of the feature that will be concatenated 

73 with an explanation of the feature period. 

74 """ 

75 

76 @property 

77 def commentary(self) -> str: 

78 return "No commentary supplied" 

79 

80 @abstractmethod 

81 def default_value(self) -> Column: 

82 """Default value of the feature, typically used when zero rows match 

83 the feature's feature_period 

84 """ 

85 

86 @abstractmethod 

87 def column_expression(self) -> Column: 

88 """The expression that defines the feature""" 

89 

90 @property 

91 def start_date(self) -> date: 

92 n_days_ago = self.as_at - timedelta(days=self.feature_period.start) 

93 n_weeks_ago = self.as_at - timedelta(weeks=self.feature_period.start) 

94 n_months_ago = self.as_at - relativedelta(months=self.feature_period.start) 

95 n_quarters_ago = self.as_at - relativedelta( 

96 months=self.feature_period.start * 3 

97 ) 

98 n_years_ago = self.as_at - relativedelta(years=self.feature_period.start) 

99 match self.feature_period.period_unit_of_measure: 

100 case PeriodUnitOfMeasure.DAY: 

101 return n_days_ago 

102 case PeriodUnitOfMeasure.WEEK: 

103 return FirstAndLastDateOfPeriod( 

104 n_weeks_ago, self._first_day_of_week 

105 ).first_date_in_week 

106 case PeriodUnitOfMeasure.MONTH: 

107 return FirstAndLastDateOfPeriod(n_months_ago).first_date_in_month 

108 case PeriodUnitOfMeasure.QUARTER: 

109 return FirstAndLastDateOfPeriod(n_quarters_ago).first_date_in_quarter 

110 case _: # PeriodUnitOfMeasure.YEAR: 

111 return FirstAndLastDateOfPeriod(n_years_ago).first_date_in_year 

112 

113 @property 

114 def end_date(self) -> date: 

115 n_days_ago = self.as_at - timedelta(days=self.feature_period.end) 

116 n_weeks_ago = self.as_at - timedelta(weeks=self.feature_period.end) 

117 n_months_ago = self.as_at - relativedelta(months=self.feature_period.end) 

118 n_quarters_ago = self.as_at - relativedelta(months=self.feature_period.end * 3) 

119 n_years_ago = self.as_at - relativedelta(years=self.feature_period.end) 

120 match self.feature_period.period_unit_of_measure: 

121 case PeriodUnitOfMeasure.DAY: 

122 last_day_of_period = n_days_ago 

123 case PeriodUnitOfMeasure.WEEK: 

124 last_day_of_period = FirstAndLastDateOfPeriod( 

125 n_weeks_ago, self._first_day_of_week 

126 ).last_date_in_week 

127 case PeriodUnitOfMeasure.MONTH: 

128 last_day_of_period = FirstAndLastDateOfPeriod( 

129 n_months_ago 

130 ).last_date_in_month 

131 case PeriodUnitOfMeasure.QUARTER: 

132 last_day_of_period = FirstAndLastDateOfPeriod( 

133 n_quarters_ago 

134 ).last_date_in_quarter 

135 case _: # PeriodUnitOfMeasure.YEAR: 

136 last_day_of_period = FirstAndLastDateOfPeriod( 

137 n_years_ago 

138 ).last_date_in_year 

139 # min() is used to ensure we don't return a date later than self.as_at 

140 return min(last_day_of_period, self.as_at) 

141 

142 @property 

143 def column_metadata(self) -> dict[str, str]: 

144 period_absolute_start_period: str = "" 

145 period_absolute_end_period: str = "" 

146 match self.feature_period.period_unit_of_measure: 

147 case PeriodUnitOfMeasure.DAY: 

148 period_absolute_start_period = ( 

149 pendulum.instance(self.as_at) 

150 .subtract(days=self.feature_period.start) 

151 .format("YYYYMMDD") 

152 ) 

153 period_absolute_end_period = ( 

154 pendulum.instance(self.as_at) 

155 .subtract(days=self.feature_period.end) 

156 .format("YYYYMMDD") 

157 ) 

158 case PeriodUnitOfMeasure.WEEK: 

159 period_absolute_start_period = self._week_label(self.start_date) 

160 end_week_start = FirstAndLastDateOfPeriod( 

161 pendulum.instance(self.as_at).subtract( 

162 weeks=self.feature_period.end 

163 ), 

164 self._first_day_of_week, 

165 ).first_date_in_week 

166 period_absolute_end_period = self._week_label(end_week_start) 

167 case PeriodUnitOfMeasure.MONTH: 

168 period_absolute_start_period = ( 

169 pendulum.instance(self.as_at) 

170 .subtract(months=self.feature_period.start) 

171 .format("YYYYMMM") 

172 ) 

173 period_absolute_end_period = ( 

174 pendulum.instance(self.as_at) 

175 .subtract(months=self.feature_period.end) 

176 .format("YYYYMMM") 

177 ) 

178 case PeriodUnitOfMeasure.QUARTER: 

179 dt_start = pendulum.instance(self.as_at).subtract( 

180 months=self.feature_period.start * 3 

181 ) 

182 period_absolute_start_period = f"{dt_start.year}Q{dt_start.quarter}" 

183 dt_end = pendulum.instance(self.as_at).subtract( 

184 months=self.feature_period.end * 3 

185 ) 

186 period_absolute_end_period = f"{dt_end.year}Q{dt_end.quarter}" 

187 case _: # PeriodUnitOfMeasure.YEAR: 

188 period_absolute_start_period = str( 

189 pendulum.instance(self.as_at) 

190 .subtract(years=self.feature_period.start) 

191 .year 

192 ) 

193 period_absolute_end_period = str( 

194 pendulum.instance(self.as_at) 

195 .subtract(years=self.feature_period.end) 

196 .year 

197 ) 

198 return { 

199 "created-with-love-by": "https://github.com/jamiekt/jstark", 

200 "start-date": self.start_date.strftime("%Y-%m-%d"), 

201 "end-date": self.end_date.strftime("%Y-%m-%d"), 

202 "description": ( 

203 f"{self.description_subject} between " 

204 + f"{self.start_date.strftime('%Y-%m-%d')} and " 

205 + f"{self.end_date.strftime('%Y-%m-%d')}" 

206 ), 

207 "generated-at": datetime.now().strftime("%Y-%m-%d"), 

208 "commentary": self.commentary, 

209 "name-stem": str(type(self).__name__), 

210 "period-absolute-start": period_absolute_start_period, 

211 "period-absolute-end": period_absolute_end_period, 

212 "period-absolute": period_absolute_start_period 

213 if period_absolute_start_period == period_absolute_end_period 

214 else f"{period_absolute_start_period}-{period_absolute_end_period}", 

215 } 

216 

217 def _week_label(self, week_start_date: date) -> str: 

218 """Convert the first day of a week to a label like 2026W13. 

219 

220 W01 of a year starts on the first occurrence of first_day_of_week 

221 on or before Jan 1 of that year. 

222 """ 

223 weekdays = [ 

224 "Monday", 

225 "Tuesday", 

226 "Wednesday", 

227 "Thursday", 

228 "Friday", 

229 "Saturday", 

230 "Sunday", 

231 ] 

232 first_day_of_week = self._first_day_of_week or "Monday" 

233 target_weekday = weekdays.index(first_day_of_week) 

234 year = week_start_date.year 

235 

236 jan1 = date(year, 1, 1) 

237 days_back = (jan1.weekday() - target_weekday) % 7 

238 w01_start = pendulum.instance(jan1).subtract(days=days_back) 

239 

240 jan1_next = date(year + 1, 1, 1) 

241 days_back_next = (jan1_next.weekday() - target_weekday) % 7 

242 w01_start_next = pendulum.instance(jan1_next).subtract(days=days_back_next) 

243 

244 if week_start_date >= w01_start_next: 

245 w01_start = w01_start_next 

246 year = year + 1 

247 

248 week_number = (week_start_date - w01_start).days // 7 + 1 

249 return f"{year}W{week_number:02d}" 

250 

251 def __repr__(self) -> str: 

252 return ( 

253 f"{self.__class__.__name__}" 

254 f"(as_at={self.as_at}" 

255 f", feature_period='{self.feature_period.mnemonic}'" 

256 f", first_day_of_week={self._first_day_of_week!r})" 

257 ) 

258 

259 

class DerivedFeature(Feature, metaclass=ABCMeta):
    """A DerivedFeature is a feature that is calculated by combining
    data that has already been aggregated. For example, a derived
    feature called 'Average Gross Spend Per Basket' would be calculated
    by dividing the total GrossSpend by number of baskets (BasketCount)
    """

    @property
    def column(self) -> Column:
        """Column expression, falling back to the default value when null,
        aliased to the feature name with the column metadata attached."""
        expression = self.column_expression()
        fallback = self.default_value()
        with_default = f.coalesce(expression, fallback)
        return with_default.alias(self.feature_name, metadata=self.column_metadata)

272 

273 

class BaseFeature(Feature, metaclass=ABCMeta):
    """A BaseFeature is a feature that is calculated by aggregating
    raw source data. That data may have been cleaned and transformed in
    some way, but typically the grain of that data is real occurrences
    of some activity. Examples of such data are lists of grocery
    transactions, phone calls or journeys.
    """

    def sum_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.sum``."""
        return f.sum(column)

    def count_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.count``."""
        return f.count(column)

    def count_if_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.count_if``."""
        return f.count_if(column)

    def count_distinct_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.countDistinct``."""
        return f.countDistinct(column)

    def approx_count_distinct_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.approx_count_distinct``."""
        return f.approx_count_distinct(column)

    def max_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.max``."""
        return f.max(column)

    def min_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.min``."""
        return f.min(column)

    def collect_set_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.collect_set``."""
        return f.collect_set(column)

    @abstractmethod
    def aggregator(self) -> Callable[[Column], Column]:
        """Aggregator function"""

    @property
    def column(self) -> Column:
        """Aggregated feature column.

        The column expression is only kept for rows whose ``Timestamp``,
        converted to a date, lies between ``start_date`` and ``end_date``
        inclusive. The result is aggregated, replaced by the default value
        when null, and aliased to the feature name with metadata attached.
        """
        aggregate = self.aggregator()
        row_date = f.to_date(f.col("Timestamp"))
        on_or_after_start = row_date >= f.lit(self.start_date)
        on_or_before_end = row_date <= f.lit(self.end_date)
        # when() without otherwise() yields null for rows outside the period.
        in_period_value = f.when(
            on_or_after_start & on_or_before_end, self.column_expression()
        )
        aggregated = aggregate(in_period_value)
        with_default = f.coalesce(aggregated, self.default_value())
        return with_default.alias(self.feature_name, metadata=self.column_metadata)