Skip to content

Commit 3fd29f0

Browse files
xprazak2Xuanwo
andauthored
fix: Handle write data in async way for IPMFS (#694)
* fix: Handle write data in async way for IPMFS (#586) * Format code Signed-off-by: Xuanwo <[email protected]> Signed-off-by: Xuanwo <[email protected]> Co-authored-by: Xuanwo <[email protected]>
1 parent d646216 commit 3fd29f0

File tree

3 files changed

+50
-39
lines changed

3 files changed

+50
-39
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ redis = { version = "0.21", features = [
115115
], optional = true }
116116
reqsign = "0.4"
117117
reqwest = { version = "0.11", features = [
118+
"multipart",
118119
"stream",
119120
"rustls-tls-native-roots",
120121
], default-features = false }

src/http_util/client.rs

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -96,30 +96,60 @@ impl HttpClient {
9696
Ok(resp)
9797
}
9898

99+
pub async fn send_async_multipart(
100+
&self,
101+
req: Request<AsyncBody>,
102+
field_name: String,
103+
) -> Result<Response<AsyncBody>> {
104+
let (parts, body) = req.into_parts();
105+
106+
let mut form = reqwest::multipart::Form::new();
107+
let part = reqwest::multipart::Part::stream(body);
108+
form = form.part(field_name, part);
109+
110+
let req_builder = self
111+
.async_client
112+
.request(
113+
parts.method,
114+
Url::from_str(&parts.uri.to_string()).expect("input request url must be valid"),
115+
)
116+
.version(parts.version)
117+
.headers(parts.headers)
118+
.multipart(form);
119+
120+
self.send_async_req(req_builder).await
121+
}
122+
99123
/// Send a request in async way.
100124
pub async fn send_async(&self, req: Request<AsyncBody>) -> Result<Response<AsyncBody>> {
101125
let (parts, body) = req.into_parts();
102126

103-
let resp = self
127+
let req_builder = self
104128
.async_client
105129
.request(
106130
parts.method,
107131
Url::from_str(&parts.uri.to_string()).expect("input request url must be valid"),
108132
)
109133
.version(parts.version)
110134
.headers(parts.headers)
111-
.body(body)
112-
.send()
113-
.await
114-
.map_err(|err| {
115-
let kind = if err.is_timeout() || err.is_connect() {
116-
ErrorKind::Interrupted
117-
} else {
118-
ErrorKind::Other
119-
};
120-
121-
Error::new(kind, err)
122-
})?;
135+
.body(body);
136+
137+
self.send_async_req(req_builder).await
138+
}
139+
140+
async fn send_async_req(
141+
&self,
142+
req_builder: reqwest::RequestBuilder,
143+
) -> Result<Response<AsyncBody>> {
144+
let resp = req_builder.send().await.map_err(|err| {
145+
let kind = if err.is_timeout() || err.is_connect() {
146+
ErrorKind::Interrupted
147+
} else {
148+
ErrorKind::Other
149+
};
150+
151+
Error::new(kind, err)
152+
})?;
123153

124154
let mut hr = Response::builder()
125155
.version(resp.version())

src/services/ipmfs/backend.rs

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ use std::sync::Arc;
2020

2121
use anyhow::anyhow;
2222
use async_trait::async_trait;
23-
use bytes::Bytes;
24-
use futures::io;
2523
use http::Request;
2624
use http::Response;
2725
use http::StatusCode;
@@ -114,7 +112,7 @@ impl Accessor for Backend {
114112
async fn create(&self, path: &str, args: OpCreate) -> Result<()> {
115113
let resp = match args.mode() {
116114
ObjectMode::DIR => self.ipmfs_mkdir(path).await?,
117-
ObjectMode::FILE => self.ipmfs_write(path, &[]).await?,
115+
ObjectMode::FILE => self.ipmfs_write(path, AsyncBody::Empty).await?,
118116
_ => unreachable!(),
119117
};
120118

@@ -152,11 +150,8 @@ impl Accessor for Backend {
152150
}
153151

154152
async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result<u64> {
155-
// TODO: Accept a reader directly.
156-
let mut buf = Vec::with_capacity(args.size() as usize);
157-
io::copy(r, &mut buf).await?;
158-
159-
let resp = self.ipmfs_write(path, &buf).await?;
153+
let body = AsyncBody::Reader(r);
154+
let resp = self.ipmfs_write(path, body).await?;
160155

161156
let status = resp.status();
162157

@@ -366,7 +361,7 @@ impl Backend {
366361
}
367362

368363
/// Support write from reader.
369-
async fn ipmfs_write(&self, path: &str, data: &[u8]) -> Result<Response<AsyncBody>> {
364+
async fn ipmfs_write(&self, path: &str, body: AsyncBody) -> Result<Response<AsyncBody>> {
370365
let p = build_rooted_abs_path(&self.root, path);
371366

372367
let url = format!(
@@ -375,30 +370,15 @@ impl Backend {
375370
percent_encode_path(&p)
376371
);
377372

378-
let mut req = Request::post(url);
379-
380-
req = req.header(
381-
http::header::CONTENT_TYPE,
382-
"multipart/form-data; boundary=custom-boundary",
383-
);
384-
let left =
385-
"--custom-boundary\nContent-Disposition: form-data; name=\"data\";\n\n".as_bytes();
386-
let right = "\n--custom-boundary".as_bytes();
387-
388-
// TODO: we need to accept a reader.
389-
let mut buf = Vec::with_capacity(left.len() + data.len() + right.len());
390-
buf.extend_from_slice(left);
391-
buf.extend_from_slice(data);
392-
buf.extend_from_slice(right);
373+
let req = Request::post(url);
393374

394-
let body = AsyncBody::Bytes(Bytes::from(buf));
395375
let req = req
396376
.body(body)
397377
.map_err(|err| new_request_build_error(Operation::Write, path, err))?;
398378

399379
let resp = self
400380
.client
401-
.send_async(req)
381+
.send_async_multipart(req, "data".to_string())
402382
.await
403383
.map_err(|e| new_request_send_error(Operation::Write, path, e))?;
404384

0 commit comments

Comments
 (0)