@@ -24,7 +24,7 @@ use bao_tree::{
24
24
use bytes:: Bytes ;
25
25
use genawaiter:: sync:: Gen ;
26
26
use iroh_io:: { AsyncStreamReader , TokioStreamReader } ;
27
- use irpc:: channel:: { oneshot , spsc } ;
27
+ use irpc:: channel:: { mpsc , oneshot } ;
28
28
use n0_future:: { Stream , StreamExt , future, stream} ;
29
29
use quinn:: SendStream ;
30
30
use range_collections:: { RangeSet2 , range_set:: RangeSetRange } ;
@@ -225,7 +225,7 @@ impl Blobs {
225
225
} ;
226
226
let client = self . client . clone ( ) ;
227
227
let stream = Gen :: new ( |co| async move {
228
- let ( mut sender, mut receiver) = match client. bidi_streaming ( inner, 32 , 32 ) . await {
228
+ let ( sender, mut receiver) = match client. bidi_streaming ( inner, 32 , 32 ) . await {
229
229
Ok ( x) => x,
230
230
Err ( cause) => {
231
231
co. yield_ ( AddProgressItem :: Error ( cause. into ( ) ) ) . await ;
@@ -338,7 +338,7 @@ impl Blobs {
338
338
trace ! ( "{:?}" , options) ;
339
339
if options. hash == Hash :: EMPTY {
340
340
return ObserveProgress :: new ( async move {
341
- let ( mut tx, rx) = spsc :: channel ( 1 ) ;
341
+ let ( tx, rx) = mpsc :: channel ( 1 ) ;
342
342
tx. send ( Bitfield :: complete ( 0 ) ) . await . ok ( ) ;
343
343
Ok ( rx)
344
344
} ) ;
@@ -405,7 +405,7 @@ impl Blobs {
405
405
let tree = BaoTree :: new ( size. get ( ) , IROH_BLOCK_SIZE ) ;
406
406
let mut decoder = ResponseDecoder :: new ( hash. into ( ) , ranges, tree, reader) ;
407
407
let options = ImportBaoOptions { hash, size } ;
408
- let mut handle = self . import_bao_with_opts ( options, 32 ) . await ?;
408
+ let handle = self . import_bao_with_opts ( options, 32 ) . await ?;
409
409
let driver = async move {
410
410
let reader = loop {
411
411
match decoder. next ( ) . await {
@@ -509,7 +509,7 @@ impl<'a> BatchAddProgress<'a> {
509
509
pub struct Batch < ' a > {
510
510
scope : Scope ,
511
511
blobs : & ' a Blobs ,
512
- _tx : spsc :: Sender < BatchResponse > ,
512
+ _tx : mpsc :: Sender < BatchResponse > ,
513
513
}
514
514
515
515
impl < ' a > Batch < ' a > {
@@ -647,7 +647,7 @@ impl<'a> AddProgress<'a> {
647
647
/// Calling [`ObserveProgress::aggregated`] will return a stream of states,
648
648
/// where each state is the current state at the time of the update.
649
649
pub struct ObserveProgress {
650
- inner : future:: Boxed < irpc:: Result < spsc :: Receiver < Bitfield > > > ,
650
+ inner : future:: Boxed < irpc:: Result < mpsc :: Receiver < Bitfield > > > ,
651
651
}
652
652
653
653
impl IntoFuture for ObserveProgress {
@@ -668,7 +668,7 @@ impl IntoFuture for ObserveProgress {
668
668
669
669
impl ObserveProgress {
670
670
fn new (
671
- fut : impl Future < Output = irpc:: Result < spsc :: Receiver < Bitfield > > > + Send + ' static ,
671
+ fut : impl Future < Output = irpc:: Result < mpsc :: Receiver < Bitfield > > > + Send + ' static ,
672
672
) -> Self {
673
673
Self {
674
674
inner : Box :: pin ( fut) ,
@@ -710,7 +710,7 @@ impl ObserveProgress {
710
710
/// It also implements [`IntoFuture`], so you can await it to get the size of the
711
711
/// exported blob.
712
712
pub struct ExportProgress {
713
- inner : future:: Boxed < irpc:: Result < spsc :: Receiver < ExportProgressItem > > > ,
713
+ inner : future:: Boxed < irpc:: Result < mpsc :: Receiver < ExportProgressItem > > > ,
714
714
}
715
715
716
716
impl IntoFuture for ExportProgress {
@@ -725,7 +725,7 @@ impl IntoFuture for ExportProgress {
725
725
726
726
impl ExportProgress {
727
727
fn new (
728
- fut : impl Future < Output = irpc:: Result < spsc :: Receiver < ExportProgressItem > > > + Send + ' static ,
728
+ fut : impl Future < Output = irpc:: Result < mpsc :: Receiver < ExportProgressItem > > > + Send + ' static ,
729
729
) -> Self {
730
730
Self {
731
731
inner : Box :: pin ( fut) ,
@@ -768,15 +768,15 @@ impl ExportProgress {
768
768
769
769
/// A handle for an ongoing bao import operation.
770
770
pub struct ImportBaoHandle {
771
- pub tx : spsc :: Sender < BaoContentItem > ,
771
+ pub tx : mpsc :: Sender < BaoContentItem > ,
772
772
pub rx : oneshot:: Receiver < super :: Result < ( ) > > ,
773
773
}
774
774
775
775
impl ImportBaoHandle {
776
776
pub ( crate ) async fn new (
777
777
fut : impl Future <
778
778
Output = irpc:: Result < (
779
- spsc :: Sender < BaoContentItem > ,
779
+ mpsc :: Sender < BaoContentItem > ,
780
780
oneshot:: Receiver < super :: Result < ( ) > > ,
781
781
) > ,
782
782
> + Send
@@ -789,20 +789,20 @@ impl ImportBaoHandle {
789
789
790
790
/// A progress handle for a blobs list operation.
791
791
pub struct BlobsListProgress {
792
- inner : future:: Boxed < irpc:: Result < spsc :: Receiver < super :: Result < Hash > > > > ,
792
+ inner : future:: Boxed < irpc:: Result < mpsc :: Receiver < super :: Result < Hash > > > > ,
793
793
}
794
794
795
795
impl BlobsListProgress {
796
796
fn new (
797
- fut : impl Future < Output = irpc:: Result < spsc :: Receiver < super :: Result < Hash > > > > + Send + ' static ,
797
+ fut : impl Future < Output = irpc:: Result < mpsc :: Receiver < super :: Result < Hash > > > > + Send + ' static ,
798
798
) -> Self {
799
799
Self {
800
800
inner : Box :: pin ( fut) ,
801
801
}
802
802
}
803
803
804
804
pub async fn hashes ( self ) -> RequestResult < Vec < Hash > > {
805
- let mut rx: spsc :: Receiver < Result < Hash , super :: Error > > = self . inner . await ?;
805
+ let mut rx: mpsc :: Receiver < Result < Hash , super :: Error > > = self . inner . await ?;
806
806
let mut hashes = Vec :: new ( ) ;
807
807
while let Some ( item) = rx. recv ( ) . await ? {
808
808
hashes. push ( item?) ;
@@ -829,13 +829,13 @@ impl BlobsListProgress {
829
829
/// You can get access to the underlying stream using the [`ExportBaoResult::stream`] method.
830
830
pub struct ExportRangesProgress {
831
831
ranges : RangeSet2 < u64 > ,
832
- inner : future:: Boxed < irpc:: Result < spsc :: Receiver < ExportRangesItem > > > ,
832
+ inner : future:: Boxed < irpc:: Result < mpsc :: Receiver < ExportRangesItem > > > ,
833
833
}
834
834
835
835
impl ExportRangesProgress {
836
836
fn new (
837
837
ranges : RangeSet2 < u64 > ,
838
- fut : impl Future < Output = irpc:: Result < spsc :: Receiver < ExportRangesItem > > > + Send + ' static ,
838
+ fut : impl Future < Output = irpc:: Result < mpsc :: Receiver < ExportRangesItem > > > + Send + ' static ,
839
839
) -> Self {
840
840
Self {
841
841
ranges,
@@ -909,12 +909,12 @@ impl ExportRangesProgress {
909
909
///
910
910
/// You can get access to the underlying stream using the [`ExportBaoResult::stream`] method.
911
911
pub struct ExportBaoProgress {
912
- inner : future:: Boxed < irpc:: Result < spsc :: Receiver < EncodedItem > > > ,
912
+ inner : future:: Boxed < irpc:: Result < mpsc :: Receiver < EncodedItem > > > ,
913
913
}
914
914
915
915
impl ExportBaoProgress {
916
916
fn new (
917
- fut : impl Future < Output = irpc:: Result < spsc :: Receiver < EncodedItem > > > + Send + ' static ,
917
+ fut : impl Future < Output = irpc:: Result < mpsc :: Receiver < EncodedItem > > > + Send + ' static ,
918
918
) -> Self {
919
919
Self {
920
920
inner : Box :: pin ( fut) ,
0 commit comments