Coverage for jstark / features / feature.py: 100%
154 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-30 09:29 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-30 09:29 +0000
1"""Feature abstract base class
3All feature classes are derived from here
4"""
6from abc import ABCMeta, abstractmethod
7from datetime import date, timedelta, datetime
8from typing import Callable
9from pyspark.sql import Column
10import pyspark.sql.functions as f
11import pendulum
13from jstark.feature_period import FeaturePeriod, PeriodUnitOfMeasure
14from jstark.features.first_and_last_date_of_period import FirstAndLastDateOfPeriod
15from jstark.exceptions import AsAtIsNotADate
18class Feature(metaclass=ABCMeta):
19 def __init__(
20 self,
21 as_at: date,
22 feature_period: FeaturePeriod,
23 first_day_of_week: str | None = None,
24 use_absolute_periods: bool = False,
25 ) -> None:
26 self.feature_period = feature_period
27 if isinstance(as_at, datetime):
28 import warnings
30 warnings.warn(f"as_at={as_at!r} was converted to a date")
31 as_at = as_at.date()
32 if not isinstance(as_at, date):
33 raise AsAtIsNotADate
34 self._as_at = as_at
35 self._first_day_of_week = first_day_of_week
36 self._use_absolute_periods = use_absolute_periods
38 @property
39 def feature_period(self) -> FeaturePeriod:
40 return self._feature_period
42 @feature_period.setter
43 def feature_period(self, value) -> None:
44 self._feature_period = value
46 @property
47 def as_at(self) -> date:
48 return self._as_at
50 @property
51 def feature_name(self) -> str:
52 suffix = (
53 self.column_metadata["period-absolute"]
54 if self._use_absolute_periods
55 else self.feature_period.mnemonic
56 )
57 return f"{type(self).__name__}_{suffix}"
59 @property
60 @abstractmethod
61 def column(self) -> Column:
62 """Complete definition of the column returned by this feature,
63 replete with feature period filtering, metadata, default value
64 and alias"""
66 @property
67 @abstractmethod
68 def description_subject(self) -> str:
69 """Desciption of the feature that will be concatenated
70 with an explanation of the feature period.
71 """
73 @property
74 def commentary(self) -> str:
75 return "No commentary supplied"
77 @abstractmethod
78 def default_value(self) -> Column:
79 """Default value of the feature, typically used when zero rows match
80 the feature's feature_period
81 """
83 @abstractmethod
84 def column_expression(self) -> Column:
85 """The expression that defines the feature"""
87 @property
88 def start_date(self) -> date:
89 p_as_at = pendulum.date(self.as_at.year, self.as_at.month, self.as_at.day)
90 n_days_ago = self.as_at - timedelta(days=self.feature_period.start)
91 n_weeks_ago = self.as_at - timedelta(weeks=self.feature_period.start)
92 n_months_ago = p_as_at.subtract(months=self.feature_period.start)
93 n_quarters_ago = p_as_at.subtract(months=self.feature_period.start * 3)
94 n_years_ago = p_as_at.subtract(years=self.feature_period.start)
95 match self.feature_period.period_unit_of_measure:
96 case PeriodUnitOfMeasure.DAY:
97 return n_days_ago
98 case PeriodUnitOfMeasure.WEEK:
99 return FirstAndLastDateOfPeriod(
100 n_weeks_ago, self._first_day_of_week
101 ).first_date_in_week
102 case PeriodUnitOfMeasure.MONTH:
103 return FirstAndLastDateOfPeriod(n_months_ago).first_date_in_month
104 case PeriodUnitOfMeasure.QUARTER:
105 return FirstAndLastDateOfPeriod(n_quarters_ago).first_date_in_quarter
106 case _: # PeriodUnitOfMeasure.YEAR:
107 return FirstAndLastDateOfPeriod(n_years_ago).first_date_in_year
109 @property
110 def end_date(self) -> date:
111 p_as_at = pendulum.date(self.as_at.year, self.as_at.month, self.as_at.day)
112 n_days_ago = self.as_at - timedelta(days=self.feature_period.end)
113 n_weeks_ago = self.as_at - timedelta(weeks=self.feature_period.end)
114 n_months_ago = p_as_at.subtract(months=self.feature_period.end)
115 n_quarters_ago = p_as_at.subtract(months=self.feature_period.end * 3)
116 n_years_ago = p_as_at.subtract(years=self.feature_period.end)
117 match self.feature_period.period_unit_of_measure:
118 case PeriodUnitOfMeasure.DAY:
119 last_day_of_period = n_days_ago
120 case PeriodUnitOfMeasure.WEEK:
121 last_day_of_period = FirstAndLastDateOfPeriod(
122 n_weeks_ago, self._first_day_of_week
123 ).last_date_in_week
124 case PeriodUnitOfMeasure.MONTH:
125 last_day_of_period = FirstAndLastDateOfPeriod(
126 n_months_ago
127 ).last_date_in_month
128 case PeriodUnitOfMeasure.QUARTER:
129 last_day_of_period = FirstAndLastDateOfPeriod(
130 n_quarters_ago
131 ).last_date_in_quarter
132 case _: # PeriodUnitOfMeasure.YEAR:
133 last_day_of_period = FirstAndLastDateOfPeriod(
134 n_years_ago
135 ).last_date_in_year
136 # min() is used to ensure we don't return a date later than self.as_at
137 return min(last_day_of_period, self.as_at)
139 @property
140 def column_metadata(self) -> dict[str, str]:
141 period_absolute_start_period: str = ""
142 period_absolute_end_period: str = ""
143 match self.feature_period.period_unit_of_measure:
144 case PeriodUnitOfMeasure.DAY:
145 period_absolute_start_period = (
146 pendulum.instance(self.as_at)
147 .subtract(days=self.feature_period.start)
148 .format("YYYYMMDD")
149 )
150 period_absolute_end_period = (
151 pendulum.instance(self.as_at)
152 .subtract(days=self.feature_period.end)
153 .format("YYYYMMDD")
154 )
155 case PeriodUnitOfMeasure.WEEK:
156 period_absolute_start_period = self._week_label(self.start_date)
157 end_week_start = FirstAndLastDateOfPeriod(
158 pendulum.instance(self.as_at).subtract(
159 weeks=self.feature_period.end
160 ),
161 self._first_day_of_week,
162 ).first_date_in_week
163 period_absolute_end_period = self._week_label(end_week_start)
164 case PeriodUnitOfMeasure.MONTH:
165 period_absolute_start_period = (
166 pendulum.instance(self.as_at)
167 .subtract(months=self.feature_period.start)
168 .format("YYYYMMM")
169 )
170 period_absolute_end_period = (
171 pendulum.instance(self.as_at)
172 .subtract(months=self.feature_period.end)
173 .format("YYYYMMM")
174 )
175 case PeriodUnitOfMeasure.QUARTER:
176 dt_start = pendulum.instance(self.as_at).subtract(
177 months=self.feature_period.start * 3
178 )
179 period_absolute_start_period = f"{dt_start.year}Q{dt_start.quarter}"
180 dt_end = pendulum.instance(self.as_at).subtract(
181 months=self.feature_period.end * 3
182 )
183 period_absolute_end_period = f"{dt_end.year}Q{dt_end.quarter}"
184 case _: # PeriodUnitOfMeasure.YEAR:
185 period_absolute_start_period = str(
186 pendulum.instance(self.as_at)
187 .subtract(years=self.feature_period.start)
188 .year
189 )
190 period_absolute_end_period = str(
191 pendulum.instance(self.as_at)
192 .subtract(years=self.feature_period.end)
193 .year
194 )
195 return {
196 "created-with-love-by": "https://github.com/jamiekt/jstark",
197 "start-date": self.start_date.strftime("%Y-%m-%d"),
198 "end-date": self.end_date.strftime("%Y-%m-%d"),
199 "description": (
200 f"{self.description_subject} between "
201 + f"{self.start_date.strftime('%Y-%m-%d')} and "
202 + f"{self.end_date.strftime('%Y-%m-%d')}"
203 ),
204 "generated-at": datetime.now().strftime("%Y-%m-%d"),
205 "commentary": self.commentary,
206 "name-stem": str(type(self).__name__),
207 "period-absolute-start": period_absolute_start_period,
208 "period-absolute-end": period_absolute_end_period,
209 "period-absolute": period_absolute_start_period
210 if period_absolute_start_period == period_absolute_end_period
211 else f"{period_absolute_start_period}-{period_absolute_end_period}",
212 }
214 def _week_label(self, week_start_date: date) -> str:
215 """Convert the first day of a week to a label like 2026W13.
217 W01 of a year starts on the first occurrence of first_day_of_week
218 on or before Jan 1 of that year.
219 """
220 weekdays = [
221 "Monday",
222 "Tuesday",
223 "Wednesday",
224 "Thursday",
225 "Friday",
226 "Saturday",
227 "Sunday",
228 ]
229 first_day_of_week = self._first_day_of_week or "Monday"
230 target_weekday = weekdays.index(first_day_of_week)
231 year = week_start_date.year
233 jan1 = date(year, 1, 1)
234 days_back = (jan1.weekday() - target_weekday) % 7
235 w01_start = pendulum.instance(jan1).subtract(days=days_back)
237 jan1_next = date(year + 1, 1, 1)
238 days_back_next = (jan1_next.weekday() - target_weekday) % 7
239 w01_start_next = pendulum.instance(jan1_next).subtract(days=days_back_next)
241 if week_start_date >= w01_start_next:
242 w01_start = w01_start_next
243 year = year + 1
245 week_number = (week_start_date - w01_start).days // 7 + 1
246 return f"{year}W{week_number:02d}"
248 def __repr__(self) -> str:
249 return (
250 f"{self.__class__.__name__}"
251 f"(as_at={self.as_at}"
252 f", feature_period='{self.feature_period.mnemonic}'"
253 f", first_day_of_week={self._first_day_of_week!r})"
254 )
class DerivedFeature(Feature, metaclass=ABCMeta):
    """A feature computed from data that has already been aggregated.

    For example, 'Average Gross Spend Per Basket' would be derived by
    dividing the total GrossSpend by the number of baskets (BasketCount).
    """

    @property
    def column(self) -> Column:
        """The feature's expression, falling back to its default value,
        aliased to the feature name with the feature's metadata attached."""
        value_or_default = f.coalesce(
            self.column_expression(), self.default_value()
        )
        return value_or_default.alias(
            self.feature_name, metadata=self.column_metadata
        )
class BaseFeature(Feature, metaclass=ABCMeta):
    """A feature computed by aggregating raw source data.

    The data may have been cleaned and transformed, but its grain is
    typically real occurrences of some activity, such as grocery
    transactions, phone calls or journeys.
    """

    def sum_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.sum``."""
        return f.sum(column)

    def count_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.count``."""
        return f.count(column)

    def count_if_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.count_if``."""
        return f.count_if(column)

    def count_distinct_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.countDistinct``."""
        return f.countDistinct(column)

    def approx_count_distinct_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.approx_count_distinct``."""
        return f.approx_count_distinct(column)

    def max_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.max``."""
        return f.max(column)

    def min_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.min``."""
        return f.min(column)

    def collect_set_aggregator(self, column: Column) -> Column:
        """Aggregate ``column`` with ``f.collect_set``."""
        return f.collect_set(column)

    @abstractmethod
    def aggregator(self) -> Callable[[Column], Column]:
        """Aggregator function"""

    @property
    def column(self) -> Column:
        """The aggregated feature column.

        The feature's expression is kept only for rows whose ``Timestamp``
        date lies between ``start_date`` and ``end_date`` inclusive, the
        result is passed through the aggregator, and the default value is
        used when the aggregate is null. The column is aliased to the
        feature name and carries the feature's metadata.
        """
        aggregate = self.aggregator()
        event_date = f.to_date(f.col("Timestamp"))
        on_or_after_start = event_date >= f.lit(self.start_date)
        on_or_before_end = event_date <= f.lit(self.end_date)
        # No otherwise() branch: rows outside the period yield null.
        value_in_period = f.when(
            on_or_after_start & on_or_before_end, self.column_expression()
        )
        aggregated = aggregate(value_in_period)
        with_default = f.coalesce(aggregated, self.default_value())
        return with_default.alias(
            self.feature_name, metadata=self.column_metadata
        )