@@ -87,16 +87,16 @@ impl HistoryMetaHandle {
87
87
self . node_id . clone ( ) ,
88
88
Duration :: from_secs ( 3 ) ,
89
89
) )
90
- . await
91
- . map_err ( |_e| "acquire semaphore failed from GlobalHistoryLog" ) ?;
90
+ . await
91
+ . map_err ( |_e| _e . to_string ( ) ) ?;
92
92
if interval == 0 {
93
93
return Ok ( Some ( acquired_guard) ) ;
94
94
}
95
95
if match ThreadTracker :: tracking_future (
96
96
self . meta_client
97
97
. get_kv ( & format ! ( "{}/last_timestamp" , meta_key) ) ,
98
98
)
99
- . await ?
99
+ . await ?
100
100
{
101
101
Some ( v) => {
102
102
let last: u64 = serde_json:: from_slice ( & v. data ) ?;
@@ -169,3 +169,78 @@ impl HistoryMetaHandle {
169
169
Ok ( ( ) )
170
170
}
171
171
}
172
+
173
+ #[ cfg( test) ]
174
+ mod tests {
175
+ use crate :: history_tables:: meta:: HistoryMetaHandle ;
176
+ use databend_common_meta_store:: MetaStoreProvider ;
177
+ use std:: ops:: Deref ;
178
+ use std:: time:: Duration ;
179
+ use tokio:: time:: timeout;
180
+
181
+ #[ tokio:: test( flavor = "multi_thread" ) ]
182
+ pub async fn test_history_table_permit_guard ( ) -> databend_common_exception:: Result < ( ) > {
183
+ let meta_store = MetaStoreProvider :: new ( Default :: default ( ) )
184
+ . create_meta_store ( )
185
+ . await
186
+ . unwrap ( ) ;
187
+ let meta_client = meta_store. deref ( ) . clone ( ) ;
188
+
189
+
190
+ let node_id = "test_node_123" . to_string ( ) ;
191
+ let meta_handle = HistoryMetaHandle :: new ( meta_client, node_id) ;
192
+
193
+ // Test 1: Basic permit acquisition with interval 0 (no rate limiting)
194
+ let meta_key = "test/history_table/permit_guard" ;
195
+ let guard_result = meta_handle. acquire_with_guard ( meta_key, 0 ) . await ?;
196
+ assert ! ( guard_result. is_some( ) , "Should acquire permit when interval is 0" ) ;
197
+
198
+ if let Some ( guard) = guard_result {
199
+ // Verify that the guard contains the correct meta key
200
+ assert_eq ! ( guard. meta_key, meta_key) ;
201
+ }
202
+
203
+ // Same meta key, because we set the interval to 0, it should not block
204
+ let guard_result2 = timeout ( Duration :: from_secs ( 1 ) , meta_handle. acquire_with_guard ( meta_key, 0 ) ) . await ;
205
+ assert ! ( guard_result2. is_ok( ) , "Should acquire permit again when interval is 0" ) ;
206
+ if let Some ( guard) = guard_result2. unwrap ( ) ? {
207
+ // Verify that the guard contains the correct meta key
208
+ assert_eq ! ( guard. meta_key, meta_key) ;
209
+ }
210
+
211
+ // Test 2: Permit acquisition with interval > 0 (rate limiting)
212
+ let meta_key_rate_limited = "test/history_table/permit_guard_rate_limited" ;
213
+ let interval_seconds = 2 ;
214
+
215
+ // First acquisition should succeed
216
+ let first_guard_result = meta_handle. acquire_with_guard ( meta_key_rate_limited, interval_seconds) . await ?;
217
+ assert ! ( first_guard_result. is_some( ) , "First permit acquisition should succeed" ) ;
218
+
219
+ // Drop the first guard to trigger timestamp update
220
+ drop ( first_guard_result) ;
221
+
222
+ // Immediate second acquisition should fail due to rate limiting
223
+ let second_guard_result = meta_handle. acquire_with_guard ( meta_key_rate_limited, interval_seconds) . await ?;
224
+ assert ! ( second_guard_result. is_none( ) , "Second permit acquisition should fail due to rate limiting" ) ;
225
+
226
+ // Test 3: Verify permit guard automatically updates timestamp on drop
227
+ let meta_key_timestamp = "test/history_table/permit_guard_timestamp" ;
228
+
229
+ // Get initial timestamp (should be None)
230
+ let initial_timestamp = meta_handle. get_u64_from_meta ( & format ! ( "{}/last_timestamp" , meta_key_timestamp) ) . await ?;
231
+ assert ! ( initial_timestamp. is_none( ) , "Initial timestamp should be None" ) ;
232
+
233
+ // Acquire permit with guard
234
+ let guard = meta_handle. acquire_with_guard ( meta_key_timestamp, 0 ) . await ?;
235
+ assert ! ( guard. is_some( ) , "Should acquire permit" ) ;
236
+
237
+ // Drop guard to trigger timestamp update
238
+ drop ( guard) ;
239
+
240
+ // Verify timestamp was updated
241
+ let updated_timestamp = meta_handle. get_u64_from_meta ( & format ! ( "{}/last_timestamp" , meta_key_timestamp) ) . await ?;
242
+ assert ! ( updated_timestamp. is_some( ) , "Timestamp should be updated after guard drop" ) ;
243
+
244
+ Ok ( ( ) )
245
+ }
246
+ }
0 commit comments