Skip to content

Commit ad70ea5

Browse files
committed
add BridgeMonitor and handle url change event
1 parent e3de950 commit ad70ea5

File tree

8 files changed

+502
-16
lines changed

8 files changed

+502
-16
lines changed

crates/sui-bridge/src/client/bridge_authority_aggregator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub struct BridgeAuthorityAggregator {
3131

3232
impl BridgeAuthorityAggregator {
3333
pub fn new(committee: Arc<BridgeCommittee>) -> Self {
34-
let clients = committee
34+
let clients: BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>> = committee
3535
.members()
3636
.iter()
3737
.filter_map(|(name, authority)| {

crates/sui-bridge/src/e2e_tests/basic.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ async fn test_bridge_from_eth_to_sui_to_eth() {
8585
.expect("Recipient should have received ETH coin now")
8686
.clone();
8787
assert_eq!(eth_coin.balance, sui_amount);
88+
info!("Eth to sui bridge transfer finished");
8889

8990
// Now let the recipient send the coin back to ETH
9091
let eth_address_1 = EthAddress::random();

crates/sui-bridge/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod eth_syncer;
1313
pub mod eth_transaction_builder;
1414
pub mod events;
1515
pub mod metrics;
16+
pub mod monitor;
1617
pub mod node;
1718
pub mod orchestrator;
1819
pub mod server;

crates/sui-bridge/src/monitor.rs

Lines changed: 368 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,368 @@
1+
// Copyright (c) Mysten Labs, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! `BridgeMonitor` receives all `SuiBridgeEvent` and handles them accordingly.
5+
6+
use arc_swap::ArcSwap;
7+
use std::sync::Arc;
8+
use tokio::time::Duration;
9+
10+
use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
11+
use crate::crypto::BridgeAuthorityPublicKeyBytes;
12+
use crate::events::CommitteeMemberUrlUpdateEvent;
13+
use crate::events::SuiBridgeEvent;
14+
use crate::retry_with_max_elapsed_time;
15+
use crate::sui_client::{SuiClient, SuiClientInner};
16+
use crate::types::BridgeCommittee;
17+
use tracing::{error, info, warn};
18+
19+
/// Number of staleness retries `get_latest_bridge_committee_with_url_update_event`
/// starts with; one is consumed each time the fetched committee's url for the member
/// does not match the url carried by the event.
const REFRESH_COMMITTEE_RETRY_TIMES: u64 = 3;
20+
21+
/// Receives `SuiBridgeEvent`s over a channel and reacts to them; see `run` for the
/// per-event handling.
pub struct BridgeMonitor<C> {
22+
// Client used to fetch the bridge committee when a url update event arrives.
sui_client: Arc<SuiClient<C>>,
23+
// Receiving end of the metered channel that delivers `SuiBridgeEvent`s to the monitor.
monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
24+
// Shared, swappable handle to the authority aggregator; `run` stores a freshly
// built aggregator here after a committee member url update.
bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
25+
}
26+
27+
impl<C> BridgeMonitor<C>
28+
where
29+
C: SuiClientInner + 'static,
30+
{
31+
pub fn new(
32+
sui_client: Arc<SuiClient<C>>,
33+
monitor_rx: mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
34+
bridge_auth_agg: Arc<ArcSwap<BridgeAuthorityAggregator>>,
35+
) -> Self {
36+
Self {
37+
sui_client,
38+
monitor_rx,
39+
bridge_auth_agg,
40+
}
41+
}
42+
43+
pub async fn run(self) {
44+
tracing::info!("Starting BridgeMonitor");
45+
let Self {
46+
sui_client,
47+
mut monitor_rx,
48+
bridge_auth_agg,
49+
} = self;
50+
51+
while let Some(events) = monitor_rx.recv().await {
52+
match events {
53+
SuiBridgeEvent::SuiToEthTokenBridgeV1(_) => (),
54+
SuiBridgeEvent::TokenTransferApproved(_) => (),
55+
SuiBridgeEvent::TokenTransferClaimed(_) => (),
56+
SuiBridgeEvent::TokenTransferAlreadyApproved(_) => (),
57+
SuiBridgeEvent::TokenTransferAlreadyClaimed(_) => (),
58+
SuiBridgeEvent::TokenTransferLimitExceed(_) => {
59+
// TODO
60+
}
61+
SuiBridgeEvent::EmergencyOpEvent(_) => {
62+
// TODO
63+
}
64+
SuiBridgeEvent::CommitteeMemberRegistration(_) => (),
65+
SuiBridgeEvent::CommitteeUpdateEvent(_) => (),
66+
SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(event) => {
67+
info!("Received CommitteeMemberUrlUpdateEvent: {:?}", event);
68+
let new_committee = get_latest_bridge_committee_with_url_update_event(
69+
sui_client.clone(),
70+
event,
71+
Duration::from_secs(10),
72+
)
73+
.await;
74+
bridge_auth_agg.store(Arc::new(BridgeAuthorityAggregator::new(Arc::new(
75+
new_committee,
76+
))));
77+
info!("Committee updated");
78+
}
79+
SuiBridgeEvent::BlocklistValidatorEvent(_) => {
80+
// TODO
81+
}
82+
SuiBridgeEvent::TokenRegistrationEvent(_) => (),
83+
SuiBridgeEvent::NewTokenEvent(_) => {
84+
// TODO
85+
}
86+
SuiBridgeEvent::UpdateTokenPriceEvent(_) => (),
87+
}
88+
}
89+
90+
panic!("BridgeMonitor channel was closed unexpectedly");
91+
}
92+
}
93+
94+
async fn get_latest_bridge_committee_with_url_update_event<C: SuiClientInner>(
95+
sui_client: Arc<SuiClient<C>>,
96+
event: CommitteeMemberUrlUpdateEvent,
97+
staleness_retry_interval: Duration,
98+
) -> BridgeCommittee {
99+
let mut remaining_retry_times = REFRESH_COMMITTEE_RETRY_TIMES;
100+
loop {
101+
let Ok(Ok(committee)) = retry_with_max_elapsed_time!(
102+
sui_client.get_bridge_committee(),
103+
Duration::from_secs(600)
104+
) else {
105+
error!("Failed to get bridge committee after retry");
106+
continue;
107+
};
108+
let member = committee.member(&BridgeAuthorityPublicKeyBytes::from(&event.member));
109+
if member.is_none() {
110+
// This is possible when a node is processing an older event while the member quitted at a later point, which is fine.
111+
// Or fullnode returns a stale committee that the member hasn't joined, which is rare and tricy to handle so we just log it.
112+
warn!(
113+
"Committee member not found in the committee: {:?}",
114+
event.member
115+
);
116+
return committee;
117+
}
118+
if member.unwrap().base_url == event.new_url {
119+
return committee;
120+
}
121+
// If url does not match, it could be:
122+
// 1. the query is sent to a stale fullnode that does not have the latest data yet
123+
// 2. the node is processing an older message, and the latest url has changed again
124+
// In either case, we retry a few times. If it still fails to match, we assume it's the latter case.
125+
tokio::time::sleep(staleness_retry_interval).await;
126+
remaining_retry_times -= 1;
127+
if remaining_retry_times == 0 {
128+
warn!(
129+
"Committee member url {:?} does not match onchain record {:?} after retry",
130+
event.member, member
131+
);
132+
return committee;
133+
}
134+
}
135+
}
136+
137+
#[cfg(test)]
138+
mod tests {
139+
use super::*;
140+
use crate::events::init_all_struct_tags;
141+
use crate::test_utils::{
142+
bridge_committee_to_bridge_committee_summary, get_test_authority_and_key,
143+
};
144+
use fastcrypto::traits::KeyPair;
145+
use prometheus::Registry;
146+
use sui_types::base_types::SuiAddress;
147+
use sui_types::bridge::BridgeCommitteeSummary;
148+
use sui_types::bridge::MoveTypeCommitteeMember;
149+
use sui_types::crypto::get_key_pair;
150+
151+
use crate::{sui_mock_client::SuiMockClient, types::BridgeCommittee};
152+
use sui_types::crypto::ToFromBytes;
153+
154+
#[tokio::test]
155+
async fn test_get_latest_bridge_committee_with_url_update_event() {
156+
telemetry_subscribers::init_for_testing();
157+
let sui_client_mock = SuiMockClient::default();
158+
let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
159+
let (_, kp): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
160+
let pk = kp.public().clone();
161+
let pk_as_bytes = BridgeAuthorityPublicKeyBytes::from(&pk);
162+
let pk_bytes = pk_as_bytes.as_bytes().to_vec();
163+
let event = CommitteeMemberUrlUpdateEvent {
164+
member: pk,
165+
new_url: "http://new.url".to_string(),
166+
};
167+
let summary = BridgeCommitteeSummary {
168+
members: vec![(
169+
pk_bytes.clone(),
170+
MoveTypeCommitteeMember {
171+
sui_address: SuiAddress::random_for_testing_only(),
172+
bridge_pubkey_bytes: pk_bytes.clone(),
173+
voting_power: 10000,
174+
http_rest_url: "http://new.url".to_string().as_bytes().to_vec(),
175+
blocklisted: false,
176+
},
177+
)],
178+
member_registration: vec![],
179+
last_committee_update_epoch: 0,
180+
};
181+
182+
// Test the regular case, the onchain url matches
183+
sui_client_mock.set_bridge_committee(summary.clone());
184+
let timer = std::time::Instant::now();
185+
let committee = get_latest_bridge_committee_with_url_update_event(
186+
sui_client.clone(),
187+
event.clone(),
188+
Duration::from_secs(2),
189+
)
190+
.await;
191+
assert_eq!(
192+
committee.member(&pk_as_bytes).unwrap().base_url,
193+
"http://new.url"
194+
);
195+
assert!(timer.elapsed().as_millis() < 500);
196+
197+
// Test the case where the onchain url is older. Then update onchain url in 1 second.
198+
// Since the retry interval is 2 seconds, it should return the next retry.
199+
let old_summary = BridgeCommitteeSummary {
200+
members: vec![(
201+
pk_bytes.clone(),
202+
MoveTypeCommitteeMember {
203+
sui_address: SuiAddress::random_for_testing_only(),
204+
bridge_pubkey_bytes: pk_bytes.clone(),
205+
voting_power: 10000,
206+
http_rest_url: "http://old.url".to_string().as_bytes().to_vec(),
207+
blocklisted: false,
208+
},
209+
)],
210+
member_registration: vec![],
211+
last_committee_update_epoch: 0,
212+
};
213+
sui_client_mock.set_bridge_committee(old_summary.clone());
214+
let timer = std::time::Instant::now();
215+
// update the url to "http://new.url" in 1 second
216+
let sui_client_mock_clone = sui_client_mock.clone();
217+
tokio::spawn(async move {
218+
tokio::time::sleep(Duration::from_secs(1)).await;
219+
sui_client_mock_clone.set_bridge_committee(summary.clone());
220+
});
221+
let committee = get_latest_bridge_committee_with_url_update_event(
222+
sui_client.clone(),
223+
event.clone(),
224+
Duration::from_secs(2),
225+
)
226+
.await;
227+
assert_eq!(
228+
committee.member(&pk_as_bytes).unwrap().base_url,
229+
"http://new.url"
230+
);
231+
let elapsed = timer.elapsed().as_millis();
232+
assert!(elapsed > 1000 && elapsed < 3000);
233+
234+
// Test the case where the onchain url is newer. It should retry up to
235+
// REFRESH_COMMITTEE_RETRY_TIMES time then return the onchain record.
236+
let newer_summary = BridgeCommitteeSummary {
237+
members: vec![(
238+
pk_bytes.clone(),
239+
MoveTypeCommitteeMember {
240+
sui_address: SuiAddress::random_for_testing_only(),
241+
bridge_pubkey_bytes: pk_bytes.clone(),
242+
voting_power: 10000,
243+
http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
244+
blocklisted: false,
245+
},
246+
)],
247+
member_registration: vec![],
248+
last_committee_update_epoch: 0,
249+
};
250+
sui_client_mock.set_bridge_committee(newer_summary.clone());
251+
let timer = std::time::Instant::now();
252+
let committee = get_latest_bridge_committee_with_url_update_event(
253+
sui_client.clone(),
254+
event.clone(),
255+
Duration::from_millis(500),
256+
)
257+
.await;
258+
assert_eq!(
259+
committee.member(&pk_as_bytes).unwrap().base_url,
260+
"http://newer.url"
261+
);
262+
let elapsed = timer.elapsed().as_millis();
263+
assert!(elapsed > 500 * REFRESH_COMMITTEE_RETRY_TIMES as u128);
264+
265+
// Test the case where the member is not found in the committee
266+
// It should return the onchain record.
267+
let (_, kp2): (_, fastcrypto::secp256k1::Secp256k1KeyPair) = get_key_pair();
268+
let pk2 = kp2.public().clone();
269+
let pk_as_bytes2 = BridgeAuthorityPublicKeyBytes::from(&pk2);
270+
let pk_bytes2 = pk_as_bytes2.as_bytes().to_vec();
271+
let newer_summary = BridgeCommitteeSummary {
272+
members: vec![(
273+
pk_bytes2.clone(),
274+
MoveTypeCommitteeMember {
275+
sui_address: SuiAddress::random_for_testing_only(),
276+
bridge_pubkey_bytes: pk_bytes2.clone(),
277+
voting_power: 10000,
278+
http_rest_url: "http://newer.url".to_string().as_bytes().to_vec(),
279+
blocklisted: false,
280+
},
281+
)],
282+
member_registration: vec![],
283+
last_committee_update_epoch: 0,
284+
};
285+
sui_client_mock.set_bridge_committee(newer_summary.clone());
286+
let timer = std::time::Instant::now();
287+
let committee = get_latest_bridge_committee_with_url_update_event(
288+
sui_client.clone(),
289+
event.clone(),
290+
Duration::from_secs(1),
291+
)
292+
.await;
293+
assert_eq!(
294+
committee.member(&pk_as_bytes2).unwrap().base_url,
295+
"http://newer.url"
296+
);
297+
assert!(committee.member(&pk_as_bytes).is_none());
298+
let elapsed = timer.elapsed().as_millis();
299+
assert!(elapsed < 1000);
300+
}
301+
302+
#[tokio::test]
303+
async fn test_update_bridge_authority_aggregation_with_url_change_event() {
304+
let (monitor_tx, monitor_rx, sui_client_mock, sui_client) = setup();
305+
let mut authorities = vec![
306+
get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
307+
get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
308+
get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
309+
get_test_authority_and_key(2500, 0 /* port, dummy value */).0,
310+
];
311+
let old_committee = BridgeCommittee::new(authorities.clone()).unwrap();
312+
let agg = Arc::new(ArcSwap::new(Arc::new(BridgeAuthorityAggregator::new(
313+
Arc::new(old_committee),
314+
))));
315+
let _handle = tokio::task::spawn(
316+
BridgeMonitor::new(sui_client.clone(), monitor_rx, agg.clone()).run(),
317+
);
318+
let new_url = "http://new.url".to_string();
319+
authorities[0].base_url = new_url.clone();
320+
let new_committee = BridgeCommittee::new(authorities.clone()).unwrap();
321+
let new_committee_summary =
322+
bridge_committee_to_bridge_committee_summary(new_committee.clone());
323+
sui_client_mock.set_bridge_committee(new_committee_summary.clone());
324+
monitor_tx
325+
.send(SuiBridgeEvent::CommitteeMemberUrlUpdateEvent(
326+
CommitteeMemberUrlUpdateEvent {
327+
member: authorities[0].pubkey.clone(),
328+
new_url: new_url.clone(),
329+
},
330+
))
331+
.await
332+
.unwrap();
333+
// Wait for the monitor to process the event
334+
tokio::time::sleep(Duration::from_secs(1)).await;
335+
// Now expect the committee to be updated
336+
assert_eq!(
337+
agg.load()
338+
.committee
339+
.member(&BridgeAuthorityPublicKeyBytes::from(&authorities[0].pubkey))
340+
.unwrap()
341+
.base_url,
342+
new_url
343+
);
344+
}
345+
346+
fn setup() -> (
347+
mysten_metrics::metered_channel::Sender<SuiBridgeEvent>,
348+
mysten_metrics::metered_channel::Receiver<SuiBridgeEvent>,
349+
SuiMockClient,
350+
Arc<SuiClient<SuiMockClient>>,
351+
) {
352+
telemetry_subscribers::init_for_testing();
353+
let registry = Registry::new();
354+
mysten_metrics::init_metrics(&registry);
355+
init_all_struct_tags();
356+
357+
let sui_client_mock = SuiMockClient::default();
358+
let sui_client = Arc::new(SuiClient::new_for_testing(sui_client_mock.clone()));
359+
let (monitor_tx, monitor_rx) = mysten_metrics::metered_channel::channel(
360+
10000,
361+
&mysten_metrics::get_metrics()
362+
.unwrap()
363+
.channel_inflight
364+
.with_label_values(&["monitor_queue"]),
365+
);
366+
(monitor_tx, monitor_rx, sui_client_mock, sui_client)
367+
}
368+
}

0 commit comments

Comments
 (0)