Skip to content

Commit 0611f76

Browse files
committed
Keep anyhow internally for now
1 parent 844c970 commit 0611f76

File tree

4 files changed

+64
-85
lines changed

4 files changed

+64
-85
lines changed

src/api/remote.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ use n0_future::{Stream, StreamExt, io};
88
use n0_snafu::SpanTrace;
99
use nested_enum_utils::common_fields;
1010
use ref_cast::RefCast;
11-
use snafu::{Backtrace, Snafu};
11+
use snafu::{Backtrace, IntoError, Snafu};
1212

1313
use super::blobs::Bitfield;
1414
use crate::{
1515
api::{ApiClient, blobs::WriteProgress},
16-
get::{GetError, GetResult, Stats, fsm::DecodeError},
16+
get::{BadRequestSnafu, GetError, GetResult, LocalFailureSnafu, Stats, fsm::DecodeError},
1717
protocol::{
1818
GetManyRequest, MAX_MESSAGE_SIZE, ObserveItem, ObserveRequest, PushRequest, Request,
1919
RequestType,
@@ -94,9 +94,7 @@ impl GetProgress {
9494

9595
pub async fn complete(self) -> GetResult<Stats> {
9696
just_result(self.stream()).await.unwrap_or_else(|| {
97-
Err(GetError::LocalFailure {
98-
source: anyhow::anyhow!("stream closed without result"),
99-
})
97+
Err(LocalFailureSnafu.into_error(anyhow::anyhow!("stream closed without result")))
10098
})
10199
}
102100
}
@@ -505,15 +503,15 @@ impl Remote {
505503
let local = self
506504
.local(content)
507505
.await
508-
.map_err(|source| GetError::LocalFailure { source })?;
506+
.map_err(|e| LocalFailureSnafu.into_error(e))?;
509507
if local.is_complete() {
510508
return Ok(Default::default());
511509
}
512510
let request = local.missing();
513511
let conn = conn
514512
.connection()
515513
.await
516-
.map_err(|source| GetError::LocalFailure { source })?;
514+
.map_err(|e| LocalFailureSnafu.into_error(e))?;
517515
let stats = self.execute_get_sink(conn, request, progress).await?;
518516
Ok(stats)
519517
}
@@ -682,9 +680,9 @@ impl Remote {
682680
store
683681
.get_bytes(root)
684682
.await
685-
.map_err(|e| GetError::LocalFailure { source: e.into() })?,
683+
.map_err(|e| LocalFailureSnafu.into_error(e.into()))?,
686684
)
687-
.map_err(|source| GetError::BadRequest { source })?;
685+
.map_err(|source| BadRequestSnafu.into_error(source))?;
688686
// let mut hash_seq = LazyHashSeq::new(store.blobs().clone(), root);
689687
loop {
690688
let at_start_child = match next_child {
@@ -889,7 +887,7 @@ async fn get_blob_ranges_impl(
889887
let handle = store
890888
.import_bao(hash, size, buffer_size)
891889
.await
892-
.map_err(|e| GetError::LocalFailure { source: e.into() })?;
890+
.map_err(|e| LocalFailureSnafu.into_error(e.into()))?;
893891
let write = async move {
894892
GetResult::Ok(loop {
895893
match content.next().await {
@@ -898,7 +896,7 @@ async fn get_blob_ranges_impl(
898896
progress
899897
.send(next.stats().payload_bytes_read)
900898
.await
901-
.map_err(|e| GetError::LocalFailure { source: e.into() })?;
899+
.map_err(|e| LocalFailureSnafu.into_error(e.into()))?;
902900
handle.tx.send(item).await?;
903901
content = next;
904902
}
@@ -910,8 +908,8 @@ async fn get_blob_ranges_impl(
910908
})
911909
};
912910
let complete = async move {
913-
handle.rx.await.map_err(|e| GetError::LocalFailure {
914-
source: anyhow::anyhow!("error reading from import stream: {e}"),
911+
handle.rx.await.map_err(|e| {
912+
LocalFailureSnafu.into_error(anyhow::anyhow!("error reading from import stream: {e}"))
915913
})
916914
};
917915
let (_, end) = tokio::try_join!(complete, write)?;

src/get.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::{Hash, protocol::ChunkRangesSeq, store::IROH_BLOCK_SIZE};
3737

3838
mod error;
3939
pub mod request;
40+
pub(crate) use error::{BadRequestSnafu, LocalFailureSnafu};
4041
pub use error::{GetError, GetResult};
4142

4243
type WrappedRecvStream = TokioStreamReader<RecvStream>;
@@ -98,8 +99,11 @@ pub mod fsm {
9899
use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader};
99100

100101
use super::*;
101-
use crate::protocol::{
102-
GetManyRequest, GetRequest, MAX_MESSAGE_SIZE, NonEmptyRequestRangeSpecIter, Request,
102+
use crate::{
103+
get::error::BadRequestSnafu,
104+
protocol::{
105+
GetManyRequest, GetRequest, MAX_MESSAGE_SIZE, NonEmptyRequestRangeSpecIter, Request,
106+
},
103107
};
104108

105109
self_cell::self_cell! {
@@ -128,10 +132,8 @@ pub mod fsm {
128132
let start = Instant::now();
129133
let (mut writer, reader) = connection.open_bi().await?;
130134
let request = Request::GetMany(request);
131-
let request_bytes =
132-
postcard::to_stdvec(&request).map_err(|source| GetError::BadRequest {
133-
source: source.into(),
134-
})?;
135+
let request_bytes = postcard::to_stdvec(&request)
136+
.map_err(|source| BadRequestSnafu.into_error(source.into()))?;
135137
writer.write_all(&request_bytes).await?;
136138
writer.finish()?;
137139
let Request::GetMany(request) = request else {

src/get/error.rs

Lines changed: 40 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use iroh::endpoint::{self, ClosedStream};
44
use n0_snafu::SpanTrace;
55
use nested_enum_utils::common_fields;
6-
use snafu::{Backtrace, Snafu};
6+
use snafu::{Backtrace, IntoError, Snafu};
77

88
#[common_fields({
99
backtrace: Option<Backtrace>,
@@ -17,6 +17,7 @@ pub enum GetNotFoundError {
1717

1818
/// Failures for a get operation
1919
#[derive(Debug, Snafu)]
20+
#[snafu(visibility(pub(crate)))]
2021
pub enum GetError {
2122
/// Hash not found, or a requested chunk for the hash not found.
2223
#[snafu(display("Data for hash not found"))]
@@ -44,17 +45,13 @@ pub type GetResult<T> = std::result::Result<T, GetError>;
4445

4546
impl From<irpc::channel::SendError> for GetError {
4647
fn from(value: irpc::channel::SendError) -> Self {
47-
Self::LocalFailure {
48-
source: value.into(),
49-
}
48+
LocalFailureSnafu.into_error(value.into())
5049
}
5150
}
5251

5352
impl<T: Send + Sync + 'static> From<tokio::sync::mpsc::error::SendError<T>> for GetError {
5453
fn from(value: tokio::sync::mpsc::error::SendError<T>) -> Self {
55-
Self::LocalFailure {
56-
source: value.into(),
57-
}
54+
LocalFailureSnafu.into_error(value.into())
5855
}
5956
}
6057

@@ -66,40 +63,40 @@ impl From<endpoint::ConnectionError> for GetError {
6663
e @ ConnectionError::VersionMismatch => {
6764
// > The peer doesn't implement any supported version
6865
// unsupported version is likely a long time error, so this peer is not usable
69-
GetError::NoncompliantNode { source: e.into() }
66+
NoncompliantNodeSnafu.into_error(e.into())
7067
}
7168
e @ ConnectionError::TransportError(_) => {
7269
// > The peer violated the QUIC specification as understood by this implementation
7370
// bad peer we don't want to keep around
74-
GetError::NoncompliantNode { source: e.into() }
71+
NoncompliantNodeSnafu.into_error(e.into())
7572
}
7673
e @ ConnectionError::ConnectionClosed(_) => {
7774
// > The peer's QUIC stack aborted the connection automatically
7875
// peer might be disconnecting or otherwise unavailable, drop it
79-
GetError::Io { source: e.into() }
76+
IoSnafu.into_error(e.into())
8077
}
8178
e @ ConnectionError::ApplicationClosed(_) => {
8279
// > The peer closed the connection
8380
// peer might be disconnecting or otherwise unavailable, drop it
84-
GetError::Io { source: e.into() }
81+
IoSnafu.into_error(e.into())
8582
}
8683
e @ ConnectionError::Reset => {
8784
// > The peer is unable to continue processing this connection, usually due to having restarted
88-
GetError::RemoteReset { source: e.into() }
85+
RemoteResetSnafu.into_error(e.into())
8986
}
9087
e @ ConnectionError::TimedOut => {
9188
// > Communication with the peer has lapsed for longer than the negotiated idle timeout
92-
GetError::Io { source: e.into() }
89+
IoSnafu.into_error(e.into())
9390
}
9491
e @ ConnectionError::LocallyClosed => {
9592
// > The local application closed the connection
9693
// TODO(@divma): don't see how this is reachable but let's just not use the peer
97-
GetError::Io { source: e.into() }
94+
IoSnafu.into_error(e.into())
9895
}
9996
e @ ConnectionError::CidsExhausted => {
10097
// > The connection could not be created because not enough of the CID space
10198
// > is available
102-
GetError::Io { source: e.into() }
99+
IoSnafu.into_error(e.into())
103100
}
104101
}
105102
}
@@ -109,38 +106,32 @@ impl From<endpoint::ReadError> for GetError {
109106
fn from(value: endpoint::ReadError) -> Self {
110107
use endpoint::ReadError;
111108
match value {
112-
e @ ReadError::Reset(_) => GetError::RemoteReset { source: e.into() },
109+
e @ ReadError::Reset(_) => RemoteResetSnafu.into_error(e.into()),
113110
ReadError::ConnectionLost(conn_error) => conn_error.into(),
114111
ReadError::ClosedStream
115112
| ReadError::IllegalOrderedRead
116113
| ReadError::ZeroRttRejected => {
117114
// all these errors indicate the peer is not usable at this moment
118-
GetError::Io {
119-
source: value.into(),
120-
}
115+
IoSnafu.into_error(value.into())
121116
}
122117
}
123118
}
124119
}
125120
impl From<ClosedStream> for GetError {
126121
fn from(value: ClosedStream) -> Self {
127-
GetError::Io {
128-
source: value.into(),
129-
}
122+
IoSnafu.into_error(value.into())
130123
}
131124
}
132125

133126
impl From<quinn::WriteError> for GetError {
134127
fn from(value: quinn::WriteError) -> Self {
135128
use quinn::WriteError;
136129
match value {
137-
e @ WriteError::Stopped(_) => GetError::RemoteReset { source: e.into() },
130+
e @ WriteError::Stopped(_) => RemoteResetSnafu.into_error(e.into()),
138131
WriteError::ConnectionLost(conn_error) => conn_error.into(),
139132
WriteError::ClosedStream | WriteError::ZeroRttRejected => {
140133
// all these errors indicate the peer is not usable at this moment
141-
GetError::Io {
142-
source: value.into(),
143-
}
134+
IoSnafu.into_error(value.into())
144135
}
145136
}
146137
}
@@ -152,17 +143,17 @@ impl From<crate::get::fsm::ConnectedNextError> for GetError {
152143
match value {
153144
e @ PostcardSer { .. } => {
154145
// serialization errors indicate something wrong with the request itself
155-
GetError::BadRequest { source: e.into() }
146+
BadRequestSnafu.into_error(e.into())
156147
}
157148
e @ RequestTooBig { .. } => {
158149
// request will never be sent, drop it
159-
GetError::BadRequest { source: e.into() }
150+
BadRequestSnafu.into_error(e.into())
160151
}
161152
Write { source, .. } => source.into(),
162153
Closed { source, .. } => source.into(),
163154
e @ Io { .. } => {
164155
// io errors are likely recoverable
165-
GetError::Io { source: e.into() }
156+
IoSnafu.into_error(e.into())
166157
}
167158
}
168159
}
@@ -178,17 +169,15 @@ impl From<crate::get::fsm::AtBlobHeaderNextError> for GetError {
178169
} => {
179170
// > This indicates that the provider does not have the requested data.
180171
// peer might have the data later, simply retry it
181-
GetError::NotFound {
182-
source: GetNotFoundError::AtBlobHeader {
183-
backtrace,
184-
span_trace,
185-
},
186-
}
172+
NotFoundSnafu.into_error(GetNotFoundError::AtBlobHeader {
173+
backtrace,
174+
span_trace,
175+
})
187176
}
188177
EndpointRead { source, .. } => source.into(),
189178
e @ Io { .. } => {
190179
// io errors are likely recoverable
191-
GetError::Io { source: e.into() }
180+
IoSnafu.into_error(e.into())
192181
}
193182
}
194183
}
@@ -202,41 +191,35 @@ impl From<crate::get::fsm::DecodeError> for GetError {
202191
ChunkNotFound {
203192
backtrace,
204193
span_trace,
205-
} => GetError::NotFound {
206-
source: GetNotFoundError::AtBlobHeader {
207-
backtrace,
208-
span_trace,
209-
},
210-
},
194+
} => NotFoundSnafu.into_error(GetNotFoundError::AtBlobHeader {
195+
backtrace,
196+
span_trace,
197+
}),
211198
ParentNotFound {
212199
backtrace,
213200
span_trace,
214201
..
215-
} => GetError::NotFound {
216-
source: GetNotFoundError::AtBlobHeader {
217-
backtrace,
218-
span_trace,
219-
},
220-
},
202+
} => NotFoundSnafu.into_error(GetNotFoundError::AtBlobHeader {
203+
backtrace,
204+
span_trace,
205+
}),
221206
LeafNotFound {
222207
backtrace,
223208
span_trace,
224209
..
225-
} => GetError::NotFound {
226-
source: GetNotFoundError::AtBlobHeader {
227-
backtrace,
228-
span_trace,
229-
},
230-
},
210+
} => NotFoundSnafu.into_error(GetNotFoundError::AtBlobHeader {
211+
backtrace,
212+
span_trace,
213+
}),
231214
e @ ParentHashMismatch { .. } => {
232215
// TODO(@divma): did the peer sent wrong data? is it corrupted? did we sent a wrong
233216
// request?
234-
GetError::NoncompliantNode { source: e.into() }
217+
NoncompliantNodeSnafu.into_error(e.into())
235218
}
236219
e @ LeafHashMismatch { .. } => {
237220
// TODO(@divma): did the peer sent wrong data? is it corrupted? did we sent a wrong
238221
// request?
239-
GetError::NoncompliantNode { source: e.into() }
222+
NoncompliantNodeSnafu.into_error(e.into())
240223
}
241224
Read { source, .. } => source.into(),
242225
DecodeIo { source, .. } => source.into(),
@@ -248,8 +231,6 @@ impl From<std::io::Error> for GetError {
248231
fn from(value: std::io::Error) -> Self {
249232
// generally consider io errors recoverable
250233
// we might want to revisit this at some point
251-
GetError::Io {
252-
source: value.into(),
253-
}
234+
IoSnafu.into_error(value.into())
254235
}
255236
}

0 commit comments

Comments
 (0)