Coverage for jstark / features / feature.py: 100%
153 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-23 22:34 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-23 22:34 +0000
1"""Feature abstract base class
3All feature classes are derived from here
4"""
6from abc import ABCMeta, abstractmethod
7from datetime import date, timedelta, datetime
8from typing import Callable
9from dateutil.relativedelta import relativedelta
12from pyspark.sql import Column
13import pyspark.sql.functions as f
14import pendulum
16from jstark.feature_period import FeaturePeriod, PeriodUnitOfMeasure
17from jstark.features.first_and_last_date_of_period import FirstAndLastDateOfPeriod
18from jstark.exceptions import AsAtIsNotADate
21class Feature(metaclass=ABCMeta):
22 def __init__(
23 self,
24 as_at: date,
25 feature_period: FeaturePeriod,
26 first_day_of_week: str | None = None,
27 use_absolute_periods: bool = False,
28 ) -> None:
29 self.feature_period = feature_period
30 if isinstance(as_at, datetime):
31 import warnings
33 warnings.warn(f"as_at={as_at!r} was converted to a date")
34 as_at = as_at.date()
35 if not isinstance(as_at, date):
36 raise AsAtIsNotADate
37 self._as_at = as_at
38 self._first_day_of_week = first_day_of_week
39 self._use_absolute_periods = use_absolute_periods
41 @property
42 def feature_period(self) -> FeaturePeriod:
43 return self._feature_period
45 @feature_period.setter
46 def feature_period(self, value) -> None:
47 self._feature_period = value
49 @property
50 def as_at(self) -> date:
51 return self._as_at
53 @property
54 def feature_name(self) -> str:
55 suffix = (
56 self.column_metadata["period-absolute"]
57 if self._use_absolute_periods
58 else self.feature_period.mnemonic
59 )
60 return f"{type(self).__name__}_{suffix}"
62 @property
63 @abstractmethod
64 def column(self) -> Column:
65 """Complete definition of the column returned by this feature,
66 replete with feature period filtering, metadata, default value
67 and alias"""
69 @property
70 @abstractmethod
71 def description_subject(self) -> str:
72 """Desciption of the feature that will be concatenated
73 with an explanation of the feature period.
74 """
76 @property
77 def commentary(self) -> str:
78 return "No commentary supplied"
80 @abstractmethod
81 def default_value(self) -> Column:
82 """Default value of the feature, typically used when zero rows match
83 the feature's feature_period
84 """
86 @abstractmethod
87 def column_expression(self) -> Column:
88 """The expression that defines the feature"""
90 @property
91 def start_date(self) -> date:
92 n_days_ago = self.as_at - timedelta(days=self.feature_period.start)
93 n_weeks_ago = self.as_at - timedelta(weeks=self.feature_period.start)
94 n_months_ago = self.as_at - relativedelta(months=self.feature_period.start)
95 n_quarters_ago = self.as_at - relativedelta(
96 months=self.feature_period.start * 3
97 )
98 n_years_ago = self.as_at - relativedelta(years=self.feature_period.start)
99 match self.feature_period.period_unit_of_measure:
100 case PeriodUnitOfMeasure.DAY:
101 return n_days_ago
102 case PeriodUnitOfMeasure.WEEK:
103 return FirstAndLastDateOfPeriod(
104 n_weeks_ago, self._first_day_of_week
105 ).first_date_in_week
106 case PeriodUnitOfMeasure.MONTH:
107 return FirstAndLastDateOfPeriod(n_months_ago).first_date_in_month
108 case PeriodUnitOfMeasure.QUARTER:
109 return FirstAndLastDateOfPeriod(n_quarters_ago).first_date_in_quarter
110 case _: # PeriodUnitOfMeasure.YEAR:
111 return FirstAndLastDateOfPeriod(n_years_ago).first_date_in_year
113 @property
114 def end_date(self) -> date:
115 n_days_ago = self.as_at - timedelta(days=self.feature_period.end)
116 n_weeks_ago = self.as_at - timedelta(weeks=self.feature_period.end)
117 n_months_ago = self.as_at - relativedelta(months=self.feature_period.end)
118 n_quarters_ago = self.as_at - relativedelta(months=self.feature_period.end * 3)
119 n_years_ago = self.as_at - relativedelta(years=self.feature_period.end)
120 match self.feature_period.period_unit_of_measure:
121 case PeriodUnitOfMeasure.DAY:
122 last_day_of_period = n_days_ago
123 case PeriodUnitOfMeasure.WEEK:
124 last_day_of_period = FirstAndLastDateOfPeriod(
125 n_weeks_ago, self._first_day_of_week
126 ).last_date_in_week
127 case PeriodUnitOfMeasure.MONTH:
128 last_day_of_period = FirstAndLastDateOfPeriod(
129 n_months_ago
130 ).last_date_in_month
131 case PeriodUnitOfMeasure.QUARTER:
132 last_day_of_period = FirstAndLastDateOfPeriod(
133 n_quarters_ago
134 ).last_date_in_quarter
135 case _: # PeriodUnitOfMeasure.YEAR:
136 last_day_of_period = FirstAndLastDateOfPeriod(
137 n_years_ago
138 ).last_date_in_year
139 # min() is used to ensure we don't return a date later than self.as_at
140 return min(last_day_of_period, self.as_at)
142 @property
143 def column_metadata(self) -> dict[str, str]:
144 period_absolute_start_period: str = ""
145 period_absolute_end_period: str = ""
146 match self.feature_period.period_unit_of_measure:
147 case PeriodUnitOfMeasure.DAY:
148 period_absolute_start_period = (
149 pendulum.instance(self.as_at)
150 .subtract(days=self.feature_period.start)
151 .format("YYYYMMDD")
152 )
153 period_absolute_end_period = (
154 pendulum.instance(self.as_at)
155 .subtract(days=self.feature_period.end)
156 .format("YYYYMMDD")
157 )
158 case PeriodUnitOfMeasure.WEEK:
159 period_absolute_start_period = self._week_label(self.start_date)
160 end_week_start = FirstAndLastDateOfPeriod(
161 pendulum.instance(self.as_at).subtract(
162 weeks=self.feature_period.end
163 ),
164 self._first_day_of_week,
165 ).first_date_in_week
166 period_absolute_end_period = self._week_label(end_week_start)
167 case PeriodUnitOfMeasure.MONTH:
168 period_absolute_start_period = (
169 pendulum.instance(self.as_at)
170 .subtract(months=self.feature_period.start)
171 .format("YYYYMMM")
172 )
173 period_absolute_end_period = (
174 pendulum.instance(self.as_at)
175 .subtract(months=self.feature_period.end)
176 .format("YYYYMMM")
177 )
178 case PeriodUnitOfMeasure.QUARTER:
179 dt_start = pendulum.instance(self.as_at).subtract(
180 months=self.feature_period.start * 3
181 )
182 period_absolute_start_period = f"{dt_start.year}Q{dt_start.quarter}"
183 dt_end = pendulum.instance(self.as_at).subtract(
184 months=self.feature_period.end * 3
185 )
186 period_absolute_end_period = f"{dt_end.year}Q{dt_end.quarter}"
187 case _: # PeriodUnitOfMeasure.YEAR:
188 period_absolute_start_period = str(
189 pendulum.instance(self.as_at)
190 .subtract(years=self.feature_period.start)
191 .year
192 )
193 period_absolute_end_period = str(
194 pendulum.instance(self.as_at)
195 .subtract(years=self.feature_period.end)
196 .year
197 )
198 return {
199 "created-with-love-by": "https://github.com/jamiekt/jstark",
200 "start-date": self.start_date.strftime("%Y-%m-%d"),
201 "end-date": self.end_date.strftime("%Y-%m-%d"),
202 "description": (
203 f"{self.description_subject} between "
204 + f"{self.start_date.strftime('%Y-%m-%d')} and "
205 + f"{self.end_date.strftime('%Y-%m-%d')}"
206 ),
207 "generated-at": datetime.now().strftime("%Y-%m-%d"),
208 "commentary": self.commentary,
209 "name-stem": str(type(self).__name__),
210 "period-absolute-start": period_absolute_start_period,
211 "period-absolute-end": period_absolute_end_period,
212 "period-absolute": period_absolute_start_period
213 if period_absolute_start_period == period_absolute_end_period
214 else f"{period_absolute_start_period}-{period_absolute_end_period}",
215 }
217 def _week_label(self, week_start_date: date) -> str:
218 """Convert the first day of a week to a label like 2026W13.
220 W01 of a year starts on the first occurrence of first_day_of_week
221 on or before Jan 1 of that year.
222 """
223 weekdays = [
224 "Monday",
225 "Tuesday",
226 "Wednesday",
227 "Thursday",
228 "Friday",
229 "Saturday",
230 "Sunday",
231 ]
232 first_day_of_week = self._first_day_of_week or "Monday"
233 target_weekday = weekdays.index(first_day_of_week)
234 year = week_start_date.year
236 jan1 = date(year, 1, 1)
237 days_back = (jan1.weekday() - target_weekday) % 7
238 w01_start = pendulum.instance(jan1).subtract(days=days_back)
240 jan1_next = date(year + 1, 1, 1)
241 days_back_next = (jan1_next.weekday() - target_weekday) % 7
242 w01_start_next = pendulum.instance(jan1_next).subtract(days=days_back_next)
244 if week_start_date >= w01_start_next:
245 w01_start = w01_start_next
246 year = year + 1
248 week_number = (week_start_date - w01_start).days // 7 + 1
249 return f"{year}W{week_number:02d}"
251 def __repr__(self) -> str:
252 return (
253 f"{self.__class__.__name__}"
254 f"(as_at={self.as_at}"
255 f", feature_period='{self.feature_period.mnemonic}'"
256 f", first_day_of_week={self._first_day_of_week!r})"
257 )
class DerivedFeature(Feature, metaclass=ABCMeta):
    """Feature computed from data that has already been aggregated.

    For example, 'Average Gross Spend Per Basket' is derived by dividing
    the total GrossSpend by the number of baskets (BasketCount).
    """

    @property
    def column(self) -> Column:
        """The column expression, falling back to the default value when
        null, aliased to the feature name and carrying the column metadata.
        """
        value_or_default = f.coalesce(
            self.column_expression(), self.default_value()
        )
        return value_or_default.alias(
            self.feature_name, metadata=self.column_metadata
        )
class BaseFeature(Feature, metaclass=ABCMeta):
    """Feature computed by aggregating raw source data.

    The data may have been cleaned and transformed in some way, but
    typically its grain is real occurrences of some activity, for example
    lists of grocery transactions, phone calls or journeys.
    """

    def sum_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``sum``."""
        return f.sum(column)

    def count_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``count``."""
        return f.count(column)

    def count_if_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``count_if``."""
        return f.count_if(column)

    def count_distinct_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``countDistinct``."""
        return f.countDistinct(column)

    def approx_count_distinct_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``approx_count_distinct``."""
        return f.approx_count_distinct(column)

    def max_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``max``."""
        return f.max(column)

    def min_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``min``."""
        return f.min(column)

    def collect_set_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``collect_set``."""
        return f.collect_set(column)

    @abstractmethod
    def aggregator(self) -> Callable[[Column], Column]:
        """Aggregator function"""

    @property
    def column(self) -> Column:
        """The aggregated, period-filtered column for this feature.

        ``column_expression`` is kept only for rows whose ``Timestamp``
        date lies between ``start_date`` and ``end_date`` inclusive. Those
        values are passed to the aggregator, and a null result is replaced
        by ``default_value``. The column is aliased to the feature name and
        carries the column metadata.
        """
        aggregate = self.aggregator()
        event_date = f.to_date(f.col("Timestamp"))
        within_period = (event_date >= f.lit(self.start_date)) & (
            event_date <= f.lit(self.end_date)
        )
        # when() without otherwise() yields null for rows outside the period.
        value_in_period = f.when(within_period, self.column_expression())
        value_or_default = f.coalesce(
            aggregate(value_in_period), self.default_value()
        )
        return value_or_default.alias(
            self.feature_name, metadata=self.column_metadata
        )