WIP refactor: convert thiserror to snafu and remove anyhow in public APIs #1

Draft · wants to merge 3 commits into base: main
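For orientation, here is a minimal sketch of the thiserror-to-snafu conversion pattern named in the title. The `ExportError` type, its `Io` variant, and `file_size` are hypothetical examples, not code from this PR, and it assumes snafu 0.8:

```rust
use snafu::{ResultExt, Snafu};

// Before (thiserror): the display string and source conversion live in
// attributes, and `#[from]` provides a blanket `From<std::io::Error>` impl.
//
// #[derive(Debug, thiserror::Error)]
// pub enum ExportError {
//     #[error("io error: {0}")]
//     Io(#[from] std::io::Error),
// }

// After (snafu): the display string moves to `#[snafu(display(..))]`, the
// wrapped error becomes a named `source` field, and call sites attach
// context explicitly via the generated `IoSnafu` selector.
#[derive(Debug, Snafu)]
pub enum ExportError {
    #[snafu(display("io error while exporting"))]
    Io { source: std::io::Error },
}

fn file_size(path: &std::path::Path) -> Result<u64, ExportError> {
    // `.context(IoSnafu)` replaces the implicit `?`-via-`From` conversion.
    let meta = std::fs::metadata(path).context(IoSnafu)?;
    Ok(meta.len())
}
```
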
1,476 changes: 653 additions & 823 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 16 additions & 7 deletions Cargo.toml
@@ -9,12 +9,14 @@ bao-tree = { version = "0.15.1", features = ["experimental-mixed", "tokio_fsm",
 bytes = { version = "1", features = ["serde"] }
 derive_more = { version = "2.0.1", features = ["from", "try_from", "into", "debug", "display", "deref", "deref_mut"] }
 futures-lite = "2.6.0"
-quinn = { package = "iroh-quinn", version = "0.13.0" }
+quinn = { package = "iroh-quinn", version = "0.14.0" }
 n0-future = "0.1.2"
+n0-snafu = "0.2.0"
+n0-watcher = "0.2.0"
 range-collections = { version = "0.4.6", features = ["serde"] }
 redb = "2.4.0"
 smallvec = { version = "1", features = ["serde", "const_new"] }
-thiserror = "2.0.11"
+snafu = "0.8.5"
 tokio = { version = "1.43.0", features = ["full"] }
 tokio-util = { version = "0.7.13", features = ["full"] }
 tracing = "0.1.41"
@@ -25,19 +27,20 @@ serde = "1.0.217"
 postcard = { version = "1.1.1", features = ["experimental-derive", "use-std"] }
 data-encoding = "2.8.0"
 chrono = "0.4.39"
-nested_enum_utils = "0.1.0"
+nested_enum_utils = "0.2.1"
 ref-cast = "1.0.24"
 arrayvec = "0.7.6"
-iroh = "0.35.0"
+iroh = "0.35"
 self_cell = "1.1.0"
 genawaiter = { version = "0.99.1", features = ["futures03"] }
-iroh-base = "0.35.0"
+iroh-base = "0.35"
 reflink-copy = "0.1.24"
-irpc = { version = "0.3.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"], default-features = false }
-irpc-derive = { version = "0.3.0", default-features = false }
+irpc = { version = "0.4.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"], default-features = false }
+irpc-derive = { version = "0.4.0", default-features = false }
 iroh-metrics = { version = "0.32.0" }
 hashlink = "0.10.0"
 futures-buffered = "0.2.11"
+thiserror = "2.0.12"

 [dev-dependencies]
 clap = { version = "4.5.31", features = ["derive"] }
@@ -57,3 +60,9 @@ walkdir = "2.5.0"
 hide-proto-docs = []
 metrics = []
 default = ["hide-proto-docs"]
+
+[patch.crates-io]
+iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
+iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
+irpc = { git = "https://github.com/n0-computer/irpc.git", branch = "iroh-quinn-latest" }
+irpc-derive = { git = "https://github.com/n0-computer/irpc.git", branch = "iroh-quinn-latest" }
38 changes: 19 additions & 19 deletions src/api/blobs.rs
@@ -24,7 +24,7 @@ use bao_tree::{
 use bytes::Bytes;
 use genawaiter::sync::Gen;
 use iroh_io::{AsyncStreamReader, TokioStreamReader};
-use irpc::channel::{oneshot, spsc};
+use irpc::channel::{mpsc, oneshot};
 use n0_future::{Stream, StreamExt, future, stream};
 use quinn::SendStream;
 use range_collections::{RangeSet2, range_set::RangeSetRange};
@@ -225,7 +225,7 @@ impl Blobs {
 };
 let client = self.client.clone();
 let stream = Gen::new(|co| async move {
-let (mut sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
+let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
 Ok(x) => x,
 Err(cause) => {
 co.yield_(AddProgressItem::Error(cause.into())).await;
@@ -338,7 +338,7 @@ impl Blobs {
 trace!("{:?}", options);
 if options.hash == Hash::EMPTY {
 return ObserveProgress::new(async move {
-let (mut tx, rx) = spsc::channel(1);
+let (tx, rx) = mpsc::channel(1);
 tx.send(Bitfield::complete(0)).await.ok();
 Ok(rx)
 });
@@ -405,7 +405,7 @@ impl Blobs {
 let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
 let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
 let options = ImportBaoOptions { hash, size };
-let mut handle = self.import_bao_with_opts(options, 32).await?;
+let handle = self.import_bao_with_opts(options, 32).await?;
 let driver = async move {
 let reader = loop {
 match decoder.next().await {
@@ -430,7 +430,7 @@
 &self,
 hash: Hash,
 ranges: ChunkRanges,
-stream: &mut quinn::RecvStream,
+stream: &mut iroh::endpoint::RecvStream,
 ) -> RequestResult<()> {
 let reader = TokioStreamReader::new(stream);
 self.import_bao_reader(hash, ranges, reader).await?;
@@ -509,7 +509,7 @@ impl<'a> BatchAddProgress<'a> {
 pub struct Batch<'a> {
 scope: Scope,
 blobs: &'a Blobs,
-_tx: spsc::Sender<BatchResponse>,
+_tx: mpsc::Sender<BatchResponse>,
 }

 impl<'a> Batch<'a> {
@@ -647,7 +647,7 @@ impl<'a> AddProgress<'a> {
 /// Calling [`ObserveProgress::aggregated`] will return a stream of states,
 /// where each state is the current state at the time of the update.
 pub struct ObserveProgress {
-inner: future::Boxed<irpc::Result<spsc::Receiver<Bitfield>>>,
+inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
 }

 impl IntoFuture for ObserveProgress {
@@ -668,7 +668,7 @@ impl IntoFuture for ObserveProgress {

 impl ObserveProgress {
 fn new(
-fut: impl Future<Output = irpc::Result<spsc::Receiver<Bitfield>>> + Send + 'static,
+fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
 ) -> Self {
 Self {
 inner: Box::pin(fut),
@@ -710,7 +710,7 @@ impl ObserveProgress {
 /// It also implements [`IntoFuture`], so you can await it to get the size of the
 /// exported blob.
 pub struct ExportProgress {
-inner: future::Boxed<irpc::Result<spsc::Receiver<ExportProgressItem>>>,
+inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
 }

 impl IntoFuture for ExportProgress {
@@ -725,7 +725,7 @@ impl IntoFuture for ExportProgress {

 impl ExportProgress {
 fn new(
-fut: impl Future<Output = irpc::Result<spsc::Receiver<ExportProgressItem>>> + Send + 'static,
+fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
 ) -> Self {
 Self {
 inner: Box::pin(fut),
@@ -768,15 +768,15 @@ impl ExportProgress {

 /// A handle for an ongoing bao import operation.
 pub struct ImportBaoHandle {
-pub tx: spsc::Sender<BaoContentItem>,
+pub tx: mpsc::Sender<BaoContentItem>,
 pub rx: oneshot::Receiver<super::Result<()>>,
 }

 impl ImportBaoHandle {
 pub(crate) async fn new(
 fut: impl Future<
 Output = irpc::Result<(
-spsc::Sender<BaoContentItem>,
+mpsc::Sender<BaoContentItem>,
 oneshot::Receiver<super::Result<()>>,
 )>,
 > + Send
@@ -789,20 +789,20 @@ impl ImportBaoHandle {

 /// A progress handle for a blobs list operation.
 pub struct BlobsListProgress {
-inner: future::Boxed<irpc::Result<spsc::Receiver<super::Result<Hash>>>>,
+inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
 }

 impl BlobsListProgress {
 fn new(
-fut: impl Future<Output = irpc::Result<spsc::Receiver<super::Result<Hash>>>> + Send + 'static,
+fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
 ) -> Self {
 Self {
 inner: Box::pin(fut),
 }
 }

 pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
-let mut rx: spsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
+let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
 let mut hashes = Vec::new();
 while let Some(item) = rx.recv().await? {
 hashes.push(item?);
@@ -829,13 +829,13 @@ impl BlobsListProgress {
 /// You can get access to the underlying stream using the [`ExportBaoResult::stream`] method.
 pub struct ExportRangesProgress {
 ranges: RangeSet2<u64>,
-inner: future::Boxed<irpc::Result<spsc::Receiver<ExportRangesItem>>>,
+inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
 }

 impl ExportRangesProgress {
 fn new(
 ranges: RangeSet2<u64>,
-fut: impl Future<Output = irpc::Result<spsc::Receiver<ExportRangesItem>>> + Send + 'static,
+fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
 ) -> Self {
 Self {
 ranges,
@@ -909,12 +909,12 @@ impl ExportRangesProgress {
 ///
 /// You can get access to the underlying stream using the [`ExportBaoResult::stream`] method.
 pub struct ExportBaoProgress {
-inner: future::Boxed<irpc::Result<spsc::Receiver<EncodedItem>>>,
+inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
 }

 impl ExportBaoProgress {
 fn new(
-fut: impl Future<Output = irpc::Result<spsc::Receiver<EncodedItem>>> + Send + 'static,
+fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
 ) -> Self {
 Self {
 inner: Box::pin(fut),
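
The blobs.rs changes above are largely a mechanical rename from `irpc::channel::spsc` to `irpc::channel::mpsc`, plus dropping the now-unneeded `mut` on senders. The following is a standalone sketch of the resulting usage pattern, inferred only from the calls visible in this diff (`channel(cap)`, `send(..).await`, `recv().await` yielding an `Option`); the `Item` alias and the `collect_all`/`demo` functions are hypothetical and not part of the crate:

```rust
use irpc::channel::mpsc;

// Stand-in for the real payload types (Bitfield, AddProgressItem, ...).
type Item = u64;

async fn collect_all(mut rx: mpsc::Receiver<Item>) -> Vec<Item> {
    let mut items = Vec::new();
    // recv() resolves to Ok(Some(item)) until every sender is dropped.
    while let Ok(Some(item)) = rx.recv().await {
        items.push(item);
    }
    items
}

async fn demo() -> Vec<Item> {
    // Bounded channel, same constructor shape as before the rename.
    let (tx, rx) = mpsc::channel(8);
    tokio::spawn(async move {
        for i in 0..3u64 {
            // send() no longer needs a `mut` sender; ignore the error if
            // the receiver side is already gone.
            tx.send(i).await.ok();
        }
    });
    collect_all(rx).await
}
```
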
28 changes: 15 additions & 13 deletions src/api/downloader.rs
@@ -11,7 +11,6 @@ use std::{
 use anyhow::bail;
 use genawaiter::sync::Gen;
 use iroh::{Endpoint, NodeId, endpoint::Connection};
-use irpc::channel::spsc;
 use irpc_derive::rpc_requests;
 use n0_future::{BufferedStreamExt, Stream, StreamExt, future, stream};
 use rand::seq::SliceRandom;
@@ -43,7 +42,7 @@ impl irpc::Service for DownloaderService {}
 #[rpc_requests(DownloaderService, message = SwarmMsg, alias = "Msg")]
 #[derive(Debug, Serialize, Deserialize)]
 enum SwarmProtocol {
-#[rpc(tx = spsc::Sender<DownloadProgessItem>)]
+#[rpc(tx = irpc::channel::mpsc::Sender<DownloadProgessItem>)]
 Download(DownloadRequest),
 }

@@ -115,7 +114,7 @@ async fn handle_download_impl(
 store: Store,
 pool: ConnectionPool,
 request: DownloadRequest,
-tx: &mut spsc::Sender<DownloadProgessItem>,
+tx: &mut irpc::channel::mpsc::Sender<DownloadProgessItem>,
 ) -> anyhow::Result<()> {
 match request.strategy {
 SplitStrategy::Split => handle_download_split_impl(store, pool, request, tx).await?,
@@ -136,7 +135,7 @@
 store: Store,
 pool: ConnectionPool,
 request: DownloadRequest,
-tx: &mut spsc::Sender<DownloadProgessItem>,
+tx: &mut irpc::channel::mpsc::Sender<DownloadProgessItem>,
 ) -> anyhow::Result<()> {
 let providers = request.providers;
 let requests = split_request(&request.request, &providers, &pool, &store, Drain).await?;
@@ -309,11 +308,13 @@ impl<'de> Deserialize<'de> for DownloadRequest {
 pub type DownloadOptions = DownloadRequest;

 pub struct DownloadProgress {
-fut: future::Boxed<irpc::Result<spsc::Receiver<DownloadProgessItem>>>,
+fut: future::Boxed<irpc::Result<irpc::channel::mpsc::Receiver<DownloadProgessItem>>>,
 }

 impl DownloadProgress {
-fn new(fut: future::Boxed<irpc::Result<spsc::Receiver<DownloadProgessItem>>>) -> Self {
+fn new(
+fut: future::Boxed<irpc::Result<irpc::channel::mpsc::Receiver<DownloadProgessItem>>>,
+) -> Self {
 Self { fut }
 }

@@ -630,7 +631,7 @@ impl DialNode {
 Ok(Err(e)) => {
 warn!("Failed to connect to node {}: {}", self.id, e);
 *guard = SlotState::AttemptFailed(SystemTime::now());
-Err(e)
+Err(e.into())
 }
 Err(e) => {
 warn!("Failed to connect to node {}: {}", self.id, e);
@@ -690,6 +691,7 @@ mod tests {

 use bao_tree::ChunkRanges;
 use n0_future::StreamExt;
+use n0_watcher::Watcher;
 use testresult::TestResult;

 use crate::{
@@ -710,9 +712,9 @@
 let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
 let tt1 = store1.add_slice("hello world").await?;
 let tt2 = store2.add_slice("hello world 2").await?;
-let node1_addr = r1.endpoint().node_addr().await?;
+let node1_addr = r1.endpoint().node_addr().initialized().await?;
 let node1_id = node1_addr.node_id;
-let node2_addr = r2.endpoint().node_addr().await?;
+let node2_addr = r2.endpoint().node_addr().initialized().await?;
 let node2_id = node2_addr.node_id;
 let swarm = Downloader::new(&store3, r3.endpoint());
 r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -749,9 +751,9 @@
 format: crate::BlobFormat::HashSeq,
 })
 .await?;
-let node1_addr = r1.endpoint().node_addr().await?;
+let node1_addr = r1.endpoint().node_addr().initialized().await?;
 let node1_id = node1_addr.node_id;
-let node2_addr = r2.endpoint().node_addr().await?;
+let node2_addr = r2.endpoint().node_addr().initialized().await?;
 let node2_id = node2_addr.node_id;
 let swarm = Downloader::new(&store3, r3.endpoint());
 r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -818,9 +820,9 @@
 format: crate::BlobFormat::HashSeq,
 })
 .await?;
-let node1_addr = r1.endpoint().node_addr().await?;
+let node1_addr = r1.endpoint().node_addr().initialized().await?;
 let node1_id = node1_addr.node_id;
-let node2_addr = r2.endpoint().node_addr().await?;
+let node2_addr = r2.endpoint().node_addr().initialized().await?;
 let node2_id = node2_addr.node_id;
 let swarm = Downloader::new(&store3, r3.endpoint());
 r3.endpoint().add_node_addr(node1_addr.clone())?;
22 changes: 11 additions & 11 deletions src/api/proto.rs
@@ -28,7 +28,7 @@ use bao_tree::{
 io::{BaoContentItem, Leaf, mixed::EncodedItem},
 };
 use bytes::Bytes;
-use irpc::channel::{oneshot, spsc};
+use irpc::channel::{mpsc, oneshot};
 use irpc_derive::rpc_requests;
 use n0_future::Stream;
 use range_collections::RangeSet2;
@@ -91,29 +91,29 @@ impl irpc::Service for StoreService {}
 #[rpc_requests(StoreService, message = Command, alias = "Msg")]
 #[derive(Debug, Serialize, Deserialize)]
 pub enum Request {
-#[rpc(tx = spsc::Sender<super::Result<Hash>>)]
+#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
 ListBlobs(ListRequest),
-#[rpc(tx = oneshot::Sender<Scope>, rx = spsc::Receiver<BatchResponse>)]
+#[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
 Batch(BatchRequest),
 #[rpc(tx = oneshot::Sender<super::Result<()>>)]
 DeleteBlobs(BlobDeleteRequest),
-#[rpc(rx = spsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
+#[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
 ImportBao(ImportBaoRequest),
-#[rpc(tx = spsc::Sender<EncodedItem>)]
+#[rpc(tx = mpsc::Sender<EncodedItem>)]
 ExportBao(ExportBaoRequest),
-#[rpc(tx = spsc::Sender<ExportRangesItem>)]
+#[rpc(tx = mpsc::Sender<ExportRangesItem>)]
 ExportRanges(ExportRangesRequest),
-#[rpc(tx = spsc::Sender<Bitfield>)]
+#[rpc(tx = mpsc::Sender<Bitfield>)]
 Observe(ObserveRequest),
 #[rpc(tx = oneshot::Sender<BlobStatus>)]
 BlobStatus(BlobStatusRequest),
-#[rpc(tx = spsc::Sender<AddProgressItem>)]
+#[rpc(tx = mpsc::Sender<AddProgressItem>)]
 ImportBytes(ImportBytesRequest),
-#[rpc(rx = spsc::Receiver<ImportByteStreamUpdate>, tx = spsc::Sender<AddProgressItem>)]
+#[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
 ImportByteStream(ImportByteStreamRequest),
-#[rpc(tx = spsc::Sender<AddProgressItem>)]
+#[rpc(tx = mpsc::Sender<AddProgressItem>)]
 ImportPath(ImportPathRequest),
-#[rpc(tx = spsc::Sender<ExportProgressItem>)]
+#[rpc(tx = mpsc::Sender<ExportProgressItem>)]
 ExportPath(ExportPathRequest),
 #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
 ListTags(ListTagsRequest),
4 changes: 2 additions & 2 deletions src/api/remote.rs
@@ -2,9 +2,9 @@
 //!
 //! The entry point is the [`Download`] struct.
 use genawaiter::sync::{Co, Gen};
+use iroh::endpoint::SendStream;
 use irpc::util::{AsyncReadVarintExt, WriteVarintExt};
 use n0_future::{Stream, StreamExt, io};
-use quinn::SendStream;
 use ref_cast::RefCast;

 use super::blobs::Bitfield;
@@ -860,7 +860,7 @@ async fn get_blob_ranges_impl(
 };
 let buffer_size = get_buffer_size(size);
 trace!(%size, %buffer_size, "get blob");
-let mut handle = store
+let handle = store
 .import_bao(hash, size, buffer_size)
 .await
 .map_err(|e| GetError::LocalFailure(e.into()))?;