Commit ae69233

feat(service): implement CountryQueryService for complex country data queries
- Add CountryQueryService to handle advanced queries on country data
- Implement caching mechanism with TTL for frequently requested queries
- Support filtering by active sources, headlines, and text search
- Build MongoDB aggregation pipeline based on query parameters
- Implement pagination and sorting for query results
- Add methods for cache cleanup and service disposal
1 parent bda2264 commit ae69233

File tree

1 file changed: +310 -0 lines changed

@@ -0,0 +1,310 @@
import 'dart:async';
import 'dart:collection'; // Added for SplayTreeMap
import 'dart:convert';

import 'package:collection/collection.dart';
import 'package:core/core.dart';
import 'package:data_repository/data_repository.dart';
import 'package:logging/logging.dart';
import 'package:mongo_dart/mongo_dart.dart';

/// {@template country_query_service}
/// A service responsible for executing complex queries on country data,
/// including filtering by active sources and headlines, and supporting
/// compound filters with text search.
///
/// This service also implements robust in-memory caching with a configurable
/// Time-To-Live (TTL) to optimize performance for frequently requested queries.
/// {@endtemplate}
class CountryQueryService {
  /// {@macro country_query_service}
  CountryQueryService({
    required DataRepository<Headline> headlineRepository,
    required DataRepository<Source> sourceRepository,
    required DataRepository<Country> countryRepository,
    required Logger log,
    Duration cacheDuration = const Duration(minutes: 15),
  })  : _headlineRepository = headlineRepository,
        _sourceRepository = sourceRepository,
        _countryRepository = countryRepository,
        _log = log,
        _cacheDuration = cacheDuration {
    _cleanupTimer = Timer.periodic(const Duration(minutes: 5), (_) {
      _cleanupCache();
    });
    _log.info(
      'CountryQueryService initialized with cache duration: $cacheDuration',
    );
  }

  final DataRepository<Headline> _headlineRepository;
  final DataRepository<Source> _sourceRepository;
  final DataRepository<Country> _countryRepository;
  final Logger _log;
  final Duration _cacheDuration;

  final Map<String, ({PaginatedResponse<Country> data, DateTime expiry})>
      _cache = {};
  Timer? _cleanupTimer;
  bool _isDisposed = false;

  /// Retrieves a paginated list of countries based on the provided filters,
  /// including special filters for active sources and headlines, and text
  /// search.
  ///
  /// This method supports compound filtering by combining `q` (text search),
  /// `hasActiveSources`, `hasActiveHeadlines`, and other standard filters.
  /// Results are cached to improve performance.
  ///
  /// - [filter]: A map containing query conditions. Special keys like
  ///   `hasActiveSources` and `hasActiveHeadlines` trigger aggregation logic.
  ///   The `q` key triggers a text search on country names.
  /// - [pagination]: Optional pagination parameters.
  /// - [sort]: Optional sorting options.
  ///
  /// Throws [OperationFailedException] for unexpected errors during query
  /// execution or cache operations.
  Future<PaginatedResponse<Country>> getFilteredCountries({
    required Map<String, dynamic> filter,
    PaginationOptions? pagination,
    List<SortOption>? sort,
  }) async {
    if (_isDisposed) {
      _log.warning('Attempted to query on disposed service.');
      throw const OperationFailedException('Service is disposed.');
    }

    final cacheKey = _generateCacheKey(filter, pagination, sort);
    final cachedEntry = _cache[cacheKey];

    if (cachedEntry != null && DateTime.now().isBefore(cachedEntry.expiry)) {
      _log.finer('Returning cached result for key: $cacheKey');
      return cachedEntry.data;
    }

    _log.info('Executing new query for countries with filter: $filter');
    try {
      final pipeline = _buildAggregationPipeline(filter, pagination, sort);
      final aggregationResult = await _countryRepository.aggregate(
        pipeline: pipeline,
      );

      // MongoDB aggregation returns a list of maps; convert them back into
      // Country objects.
      final List<Country> countries = aggregationResult
          .map((json) => Country.fromJson(json))
          .toList();

      // The pipeline fetches one document more than the requested limit, so
      // `hasMore` is true when more than `limit` documents came back; the
      // extra document is dropped before returning the page.
      final int limit = pagination?.limit ?? countries.length;
      final bool hasMore = countries.length > limit;
      final List<Country> paginatedCountries = countries.take(limit).toList();

      final response = PaginatedResponse<Country>(
        items: paginatedCountries,
        cursor: null, // Aggregation doesn't typically return a cursor.
        hasMore: hasMore,
      );

      _cache[cacheKey] = (
        data: response,
        expiry: DateTime.now().add(_cacheDuration),
      );
      _log.finer('Cached new result for key: $cacheKey');

      return response;
    } on HttpException {
      rethrow; // Propagate known HTTP exceptions.
    } catch (e, s) {
      _log.severe('Error fetching filtered countries: $e', e, s);
      throw OperationFailedException(
        'Failed to retrieve filtered countries: $e',
      );
    }
  }

  /// Builds the MongoDB aggregation pipeline based on the provided filters.
  List<Map<String, dynamic>> _buildAggregationPipeline(
    Map<String, dynamic> filter,
    PaginationOptions? pagination,
    List<SortOption>? sort,
  ) {
    final pipeline = <Map<String, dynamic>>[];
    final compoundMatchStages = <Map<String, dynamic>>[];

    // --- Stage 1: Initial match for active status ---
    // All countries should be active by default for these queries.
    compoundMatchStages.add({
      'status': ContentStatus.active.name,
    });

    // --- Stage 2: Handle `hasActiveSources` filter ---
    if (filter['hasActiveSources'] == true) {
      pipeline.add({
        r'$lookup': {
          'from': 'sources',
          'localField': '_id',
          'foreignField': 'headquarters._id',
          'as': 'matchingSources',
        },
      });
      pipeline.add({
        r'$match': {
          // Ensure there's at least one source, and that at least one of the
          // joined sources is active.
          'matchingSources': {r'$ne': []},
          'matchingSources.status': ContentStatus.active.name,
        },
      });
    }

    // --- Stage 3: Handle `hasActiveHeadlines` filter ---
    if (filter['hasActiveHeadlines'] == true) {
      pipeline.add({
        r'$lookup': {
          'from': 'headlines',
          'localField': '_id',
          'foreignField': 'eventCountry._id',
          'as': 'matchingHeadlines',
        },
      });
      pipeline.add({
        r'$match': {
          // Ensure there's at least one headline, and that at least one of
          // the joined headlines is active.
          'matchingHeadlines': {r'$ne': []},
          'matchingHeadlines.status': ContentStatus.active.name,
        },
      });
    }

    // --- Stage 4: Handle `q` (text search) filter ---
    final qValue = filter['q'];
    if (qValue is String && qValue.isNotEmpty) {
      compoundMatchStages.add({
        r'$text': {r'$search': qValue},
      });
    }

    // --- Stage 5: Handle other standard filters ---
    filter.forEach((key, value) {
      if (key != 'q' &&
          key != 'hasActiveSources' &&
          key != 'hasActiveHeadlines') {
        compoundMatchStages.add({key: value});
      }
    });

    // Combine all compound match conditions into a single $match stage that is
    // inserted at the start of the pipeline: MongoDB requires a $match using
    // $text to be the first stage, and filtering countries before the $lookup
    // stages keeps the joins cheap.
    if (compoundMatchStages.isNotEmpty) {
      pipeline.insert(0, {
        r'$match': {r'$and': compoundMatchStages},
      });
    }

    // --- Stage 6: Ensure uniqueness of countries ---
    // After lookups and matches, we might have duplicate countries if they
    // matched multiple sources/headlines. Group them back to unique countries.
    pipeline.add({
      r'$group': {
        '_id': r'$_id', // Group by the original country ID.
        'doc': {r'$first': r'$$ROOT'}, // Take the first full document.
      },
    });
    pipeline.add({
      r'$replaceRoot': {
        'newRoot': r'$doc', // Replace root with the original document.
      },
    });

    // --- Stage 7: Sorting ---
    if (sort != null && sort.isNotEmpty) {
      final sortStage = <String, dynamic>{};
      for (final option in sort) {
        sortStage[option.field] = option.order == SortOrder.asc ? 1 : -1;
      }
      pipeline.add({r'$sort': sortStage});
    }

    // --- Stage 8: Pagination (limit only) ---
    if (pagination?.cursor != null) {
      // Cursor-based pagination would require sorting by the cursor field and
      // filtering past the cursor value, which this pipeline does not do.
      _log.warning(
        'Cursor-based pagination is not implemented for aggregation queries '
        'in CountryQueryService. Only a limit is applied.',
      );
    }
    if (pagination?.limit != null) {
      // Fetch one more than the limit to determine 'hasMore'.
      pipeline.add({r'$limit': pagination!.limit! + 1});
    }

    // Project back to the Country model's JSON structure
    // (e.g. map the MongoDB `_id` back to `id`).
    pipeline.add({
      r'$project': {
        '_id': 0, // Exclude _id.
        'id': {r'$toString': r'$_id'}, // Map _id back to id.
        'isoCode': r'$isoCode',
        'name': r'$name',
        'flagUrl': r'$flagUrl',
        'createdAt': r'$createdAt',
        'updatedAt': r'$updatedAt',
        'status': r'$status',
        // Ensure other fields are projected if they were modified or needed.
      },
    });

    return pipeline;
  }

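  // For illustration (not part of the original commit): given the filter
  // {'hasActiveSources': true, 'q': 'united'} with a limit of 20 and no sort,
  // the method above produces a pipeline shaped roughly like this:
  //
  //   {$match: {$and: [{status: 'active'}, {$text: {$search: 'united'}}]}},
  //   {$lookup: {from: 'sources', localField: '_id',
  //              foreignField: 'headquarters._id', as: 'matchingSources'}},
  //   {$match: {matchingSources: {$ne: []},
  //             'matchingSources.status': 'active'}},
  //   {$group: {_id: '$_id', doc: {$first: '$$ROOT'}}},
  //   {$replaceRoot: {newRoot: '$doc'}},
  //   {$limit: 21}, // limit + 1, so hasMore can be detected
  //   {$project: {_id: 0, id: {$toString: '$_id'}, isoCode: '$isoCode', ...}}
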
  /// Generates a unique cache key from the query parameters.
  String _generateCacheKey(
    Map<String, dynamic> filter,
    PaginationOptions? pagination,
    List<SortOption>? sort,
  ) {
    // Canonicalize the filter map so that key insertion order does not
    // affect the generated key.
    final sortedFilter = SplayTreeMap<String, dynamic>.from(filter);

    final keyData = {
      'filter': sortedFilter,
      'pagination': {
        'cursor': pagination?.cursor,
        'limit': pagination?.limit,
      },
      // The order of sort options is significant (primary vs. secondary
      // sort), so it is preserved in the key rather than canonicalized.
      'sort': sort?.map((s) => '${s.field}:${s.order.name}').toList(),
    };
    return json.encode(keyData);
  }

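  // For illustration (not part of the original commit): a query with filter
  // {'hasActiveSources': true, 'q': 'united'}, a limit of 20, and a single
  // sort on name ascending produces a cache key like the following
  // (shown wrapped; it is one JSON string):
  //   {"filter":{"hasActiveSources":true,"q":"united"},
  //    "pagination":{"cursor":null,"limit":20},
  //    "sort":["name:asc"]}
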
  /// Cleans up expired entries from the in-memory cache.
  void _cleanupCache() {
    if (_isDisposed) return;

    final now = DateTime.now();
    final expiredKeys = <String>[];

    _cache.forEach((key, value) {
      if (now.isAfter(value.expiry)) {
        expiredKeys.add(key);
      }
    });

    if (expiredKeys.isNotEmpty) {
      expiredKeys.forEach(_cache.remove);
      _log.info('Cleaned up ${expiredKeys.length} expired cache entries.');
    } else {
      _log.finer('Cache cleanup ran, no expired entries found.');
    }
  }

  /// Disposes of resources, specifically the periodic cache cleanup timer.
  void dispose() {
    if (!_isDisposed) {
      _isDisposed = true;
      _cleanupTimer?.cancel();
      _cache.clear();
      _log.info('CountryQueryService disposed.');
    }
  }
}
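
For context, here is a minimal usage sketch of the new service (not part of the commit). It assumes the three DataRepository instances are constructed and injected elsewhere, and it guesses at the PaginationOptions and SortOption constructors from package:core, whose exact parameter names may differ.

import 'package:core/core.dart';
import 'package:data_repository/data_repository.dart';
import 'package:logging/logging.dart';
// Plus an import of the new CountryQueryService file (its path is not shown
// in this diff).

Future<void> exampleCountryQuery(
  DataRepository<Headline> headlineRepository,
  DataRepository<Source> sourceRepository,
  DataRepository<Country> countryRepository,
) async {
  final service = CountryQueryService(
    headlineRepository: headlineRepository,
    sourceRepository: sourceRepository,
    countryRepository: countryRepository,
    log: Logger('CountryQueryService'),
    cacheDuration: const Duration(minutes: 10),
  );

  // Countries with at least one active source whose name matches the text
  // search term, limited to 20 results and sorted by name ascending.
  // PaginationOptions and SortOption are assumed to take the parameters shown
  // here; adjust to the actual package:core API.
  final result = await service.getFilteredCountries(
    filter: {'hasActiveSources': true, 'q': 'united'},
    pagination: PaginationOptions(limit: 20),
    sort: [SortOption(field: 'name', order: SortOrder.asc)],
  );

  print(
    'Fetched ${result.items.length} countries (hasMore: ${result.hasMore})',
  );

  // Cancel the periodic cache-cleanup timer when the service is no longer
  // needed.
  service.dispose();
}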
