Skip to content

Commit 5713917

Browse files
author
“ramfox”
committed
chore: upgrade to latest iroh main, and add n0-watcher dependency
1 parent 3711890 commit 5713917

File tree

11 files changed

+710
-834
lines changed

11 files changed

+710
-834
lines changed

Cargo.lock

Lines changed: 664 additions & 802 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ derive_more = { version = "2.0.1", features = ["from", "try_from", "into", "debu
1111
futures-lite = "2.6.0"
1212
quinn = { package = "iroh-quinn", version = "0.13.0" }
1313
n0-future = "0.1.2"
14+
n0-watcher = "0.2.0"
1415
range-collections = { version = "0.4.6", features = ["serde"] }
1516
redb = "2.4.0"
1617
smallvec = { version = "1", features = ["serde", "const_new"] }
@@ -28,10 +29,10 @@ chrono = "0.4.39"
2829
nested_enum_utils = "0.1.0"
2930
ref-cast = "1.0.24"
3031
arrayvec = "0.7.6"
31-
iroh = "0.35.0"
32+
iroh = "0.35"
3233
self_cell = "1.1.0"
3334
genawaiter = { version = "0.99.1", features = ["futures03"] }
34-
iroh-base = "0.35.0"
35+
iroh-base = "0.35"
3536
reflink-copy = "0.1.24"
3637
irpc = { version = "0.3.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"], default-features = false }
3738
irpc-derive = { version = "0.3.0", default-features = false }
@@ -57,3 +58,7 @@ walkdir = "2.5.0"
5758
hide-proto-docs = []
5859
metrics = []
5960
default = ["hide-proto-docs"]
61+
62+
[patch.crates-io]
63+
iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
64+
iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }

src/api/blobs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ impl Blobs {
430430
&self,
431431
hash: Hash,
432432
ranges: ChunkRanges,
433-
stream: &mut quinn::RecvStream,
433+
stream: &mut iroh::endpoint::RecvStream,
434434
) -> RequestResult<()> {
435435
let reader = TokioStreamReader::new(stream);
436436
self.import_bao_reader(hash, ranges, reader).await?;

src/api/downloader.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ impl DialNode {
630630
Ok(Err(e)) => {
631631
warn!("Failed to connect to node {}: {}", self.id, e);
632632
*guard = SlotState::AttemptFailed(SystemTime::now());
633-
Err(e)
633+
Err(e.into())
634634
}
635635
Err(e) => {
636636
warn!("Failed to connect to node {}: {}", self.id, e);
@@ -690,6 +690,7 @@ mod tests {
690690

691691
use bao_tree::ChunkRanges;
692692
use n0_future::StreamExt;
693+
use n0_watcher::Watcher;
693694
use testresult::TestResult;
694695

695696
use crate::{
@@ -710,9 +711,9 @@ mod tests {
710711
let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
711712
let tt1 = store1.add_slice("hello world").await?;
712713
let tt2 = store2.add_slice("hello world 2").await?;
713-
let node1_addr = r1.endpoint().node_addr().await?;
714+
let node1_addr = r1.endpoint().node_addr().initialized().await?;
714715
let node1_id = node1_addr.node_id;
715-
let node2_addr = r2.endpoint().node_addr().await?;
716+
let node2_addr = r2.endpoint().node_addr().initialized().await?;
716717
let node2_id = node2_addr.node_id;
717718
let swarm = Downloader::new(&store3, r3.endpoint());
718719
r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -749,9 +750,9 @@ mod tests {
749750
format: crate::BlobFormat::HashSeq,
750751
})
751752
.await?;
752-
let node1_addr = r1.endpoint().node_addr().await?;
753+
let node1_addr = r1.endpoint().node_addr().initialized().await?;
753754
let node1_id = node1_addr.node_id;
754-
let node2_addr = r2.endpoint().node_addr().await?;
755+
let node2_addr = r2.endpoint().node_addr().initialized().await?;
755756
let node2_id = node2_addr.node_id;
756757
let swarm = Downloader::new(&store3, r3.endpoint());
757758
r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -818,9 +819,9 @@ mod tests {
818819
format: crate::BlobFormat::HashSeq,
819820
})
820821
.await?;
821-
let node1_addr = r1.endpoint().node_addr().await?;
822+
let node1_addr = r1.endpoint().node_addr().initialized().await?;
822823
let node1_id = node1_addr.node_id;
823-
let node2_addr = r2.endpoint().node_addr().await?;
824+
let node2_addr = r2.endpoint().node_addr().initialized().await?;
824825
let node2_id = node2_addr.node_id;
825826
let swarm = Downloader::new(&store3, r3.endpoint());
826827
r3.endpoint().add_node_addr(node1_addr.clone())?;

src/api/remote.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
//!
33
//! The entry point is the [`Download`] struct.
44
use genawaiter::sync::{Co, Gen};
5+
use iroh::endpoint::SendStream;
56
use irpc::util::{AsyncReadVarintExt, WriteVarintExt};
67
use n0_future::{Stream, StreamExt, io};
7-
use quinn::SendStream;
88
use ref_cast::RefCast;
99

1010
use super::blobs::Bitfield;

src/get.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ pub mod fsm {
253253
RequestTooBig,
254254
/// Error when writing the request to the [`SendStream`].
255255
#[error("write: {0}")]
256-
Write(#[from] quinn::WriteError),
256+
Write(#[from] iroh::endpoint::WriteError),
257257
/// Quic connection is closed.
258258
#[error("closed")]
259259
Closed(#[from] quinn::ClosedStream),

src/net_protocol.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,12 @@
3838
3939
use std::{fmt::Debug, sync::Arc};
4040

41-
use anyhow::Result;
42-
use iroh::{Endpoint, endpoint::Connection, protocol::ProtocolHandler};
43-
use n0_future::future::Boxed as BoxedFuture;
41+
use iroh::{
42+
Endpoint,
43+
endpoint::Connection,
44+
protocol::{AcceptError, ProtocolHandler},
45+
};
46+
use n0_watcher::Watcher;
4447
use tokio::sync::mpsc;
4548
use tracing::error;
4649

@@ -89,14 +92,17 @@ impl Blobs {
8992
/// just a convenience method to create a ticket from content and the address of this node.
9093
pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
9194
let content = content.into();
92-
let addr = self.inner.endpoint.node_addr().await?;
95+
let addr = self.inner.endpoint.node_addr().initialized().await?;
9396
let ticket = BlobTicket::new(addr, content.hash, content.format);
9497
Ok(ticket)
9598
}
9699
}
97100

98101
impl ProtocolHandler for Blobs {
99-
fn accept(&self, conn: Connection) -> BoxedFuture<Result<()>> {
102+
fn accept(
103+
&self,
104+
conn: Connection,
105+
) -> impl Future<Output = std::result::Result<(), AcceptError>> + Send {
100106
let store = self.store().clone();
101107
let events = self.inner.events.clone();
102108

@@ -106,7 +112,7 @@ impl ProtocolHandler for Blobs {
106112
})
107113
}
108114

109-
fn shutdown(&self) -> BoxedFuture<()> {
115+
fn shutdown(&self) -> impl Future<Output = ()> + Send {
110116
let store = self.store().clone();
111117
Box::pin(async move {
112118
if let Err(cause) = store.shutdown().await {

src/protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ pub enum RequestType {
437437

438438
impl Request {
439439
pub async fn read_async(
440-
reader: &mut CountingReader<&mut quinn::RecvStream>,
440+
reader: &mut CountingReader<&mut iroh::endpoint::RecvStream>,
441441
) -> io::Result<Self> {
442442
let request_type = reader.read_u8().await?;
443443
let request_type: RequestType = postcard::from_bytes(std::slice::from_ref(&request_type))

src/provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ impl<R> CountingReader<R> {
732732
}
733733
}
734734

735-
impl CountingReader<&mut quinn::RecvStream> {
735+
impl CountingReader<&mut iroh::endpoint::RecvStream> {
736736
pub async fn read_to_end_as<T: DeserializeOwned>(&mut self, max_size: usize) -> io::Result<T> {
737737
let data = self
738738
.inner

src/tests.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use bytes::Bytes;
55
use iroh::{Endpoint, NodeId, protocol::Router};
66
use irpc::RpcMessage;
77
use n0_future::{StreamExt, task::AbortOnDropHandle};
8+
use n0_watcher::Watcher;
89
use tempfile::TempDir;
910
use testresult::TestResult;
1011
use tokio::sync::{mpsc, watch};
@@ -226,7 +227,7 @@ async fn two_nodes_get_blobs(
226227
for size in sizes {
227228
tts.push(store1.add_bytes(test_data(size)).await?);
228229
}
229-
let addr1 = r1.endpoint().node_addr().await?;
230+
let addr1 = r1.endpoint().node_addr().initialized().await?;
230231
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
231232
for size in sizes {
232233
let hash = Hash::new(test_data(size));
@@ -259,7 +260,7 @@ async fn two_nodes_observe(
259260
let size = 1024 * 1024 * 8 + 1;
260261
let data = test_data(size);
261262
let (hash, bao) = create_n0_bao(&data, &ChunkRanges::all())?;
262-
let addr1 = r1.endpoint().node_addr().await?;
263+
let addr1 = r1.endpoint().node_addr().initialized().await?;
263264
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
264265
let mut stream = store2
265266
.remote()
@@ -308,7 +309,7 @@ async fn two_nodes_get_many(
308309
tts.push(store1.add_bytes(test_data(size)).await?);
309310
}
310311
let hashes = tts.iter().map(|tt| tt.hash).collect::<Vec<_>>();
311-
let addr1 = r1.endpoint().node_addr().await?;
312+
let addr1 = r1.endpoint().node_addr().initialized().await?;
312313
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
313314
store2
314315
.remote()
@@ -381,7 +382,7 @@ async fn two_nodes_push_blobs(
381382
for size in sizes {
382383
tts.push(store1.add_bytes(test_data(size)).await?);
383384
}
384-
let addr2 = r2.endpoint().node_addr().await?;
385+
let addr2 = r2.endpoint().node_addr().initialized().await?;
385386
let conn = r1.endpoint().connect(addr2, crate::ALPN).await?;
386387
for size in sizes {
387388
let hash = Hash::new(test_data(size));
@@ -542,7 +543,7 @@ async fn two_nodes_hash_seq(
542543
r2: Router,
543544
store2: &Store,
544545
) -> TestResult<()> {
545-
let addr1 = r1.endpoint().node_addr().await?;
546+
let addr1 = r1.endpoint().node_addr().initialized().await?;
546547
let sizes = INTERESTING_SIZES;
547548
let root = add_test_hash_seq(store1, sizes).await?;
548549
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
@@ -569,7 +570,7 @@ async fn two_nodes_hash_seq_mem() -> TestResult<()> {
569570
async fn two_nodes_hash_seq_progress() -> TestResult<()> {
570571
tracing_subscriber::fmt::try_init().ok();
571572
let (_testdir, (r1, store1, _), (r2, store2, _)) = two_node_test_setup_fs().await?;
572-
let addr1 = r1.endpoint().node_addr().await?;
573+
let addr1 = r1.endpoint().node_addr().initialized().await?;
573574
let sizes = INTERESTING_SIZES;
574575
let root = add_test_hash_seq(&store1, sizes).await?;
575576
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
@@ -605,7 +606,7 @@ async fn node_serve_hash_seq() -> TestResult<()> {
605606
let r1 = Router::builder(endpoint)
606607
.accept(crate::protocol::ALPN, blobs)
607608
.spawn();
608-
let addr1 = r1.endpoint().node_addr().await?;
609+
let addr1 = r1.endpoint().node_addr().initialized().await?;
609610
info!("node addr: {addr1:?}");
610611
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
611612
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;
@@ -636,7 +637,7 @@ async fn node_serve_blobs() -> TestResult<()> {
636637
let r1 = Router::builder(endpoint)
637638
.accept(crate::protocol::ALPN, blobs)
638639
.spawn();
639-
let addr1 = r1.endpoint().node_addr().await?;
640+
let addr1 = r1.endpoint().node_addr().initialized().await?;
640641
info!("node addr: {addr1:?}");
641642
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
642643
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;
@@ -678,7 +679,7 @@ async fn node_smoke(store: &Store) -> TestResult<()> {
678679
let r1 = Router::builder(endpoint)
679680
.accept(crate::protocol::ALPN, blobs)
680681
.spawn();
681-
let addr1 = r1.endpoint().node_addr().await?;
682+
let addr1 = r1.endpoint().node_addr().initialized().await?;
682683
info!("node addr: {addr1:?}");
683684
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
684685
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;

src/ticket.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ impl Ticket for BlobTicket {
7979
postcard::to_stdvec(&data).expect("postcard serialization failed")
8080
}
8181

82-
fn from_bytes(bytes: &[u8]) -> std::result::Result<Self, ticket::Error> {
83-
let res: TicketWireFormat = postcard::from_bytes(bytes).map_err(ticket::Error::Postcard)?;
82+
fn from_bytes(bytes: &[u8]) -> std::result::Result<Self, ticket::ParseError> {
83+
let res: TicketWireFormat =
84+
postcard::from_bytes(bytes).map_err(ticket::ParseError::Postcard)?;
8485
let TicketWireFormat::Variant0(Variant0BlobTicket { node, format, hash }) = res;
8586
Ok(Self {
8687
node: NodeAddr {
@@ -95,7 +96,7 @@ impl Ticket for BlobTicket {
9596
}
9697

9798
impl FromStr for BlobTicket {
98-
type Err = ticket::Error;
99+
type Err = ticket::ParseError;
99100

100101
fn from_str(s: &str) -> Result<Self, Self::Err> {
101102
Ticket::deserialize(s)

0 commit comments

Comments
 (0)