Skip to content

Commit 11d0537

Browse files
committed
Update to latest irpc
1 parent 3711890 commit 11d0537

File tree

16 files changed

+128
-130
lines changed

16 files changed

+128
-130
lines changed

Cargo.lock

Lines changed: 5 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ self_cell = "1.1.0"
3333
genawaiter = { version = "0.99.1", features = ["futures03"] }
3434
iroh-base = "0.35.0"
3535
reflink-copy = "0.1.24"
36-
irpc = { version = "0.3.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"], default-features = false }
37-
irpc-derive = { version = "0.3.0", default-features = false }
36+
irpc = { version = "0.4.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"], default-features = false }
37+
irpc-derive = { version = "0.4.0", default-features = false }
3838
iroh-metrics = { version = "0.32.0" }
3939
hashlink = "0.10.0"
4040
futures-buffered = "0.2.11"

src/api/blobs.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use bao_tree::{
2424
use bytes::Bytes;
2525
use genawaiter::sync::Gen;
2626
use iroh_io::{AsyncStreamReader, TokioStreamReader};
27-
use irpc::channel::{oneshot, spsc};
27+
use irpc::channel::{mpsc, oneshot};
2828
use n0_future::{Stream, StreamExt, future, stream};
2929
use quinn::SendStream;
3030
use range_collections::{RangeSet2, range_set::RangeSetRange};
@@ -225,7 +225,7 @@ impl Blobs {
225225
};
226226
let client = self.client.clone();
227227
let stream = Gen::new(|co| async move {
228-
let (mut sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
228+
let (sender, mut receiver) = match client.bidi_streaming(inner, 32, 32).await {
229229
Ok(x) => x,
230230
Err(cause) => {
231231
co.yield_(AddProgressItem::Error(cause.into())).await;
@@ -338,7 +338,7 @@ impl Blobs {
338338
trace!("{:?}", options);
339339
if options.hash == Hash::EMPTY {
340340
return ObserveProgress::new(async move {
341-
let (mut tx, rx) = spsc::channel(1);
341+
let (tx, rx) = mpsc::channel(1);
342342
tx.send(Bitfield::complete(0)).await.ok();
343343
Ok(rx)
344344
});
@@ -405,7 +405,7 @@ impl Blobs {
405405
let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
406406
let mut decoder = ResponseDecoder::new(hash.into(), ranges, tree, reader);
407407
let options = ImportBaoOptions { hash, size };
408-
let mut handle = self.import_bao_with_opts(options, 32).await?;
408+
let handle = self.import_bao_with_opts(options, 32).await?;
409409
let driver = async move {
410410
let reader = loop {
411411
match decoder.next().await {
@@ -509,7 +509,7 @@ impl<'a> BatchAddProgress<'a> {
509509
pub struct Batch<'a> {
510510
scope: Scope,
511511
blobs: &'a Blobs,
512-
_tx: spsc::Sender<BatchResponse>,
512+
_tx: mpsc::Sender<BatchResponse>,
513513
}
514514

515515
impl<'a> Batch<'a> {
@@ -647,7 +647,7 @@ impl<'a> AddProgress<'a> {
647647
/// Calling [`ObserveProgress::aggregated`] will return a stream of states,
648648
/// where each state is the current state at the time of the update.
649649
pub struct ObserveProgress {
650-
inner: future::Boxed<irpc::Result<spsc::Receiver<Bitfield>>>,
650+
inner: future::Boxed<irpc::Result<mpsc::Receiver<Bitfield>>>,
651651
}
652652

653653
impl IntoFuture for ObserveProgress {
@@ -668,7 +668,7 @@ impl IntoFuture for ObserveProgress {
668668

669669
impl ObserveProgress {
670670
fn new(
671-
fut: impl Future<Output = irpc::Result<spsc::Receiver<Bitfield>>> + Send + 'static,
671+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<Bitfield>>> + Send + 'static,
672672
) -> Self {
673673
Self {
674674
inner: Box::pin(fut),
@@ -710,7 +710,7 @@ impl ObserveProgress {
710710
/// It also implements [`IntoFuture`], so you can await it to get the size of the
711711
/// exported blob.
712712
pub struct ExportProgress {
713-
inner: future::Boxed<irpc::Result<spsc::Receiver<ExportProgressItem>>>,
713+
inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportProgressItem>>>,
714714
}
715715

716716
impl IntoFuture for ExportProgress {
@@ -725,7 +725,7 @@ impl IntoFuture for ExportProgress {
725725

726726
impl ExportProgress {
727727
fn new(
728-
fut: impl Future<Output = irpc::Result<spsc::Receiver<ExportProgressItem>>> + Send + 'static,
728+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportProgressItem>>> + Send + 'static,
729729
) -> Self {
730730
Self {
731731
inner: Box::pin(fut),
@@ -768,15 +768,15 @@ impl ExportProgress {
768768

769769
/// A handle for an ongoing bao import operation.
770770
pub struct ImportBaoHandle {
771-
pub tx: spsc::Sender<BaoContentItem>,
771+
pub tx: mpsc::Sender<BaoContentItem>,
772772
pub rx: oneshot::Receiver<super::Result<()>>,
773773
}
774774

775775
impl ImportBaoHandle {
776776
pub(crate) async fn new(
777777
fut: impl Future<
778778
Output = irpc::Result<(
779-
spsc::Sender<BaoContentItem>,
779+
mpsc::Sender<BaoContentItem>,
780780
oneshot::Receiver<super::Result<()>>,
781781
)>,
782782
> + Send
@@ -789,20 +789,20 @@ impl ImportBaoHandle {
789789

790790
/// A progress handle for a blobs list operation.
791791
pub struct BlobsListProgress {
792-
inner: future::Boxed<irpc::Result<spsc::Receiver<super::Result<Hash>>>>,
792+
inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
793793
}
794794

795795
impl BlobsListProgress {
796796
fn new(
797-
fut: impl Future<Output = irpc::Result<spsc::Receiver<super::Result<Hash>>>> + Send + 'static,
797+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
798798
) -> Self {
799799
Self {
800800
inner: Box::pin(fut),
801801
}
802802
}
803803

804804
pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
805-
let mut rx: spsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
805+
let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
806806
let mut hashes = Vec::new();
807807
while let Some(item) = rx.recv().await? {
808808
hashes.push(item?);
@@ -829,13 +829,13 @@ impl BlobsListProgress {
829829
/// You can get access to the underlying stream using the [`ExportBaoResult::stream`] method.
830830
pub struct ExportRangesProgress {
831831
ranges: RangeSet2<u64>,
832-
inner: future::Boxed<irpc::Result<spsc::Receiver<ExportRangesItem>>>,
832+
inner: future::Boxed<irpc::Result<mpsc::Receiver<ExportRangesItem>>>,
833833
}
834834

835835
impl ExportRangesProgress {
836836
fn new(
837837
ranges: RangeSet2<u64>,
838-
fut: impl Future<Output = irpc::Result<spsc::Receiver<ExportRangesItem>>> + Send + 'static,
838+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ExportRangesItem>>> + Send + 'static,
839839
) -> Self {
840840
Self {
841841
ranges,
@@ -909,12 +909,12 @@ impl ExportRangesProgress {
909909
///
910910
/// You can get access to the underlying stream using the [`ExportBaoResult::stream`] method.
911911
pub struct ExportBaoProgress {
912-
inner: future::Boxed<irpc::Result<spsc::Receiver<EncodedItem>>>,
912+
inner: future::Boxed<irpc::Result<mpsc::Receiver<EncodedItem>>>,
913913
}
914914

915915
impl ExportBaoProgress {
916916
fn new(
917-
fut: impl Future<Output = irpc::Result<spsc::Receiver<EncodedItem>>> + Send + 'static,
917+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<EncodedItem>>> + Send + 'static,
918918
) -> Self {
919919
Self {
920920
inner: Box::pin(fut),

src/api/downloader.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,12 @@ use std::{
1111
use anyhow::bail;
1212
use genawaiter::sync::Gen;
1313
use iroh::{Endpoint, NodeId, endpoint::Connection};
14-
use irpc::channel::spsc;
14+
use irpc::channel::mpsc;
1515
use irpc_derive::rpc_requests;
1616
use n0_future::{BufferedStreamExt, Stream, StreamExt, future, stream};
1717
use rand::seq::SliceRandom;
1818
use serde::{Deserialize, Serialize, de::Error};
19-
use tokio::{
20-
sync::{Mutex, mpsc},
21-
task::JoinSet,
22-
};
19+
use tokio::{sync::Mutex, task::JoinSet};
2320
use tokio_util::time::FutureExt;
2421
use tracing::{info, instrument::Instrument, warn};
2522

@@ -43,7 +40,7 @@ impl irpc::Service for DownloaderService {}
4340
#[rpc_requests(DownloaderService, message = SwarmMsg, alias = "Msg")]
4441
#[derive(Debug, Serialize, Deserialize)]
4542
enum SwarmProtocol {
46-
#[rpc(tx = spsc::Sender<DownloadProgessItem>)]
43+
#[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
4744
Download(DownloadRequest),
4845
}
4946

@@ -83,7 +80,7 @@ impl DownloaderActor {
8380
}
8481
}
8582

86-
async fn run(mut self, mut rx: mpsc::Receiver<SwarmMsg>) {
83+
async fn run(mut self, mut rx: tokio::sync::mpsc::Receiver<SwarmMsg>) {
8784
while let Some(msg) = rx.recv().await {
8885
match msg {
8986
SwarmMsg::Download(request) => {
@@ -115,7 +112,7 @@ async fn handle_download_impl(
115112
store: Store,
116113
pool: ConnectionPool,
117114
request: DownloadRequest,
118-
tx: &mut spsc::Sender<DownloadProgessItem>,
115+
tx: &mut mpsc::Sender<DownloadProgessItem>,
119116
) -> anyhow::Result<()> {
120117
match request.strategy {
121118
SplitStrategy::Split => handle_download_split_impl(store, pool, request, tx).await?,
@@ -136,11 +133,11 @@ async fn handle_download_split_impl(
136133
store: Store,
137134
pool: ConnectionPool,
138135
request: DownloadRequest,
139-
tx: &mut spsc::Sender<DownloadProgessItem>,
136+
tx: &mut mpsc::Sender<DownloadProgessItem>,
140137
) -> anyhow::Result<()> {
141138
let providers = request.providers;
142139
let requests = split_request(&request.request, &providers, &pool, &store, Drain).await?;
143-
let (progress_tx, progress_rx) = mpsc::channel(32);
140+
let (progress_tx, progress_rx) = tokio::sync::mpsc::channel(32);
144141
let mut futs = stream::iter(requests.into_iter().enumerate())
145142
.map(|(id, request)| {
146143
let pool = pool.clone();
@@ -149,7 +146,7 @@ async fn handle_download_split_impl(
149146
let progress_tx = progress_tx.clone();
150147
async move {
151148
let hash = request.hash;
152-
let (tx, rx) = mpsc::channel::<(usize, DownloadProgessItem)>(16);
149+
let (tx, rx) = tokio::sync::mpsc::channel::<(usize, DownloadProgessItem)>(16);
153150
progress_tx.send(rx).await.ok();
154151
let sink = TokioMpscSenderSink(tx)
155152
.with_map_err(io::Error::other)
@@ -199,7 +196,7 @@ async fn handle_download_split_impl(
199196
Ok(())
200197
}
201198

202-
fn into_stream<T>(mut recv: mpsc::Receiver<T>) -> impl Stream<Item = T> {
199+
fn into_stream<T>(mut recv: tokio::sync::mpsc::Receiver<T>) -> impl Stream<Item = T> {
203200
Gen::new(|co| async move {
204201
while let Some(item) = recv.recv().await {
205202
co.yield_(item).await;
@@ -309,11 +306,11 @@ impl<'de> Deserialize<'de> for DownloadRequest {
309306
pub type DownloadOptions = DownloadRequest;
310307

311308
pub struct DownloadProgress {
312-
fut: future::Boxed<irpc::Result<spsc::Receiver<DownloadProgessItem>>>,
309+
fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>,
313310
}
314311

315312
impl DownloadProgress {
316-
fn new(fut: future::Boxed<irpc::Result<spsc::Receiver<DownloadProgessItem>>>) -> Self {
313+
fn new(fut: future::Boxed<irpc::Result<mpsc::Receiver<DownloadProgessItem>>>) -> Self {
317314
Self { fut }
318315
}
319316

@@ -351,7 +348,7 @@ impl IntoFuture for DownloadProgress {
351348

352349
impl Downloader {
353350
pub fn new(store: &Store, endpoint: &Endpoint) -> Self {
354-
let (tx, rx) = mpsc::channel::<SwarmMsg>(32);
351+
let (tx, rx) = tokio::sync::mpsc::channel::<SwarmMsg>(32);
355352
let actor = DownloaderActor::new(store.clone(), endpoint.clone());
356353
tokio::spawn(actor.run(rx));
357354
Self { client: tx.into() }

src/api/proto.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use bao_tree::{
2828
io::{BaoContentItem, Leaf, mixed::EncodedItem},
2929
};
3030
use bytes::Bytes;
31-
use irpc::channel::{oneshot, spsc};
31+
use irpc::channel::{mpsc, oneshot};
3232
use irpc_derive::rpc_requests;
3333
use n0_future::Stream;
3434
use range_collections::RangeSet2;
@@ -91,29 +91,29 @@ impl irpc::Service for StoreService {}
9191
#[rpc_requests(StoreService, message = Command, alias = "Msg")]
9292
#[derive(Debug, Serialize, Deserialize)]
9393
pub enum Request {
94-
#[rpc(tx = spsc::Sender<super::Result<Hash>>)]
94+
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
9595
ListBlobs(ListRequest),
96-
#[rpc(tx = oneshot::Sender<Scope>, rx = spsc::Receiver<BatchResponse>)]
96+
#[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
9797
Batch(BatchRequest),
9898
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
9999
DeleteBlobs(BlobDeleteRequest),
100-
#[rpc(rx = spsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
100+
#[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
101101
ImportBao(ImportBaoRequest),
102-
#[rpc(tx = spsc::Sender<EncodedItem>)]
102+
#[rpc(tx = mpsc::Sender<EncodedItem>)]
103103
ExportBao(ExportBaoRequest),
104-
#[rpc(tx = spsc::Sender<ExportRangesItem>)]
104+
#[rpc(tx = mpsc::Sender<ExportRangesItem>)]
105105
ExportRanges(ExportRangesRequest),
106-
#[rpc(tx = spsc::Sender<Bitfield>)]
106+
#[rpc(tx = mpsc::Sender<Bitfield>)]
107107
Observe(ObserveRequest),
108108
#[rpc(tx = oneshot::Sender<BlobStatus>)]
109109
BlobStatus(BlobStatusRequest),
110-
#[rpc(tx = spsc::Sender<AddProgressItem>)]
110+
#[rpc(tx = mpsc::Sender<AddProgressItem>)]
111111
ImportBytes(ImportBytesRequest),
112-
#[rpc(rx = spsc::Receiver<ImportByteStreamUpdate>, tx = spsc::Sender<AddProgressItem>)]
112+
#[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
113113
ImportByteStream(ImportByteStreamRequest),
114-
#[rpc(tx = spsc::Sender<AddProgressItem>)]
114+
#[rpc(tx = mpsc::Sender<AddProgressItem>)]
115115
ImportPath(ImportPathRequest),
116-
#[rpc(tx = spsc::Sender<ExportProgressItem>)]
116+
#[rpc(tx = mpsc::Sender<ExportProgressItem>)]
117117
ExportPath(ExportPathRequest),
118118
#[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
119119
ListTags(ListTagsRequest),

src/api/remote.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ async fn get_blob_ranges_impl(
860860
};
861861
let buffer_size = get_buffer_size(size);
862862
trace!(%size, %buffer_size, "get blob");
863-
let mut handle = store
863+
let handle = store
864864
.import_bao(hash, size, buffer_size)
865865
.await
866866
.map_err(|e| GetError::LocalFailure(e.into()))?;

0 commit comments

Comments
 (0)