Skip to content
This repository was archived by the owner on Jul 21, 2025. It is now read-only.

Commit 080435a

Browse files
committed
Add import_byte_stream_mid that takes a stream
so we don't have to go item -> request -> stream
1 parent eece2c4 commit 080435a

File tree

4 files changed

+35
-37
lines changed

4 files changed

+35
-37
lines changed

Cargo.lock

Lines changed: 5 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ self_cell = "1.1.0"
3333
genawaiter = { version = "0.99.1", features = ["futures03"] }
3434
iroh-base = "0.34.1"
3535
reflink-copy = "0.1.24"
36-
irpc = { version = "0.2.3", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"], default-features = false, path = "../irpc" }
37-
irpc-derive = { version = "0.2.3", default-features = false }
36+
irpc = { version = "0.3.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream"], default-features = false }
37+
irpc-derive = { version = "0.3.0", default-features = false }
3838
iroh-metrics = { version = "0.32.0" }
3939
hashlink = "0.10.0"
4040
futures-buffered = "0.2.11"

src/store/fs/import.rs

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::{
1515
fs::{self, File, OpenOptions},
1616
io::{self, Seek, Write},
1717
path::PathBuf,
18-
sync::{Arc, mpsc::RecvError},
18+
sync::Arc,
1919
};
2020

2121
use bao_tree::{
@@ -163,21 +163,12 @@ pub async fn import_bytes(cmd: ImportBytesMsg, ctx: Arc<TaskContext>) {
163163
if ctx.options.is_inlined_all(size) {
164164
import_bytes_tiny_outer(cmd, ctx).await;
165165
} else {
166-
let (mut tx, rx) = spsc::channel(2);
167-
tx.send(ImportByteStreamUpdate::Bytes(cmd.data.clone()))
168-
.await
169-
.unwrap();
170-
tx.send(ImportByteStreamUpdate::Done).await.unwrap();
171-
let cmd = ImportByteStreamMsg {
172-
inner: ImportByteStreamRequest {
173-
format: cmd.format,
174-
scope: cmd.scope,
175-
},
176-
tx: cmd.tx,
177-
rx,
178-
span: tracing::Span::current(),
166+
let request = ImportByteStreamRequest {
167+
format: cmd.format,
168+
scope: cmd.scope,
179169
};
180-
import_byte_stream_outer(cmd, ctx).await;
170+
let stream = stream::iter(Some(Ok(cmd.data.clone())));
171+
import_byte_stream_mid(request, cmd.tx, cmd.span, stream, ctx).await;
181172
}
182173
}
183174

@@ -235,7 +226,8 @@ async fn import_bytes_tiny_impl(
235226

236227
#[instrument(skip_all)]
237228
pub async fn import_byte_stream(cmd: ImportByteStreamMsg, ctx: Arc<TaskContext>) {
238-
import_byte_stream_outer(cmd, ctx).await;
229+
let stream = into_stream(cmd.rx);
230+
import_byte_stream_mid(cmd.inner, cmd.tx, cmd.span, stream, ctx).await
239231
}
240232

241233
fn into_stream(
@@ -263,20 +255,25 @@ fn into_stream(
263255
})
264256
}
265257

266-
pub async fn import_byte_stream_outer(mut cmd: ImportByteStreamMsg, ctx: Arc<TaskContext>) {
267-
let stream = into_stream(cmd.rx);
268-
match import_byte_stream_impl(cmd.inner, &mut cmd.tx, stream, ctx.options.clone()).await {
258+
async fn import_byte_stream_mid(
259+
request: ImportByteStreamRequest,
260+
mut tx: spsc::Sender<AddProgressItem>,
261+
span: tracing::Span,
262+
stream: impl Stream<Item = io::Result<Bytes>> + Unpin,
263+
ctx: Arc<TaskContext>,
264+
) {
265+
match import_byte_stream_impl(request, &mut tx, stream, ctx.options.clone()).await {
269266
Ok(entry) => {
270267
let entry = ImportEntryMsg {
271268
inner: entry,
272-
tx: cmd.tx,
269+
tx,
273270
rx: NoReceiver,
274-
span: cmd.span,
271+
span,
275272
};
276273
ctx.internal_cmd_tx.send(entry.into()).await.ok();
277274
}
278275
Err(cause) => {
279-
cmd.tx.send(cause.into()).await.ok();
276+
tx.send(cause.into()).await.ok();
280277
}
281278
}
282279
}

src/store/mem.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use bao_tree::{
2727
};
2828
use bytes::Bytes;
2929
use irpc::channel::spsc;
30-
use n0_future::{StreamExt, future::yield_now};
30+
use n0_future::future::yield_now;
3131
use range_collections::range_set::RangeSetRange;
3232
use tokio::{
3333
io::AsyncReadExt,
@@ -43,15 +43,14 @@ use crate::{
4343
self, ApiClient,
4444
blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
4545
proto::{
46-
BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest,
47-
BoxedByteStream, Command, CreateTagMsg, CreateTagRequest, CreateTempTagMsg,
48-
DeleteBlobsMsg, DeleteTagsMsg, DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest,
49-
ExportPathMsg, ExportPathRequest, ExportRangesItem, ExportRangesMsg,
50-
ExportRangesRequest, ImportBaoMsg, ImportBaoRequest, ImportByteStreamMsg,
51-
ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest, ImportPathMsg,
52-
ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest, ObserveMsg,
53-
ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg, SetTagRequest,
54-
ShutdownMsg, SyncDbMsg,
46+
BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
47+
CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
48+
DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
49+
ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
50+
ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
51+
ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
52+
ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
53+
SetTagRequest, ShutdownMsg, SyncDbMsg,
5554
},
5655
tags::TagInfo,
5756
},

0 commit comments

Comments
 (0)