@@ -45,6 +45,7 @@ pub enum FetchError {
45
45
46
46
pub type FetchResult = Result < FetchData , FetchError > ;
47
47
48
+ #[ derive( Clone ) ]
48
49
pub enum FetchData {
49
50
Raw ( Bytes ) ,
50
51
CheckpointData ( CheckpointData ) ,
@@ -148,8 +149,7 @@ impl IngestionClient {
148
149
FetchData :: Raw ( bytes) => {
149
150
self . metrics . total_ingested_bytes . inc_by ( bytes. len ( ) as u64 ) ;
150
151
Blob :: from_bytes ( & bytes) . map_err ( |e| {
151
- self . metrics . inc_retry (
152
- checkpoint,
152
+ self . metrics . inc_permanent (
153
153
"deserialization" ,
154
154
IngestionError :: DeserializationError ( checkpoint, e) ,
155
155
)
@@ -214,3 +214,183 @@ impl IngestionClient {
214
214
Ok ( Arc :: new ( data) )
215
215
}
216
216
}
217
+
218
+ #[ cfg( test) ]
219
+ mod tests {
220
+ use prometheus:: Registry ;
221
+ use std:: collections:: HashMap ;
222
+ use std:: sync:: Arc ;
223
+ use std:: time:: Duration ;
224
+ use tokio:: sync:: Mutex ;
225
+ use tokio_util:: bytes:: Bytes ;
226
+
227
+ use crate :: ingestion:: test_utils:: test_checkpoint_data;
228
+
229
+ use super :: * ;
230
+
231
+ /// Mock implementation of IngestionClientTrait for testing
232
+ #[ derive( Default ) ]
233
+ struct MockIngestionClient {
234
+ checkpoints : Arc < Mutex < HashMap < u64 , FetchData > > > ,
235
+ transient_failures : Arc < Mutex < HashMap < u64 , usize > > > ,
236
+ not_found_failures : Arc < Mutex < HashMap < u64 , usize > > > ,
237
+ }
238
+
239
+ #[ async_trait:: async_trait]
240
+ impl IngestionClientTrait for MockIngestionClient {
241
+ async fn fetch ( & self , checkpoint : u64 ) -> FetchResult {
242
+ // Check for not found failures
243
+ if let Some ( remaining) = self . not_found_failures . lock ( ) . await . get_mut ( & checkpoint) {
244
+ if * remaining > 0 {
245
+ * remaining -= 1 ;
246
+ return Err ( FetchError :: NotFound ) ;
247
+ }
248
+ }
249
+
250
+ // Check for transient failures
251
+ if let Some ( remaining) = self . transient_failures . lock ( ) . await . get_mut ( & checkpoint) {
252
+ if * remaining > 0 {
253
+ * remaining -= 1 ;
254
+ return Err ( FetchError :: Transient {
255
+ reason : "mock_transient_error" ,
256
+ error : anyhow:: anyhow!( "Mock transient error" ) ,
257
+ } ) ;
258
+ }
259
+ }
260
+
261
+ // Return the checkpoint data if it exists
262
+ if let Some ( data) = self . checkpoints . lock ( ) . await . get ( & checkpoint) {
263
+ Ok ( data. clone ( ) )
264
+ } else {
265
+ Err ( FetchError :: NotFound )
266
+ }
267
+ }
268
+ }
269
+
270
+ fn setup_test ( ) -> ( IngestionClient , Arc < MockIngestionClient > ) {
271
+ let registry = Registry :: new_custom ( Some ( "test" . to_string ( ) ) , None ) . unwrap ( ) ;
272
+ let metrics = IndexerMetrics :: new ( & registry) ;
273
+ let mock_client = Arc :: new ( MockIngestionClient :: default ( ) ) ;
274
+ let client = IngestionClient :: new_impl ( mock_client. clone ( ) , metrics) ;
275
+ ( client, mock_client)
276
+ }
277
+
278
+ #[ tokio:: test]
279
+ async fn test_fetch_raw_bytes_success ( ) {
280
+ let ( client, mock) = setup_test ( ) ;
281
+
282
+ // Create test data using test_checkpoint_data
283
+ let bytes = Bytes :: from ( test_checkpoint_data ( 1 ) ) ;
284
+ mock. checkpoints
285
+ . lock ( )
286
+ . await
287
+ . insert ( 1 , FetchData :: Raw ( bytes. clone ( ) ) ) ;
288
+
289
+ // Fetch and verify
290
+ let result = client. fetch ( 1 ) . await . unwrap ( ) ;
291
+ assert_eq ! ( result. checkpoint_summary. sequence_number( ) , & 1 ) ;
292
+ }
293
+
294
+ #[ tokio:: test]
295
+ async fn test_fetch_checkpoint_data_success ( ) {
296
+ let ( client, mock) = setup_test ( ) ;
297
+
298
+ // Create test data using test_checkpoint_data
299
+ let bytes = test_checkpoint_data ( 1 ) ;
300
+ let checkpoint_data: CheckpointData = Blob :: from_bytes ( & bytes) . unwrap ( ) ;
301
+ mock. checkpoints
302
+ . lock ( )
303
+ . await
304
+ . insert ( 1 , FetchData :: CheckpointData ( checkpoint_data. clone ( ) ) ) ;
305
+
306
+ // Fetch and verify
307
+ let result = client. fetch ( 1 ) . await . unwrap ( ) ;
308
+ assert_eq ! ( result. checkpoint_summary. sequence_number( ) , & 1 ) ;
309
+ }
310
+
311
+ #[ tokio:: test]
312
+ async fn test_fetch_not_found ( ) {
313
+ let ( client, _) = setup_test ( ) ;
314
+
315
+ // Try to fetch non-existent checkpoint
316
+ let result = client. fetch ( 1 ) . await ;
317
+ assert ! ( matches!( result, Err ( IngestionError :: NotFound ( 1 ) ) ) ) ;
318
+ }
319
+
320
+ #[ tokio:: test]
321
+ async fn test_fetch_transient_error_with_retry ( ) {
322
+ let ( client, mock) = setup_test ( ) ;
323
+
324
+ // Create test data using test_checkpoint_data
325
+ let bytes = test_checkpoint_data ( 1 ) ;
326
+ let checkpoint_data: CheckpointData = Blob :: from_bytes ( & bytes) . unwrap ( ) ;
327
+
328
+ // Add checkpoint to mock with 2 transient failures
329
+ mock. checkpoints
330
+ . lock ( )
331
+ . await
332
+ . insert ( 1 , FetchData :: CheckpointData ( checkpoint_data. clone ( ) ) ) ;
333
+ mock. transient_failures . lock ( ) . await . insert ( 1 , 2 ) ;
334
+
335
+ // Fetch and verify it succeeds after retries
336
+ let result = client. fetch ( 1 ) . await . unwrap ( ) ;
337
+ assert_eq ! ( * result. checkpoint_summary. sequence_number( ) , 1 ) ;
338
+ }
339
+
340
+ #[ tokio:: test]
341
+ async fn test_wait_for_checkpoint_with_retry ( ) {
342
+ let ( client, mock) = setup_test ( ) ;
343
+
344
+ // Create test data using test_checkpoint_data
345
+ let bytes = test_checkpoint_data ( 1 ) ;
346
+ let checkpoint_data: CheckpointData = Blob :: from_bytes ( & bytes) . unwrap ( ) ;
347
+
348
+ // Add checkpoint to mock with 1 not_found failures
349
+ mock. checkpoints
350
+ . lock ( )
351
+ . await
352
+ . insert ( 1 , FetchData :: CheckpointData ( checkpoint_data) ) ;
353
+ mock. not_found_failures . lock ( ) . await . insert ( 1 , 1 ) ;
354
+
355
+ // Wait for checkpoint with short retry interval
356
+ let result = client. wait_for ( 1 , Duration :: from_millis ( 50 ) ) . await . unwrap ( ) ;
357
+ assert_eq ! ( result. checkpoint_summary. sequence_number( ) , & 1 ) ;
358
+ }
359
+
360
+ #[ tokio:: test]
361
+ async fn test_wait_for_checkpoint_instant ( ) {
362
+ let ( client, mock) = setup_test ( ) ;
363
+
364
+ // Create test data using test_checkpoint_data
365
+ let bytes = test_checkpoint_data ( 1 ) ;
366
+ let checkpoint_data: CheckpointData = Blob :: from_bytes ( & bytes) . unwrap ( ) ;
367
+
368
+ // Add checkpoint to mock with no failures - data should be available immediately
369
+ mock. checkpoints
370
+ . lock ( )
371
+ . await
372
+ . insert ( 1 , FetchData :: CheckpointData ( checkpoint_data) ) ;
373
+
374
+ // Wait for checkpoint with short retry interval
375
+ let result = client. wait_for ( 1 , Duration :: from_millis ( 50 ) ) . await . unwrap ( ) ;
376
+ assert_eq ! ( result. checkpoint_summary. sequence_number( ) , & 1 ) ;
377
+ }
378
+
379
+ #[ tokio:: test]
380
+ async fn test_wait_for_checkpoint_permanent_deserialization_error ( ) {
381
+ let ( client, mock) = setup_test ( ) ;
382
+
383
+ // Add invalid data that will cause a deserialization error
384
+ mock. checkpoints
385
+ . lock ( )
386
+ . await
387
+ . insert ( 1 , FetchData :: Raw ( Bytes :: from ( "invalid data" ) ) ) ;
388
+
389
+ // Wait for checkpoint should fail immediately with a permanent error
390
+ let result = client. wait_for ( 1 , Duration :: from_millis ( 50 ) ) . await ;
391
+ assert ! ( matches!(
392
+ result,
393
+ Err ( IngestionError :: DeserializationError ( 1 , _) )
394
+ ) ) ;
395
+ }
396
+ }
0 commit comments