@@ -9,7 +9,6 @@ use bytes::buf::Reader;
9
9
use bytes:: { Buf , Bytes } ;
10
10
use futures:: { StreamExt , TryStreamExt } ;
11
11
use prometheus:: { register_int_counter_vec_with_registry, IntCounterVec , Registry } ;
12
- use rand:: seq:: SliceRandom ;
13
12
use std:: borrow:: Borrow ;
14
13
use std:: future;
15
14
use std:: ops:: Range ;
@@ -58,84 +57,12 @@ impl ArchiveReaderMetrics {
58
57
}
59
58
}
60
59
61
- // ArchiveReaderBalancer selects archives for reading based on whether they can fulfill a checkpoint request
62
- #[ derive( Default , Clone ) ]
63
- pub struct ArchiveReaderBalancer {
64
- readers : Vec < Arc < ArchiveReader > > ,
65
- }
66
-
67
- impl ArchiveReaderBalancer {
68
- pub fn new ( configs : Vec < ArchiveReaderConfig > , registry : & Registry ) -> Result < Self > {
69
- let mut readers = vec ! [ ] ;
70
- let metrics = ArchiveReaderMetrics :: new ( registry) ;
71
- for config in configs. into_iter ( ) {
72
- readers. push ( Arc :: new ( ArchiveReader :: new ( config. clone ( ) , & metrics) ?) ) ;
73
- }
74
- Ok ( ArchiveReaderBalancer { readers } )
75
- }
76
- pub async fn get_archive_watermark ( & self ) -> Result < Option < u64 > > {
77
- let mut checkpoints: Vec < Result < CheckpointSequenceNumber > > = vec ! [ ] ;
78
- for reader in self
79
- . readers
80
- . iter ( )
81
- . filter ( |r| r. use_for_pruning_watermark ( ) )
82
- {
83
- let latest_checkpoint = reader. latest_available_checkpoint ( ) . await ;
84
- info ! (
85
- "Latest archived checkpoint in remote store: {:?} is: {:?}" ,
86
- reader. remote_store_identifier( ) ,
87
- latest_checkpoint
88
- ) ;
89
- checkpoints. push ( latest_checkpoint)
90
- }
91
- let checkpoints: Result < Vec < CheckpointSequenceNumber > > = checkpoints. into_iter ( ) . collect ( ) ;
92
- checkpoints. map ( |vec| vec. into_iter ( ) . min ( ) )
93
- }
94
- pub async fn pick_one_random (
95
- & self ,
96
- checkpoint_range : Range < CheckpointSequenceNumber > ,
97
- ) -> Option < Arc < ArchiveReader > > {
98
- let mut archives_with_complete_range = vec ! [ ] ;
99
- for reader in self . readers . iter ( ) {
100
- let latest_checkpoint = reader. latest_available_checkpoint ( ) . await . unwrap_or ( 0 ) ;
101
- if latest_checkpoint >= checkpoint_range. end {
102
- archives_with_complete_range. push ( reader. clone ( ) ) ;
103
- }
104
- }
105
- if !archives_with_complete_range. is_empty ( ) {
106
- return Some (
107
- archives_with_complete_range
108
- . choose ( & mut rand:: thread_rng ( ) )
109
- . unwrap ( )
110
- . clone ( ) ,
111
- ) ;
112
- }
113
- let mut archives_with_partial_range = vec ! [ ] ;
114
- for reader in self . readers . iter ( ) {
115
- let latest_checkpoint = reader. latest_available_checkpoint ( ) . await . unwrap_or ( 0 ) ;
116
- if latest_checkpoint >= checkpoint_range. start {
117
- archives_with_partial_range. push ( reader. clone ( ) ) ;
118
- }
119
- }
120
- if !archives_with_partial_range. is_empty ( ) {
121
- return Some (
122
- archives_with_partial_range
123
- . choose ( & mut rand:: thread_rng ( ) )
124
- . unwrap ( )
125
- . clone ( ) ,
126
- ) ;
127
- }
128
- None
129
- }
130
- }
131
-
132
60
#[ derive( Clone ) ]
133
61
pub struct ArchiveReader {
134
62
bucket : String ,
135
63
concurrency : usize ,
136
64
sender : Arc < Sender < ( ) > > ,
137
65
manifest : Arc < Mutex < Manifest > > ,
138
- use_for_pruning_watermark : bool ,
139
66
remote_object_store : Arc < dyn ObjectStoreGetExt > ,
140
67
archive_reader_metrics : Arc < ArchiveReaderMetrics > ,
141
68
}
@@ -161,7 +88,6 @@ impl ArchiveReader {
161
88
manifest,
162
89
sender : Arc :: new ( sender) ,
163
90
remote_object_store,
164
- use_for_pruning_watermark : config. use_for_pruning_watermark ,
165
91
concurrency : config. download_concurrency . get ( ) ,
166
92
archive_reader_metrics : metrics. clone ( ) ,
167
93
} )
@@ -524,10 +450,6 @@ impl ArchiveReader {
524
450
. context ( "No checkpoint data in archive" )
525
451
}
526
452
527
- pub fn use_for_pruning_watermark ( & self ) -> bool {
528
- self . use_for_pruning_watermark
529
- }
530
-
531
453
pub fn remote_store_identifier ( & self ) -> String {
532
454
self . remote_object_store . to_string ( )
533
455
}
@@ -565,7 +487,7 @@ impl ArchiveReader {
565
487
}
566
488
567
489
/// Insert checkpoint summary if it doesn't already exist after verifying it
568
- fn get_or_insert_verified_checkpoint < S > (
490
+ pub fn get_or_insert_verified_checkpoint < S > (
569
491
store : & S ,
570
492
certified_checkpoint : CertifiedCheckpointSummary ,
571
493
verify : bool ,
0 commit comments