Skip to content

services/fs: Use separate dedicated thread pool instead #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ name = "ops"
harness = false

[dependencies]
async-compat = "0.2.1"
async-trait = "0.1.52"
bytes = "1.1.0"
futures = { version = "0.3.21", features = ["alloc"] }
tokio = { version = "1.16.1", features = ["full"] }
thiserror = "1.0.30"
aws-types = { version = "0.6.0", features = ["hardcoded-credentials"] }
aws-sdk-s3 = "0.6.0"
Expand All @@ -28,10 +26,12 @@ aws-smithy-http = "0.36.0"
hyper = { version = "0.14.16", features = ["stream"] }
pin-project = "1.0.10"
aws-config = "0.6.0"
blocking = "1.1.0"

[dev-dependencies]
uuid = { version = "0.8.2", features = ["serde", "v4"] }
anyhow = "1.0"
rand = "0.8.5"
sha2 = "0.10.1"
criterion = { version = "0.3", features = ["async", "async_tokio"] }
tokio = { version = "1.16.1", features = ["full"] }
17 changes: 15 additions & 2 deletions benches/ops/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,29 @@ pub fn bench(c: &mut Criterion) {
let mut group = c.benchmark_group(case.0);
group.throughput(criterion::Throughput::Bytes(size as u64));
group.bench_with_input(
BenchmarkId::new("bench_read", &path),
BenchmarkId::new("read", &path),
&(op.clone(), &path),
|b, input| {
b.to_async(&runtime)
.iter(|| bench_read(input.0.clone(), input.1))
},
);
group.bench_with_input(
BenchmarkId::new("bench_buf_read", &path),
BenchmarkId::new("buf_read", &path),
&(op.clone(), &path),
|b, input| {
b.to_async(&runtime)
.iter(|| bench_buf_read(input.0.clone(), input.1))
},
);
group.bench_with_input(
BenchmarkId::new("write", &path),
&(op.clone(), &path, content.clone()),
|b, input| {
b.to_async(&runtime)
.iter(|| bench_write(input.0.clone(), input.1, input.2.clone()))
},
);
group.finish();
}
}
Expand All @@ -86,3 +94,8 @@ pub async fn bench_buf_read(op: Operator, path: &str) {

io::copy(&mut r, &mut io::sink()).await.unwrap();
}

/// Benchmark helper: write the whole `content` payload to `path` through the
/// given operator. Bench-only code — panics on any write failure.
pub async fn bench_write(op: Operator, path: &str, content: Vec<u8>) {
    // Chain directly: obtain the object's writer and push the full payload in one call.
    op.object(path)
        .writer()
        .write_bytes(content)
        .await
        .unwrap();
}
83 changes: 54 additions & 29 deletions src/services/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fs;
use std::io::SeekFrom;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use async_compat::CompatExt;
use async_trait::async_trait;
use tokio::fs;
use tokio::io;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use blocking::unblock;
use blocking::Unblock;
use futures::io;
use futures::AsyncReadExt;
use futures::AsyncSeekExt;
use futures::AsyncWriteExt;

use crate::error::Error;
use crate::error::Result;
Expand Down Expand Up @@ -51,9 +53,11 @@ impl Builder {
let root = self.root.clone().unwrap_or_else(|| "/".to_string());

// If the root dir does not exist, we must create it.
if let Err(e) = fs::metadata(&root).await {
let metadata_root = root.clone();
if let Err(e) = unblock(|| fs::metadata(metadata_root)).await {
if e.kind() == std::io::ErrorKind::NotFound {
fs::create_dir_all(&root)
let dir_root = root.clone();
unblock(|| fs::create_dir_all(dir_root))
.await
.map_err(|e| parse_io_error(&e, PathBuf::from(&root).as_path()))?;
}
Expand All @@ -63,6 +67,14 @@ impl Builder {
}
}

/// Backend is used to serve `Accessor` support for posix alike fs.
///
/// # Note
///
/// We will use a separate dedicated thread pool (powered by `blocking`)
/// for better async performance under tokio. All `std::fs::File` handles will be wrapped
/// by `Unblock` to gain async support. IO will happen at the separate dedicated
/// thread pool, so we will not block the tokio runtime.
#[derive(Debug, Clone)]
pub struct Backend {
root: String,
Expand All @@ -79,21 +91,22 @@ impl Accessor for Backend {
async fn read(&self, args: &OpRead) -> Result<BoxedAsyncRead> {
let path = PathBuf::from(&self.root).join(&args.path);

let mut f = fs::OpenOptions::new()
.read(true)
.open(&path)
let open_path = path.clone();
let f = unblock(|| fs::OpenOptions::new().read(true).open(open_path))
.await
.map_err(|e| parse_io_error(&e, &path))?;

let mut f = Unblock::new(f);

if let Some(offset) = args.offset {
f.seek(SeekFrom::Start(offset))
.await
.map_err(|e| parse_io_error(&e, &path))?;
};

let r: BoxedAsyncRead = match args.size {
Some(size) => Box::new(f.take(size).compat()),
None => Box::new(f.compat()),
Some(size) => Box::new(f.take(size)),
None => Box::new(f),
};

Ok(r)
Expand All @@ -110,36 +123,45 @@ impl Accessor for Backend {
// - Is it better to check that the parent dir exists before calling mkdir?
let parent = path
.parent()
.ok_or_else(|| Error::Unexpected(format!("malformed path: {:?}", path.to_str())))?;
fs::create_dir_all(parent)
.await
.map_err(|e| parse_io_error(&e, parent))?;
.ok_or_else(|| Error::Unexpected(format!("malformed path: {:?}", path.to_str())))?
.to_path_buf();

let mut f = fs::OpenOptions::new()
.create(true)
.write(true)
.open(&path)
let capture_parent = parent.clone();
unblock(|| fs::create_dir_all(capture_parent))
.await
.map_err(|e| parse_io_error(&e, &path))?;
.map_err(|e| parse_io_error(&e, &parent))?;

let capture_path = path.clone();
let f = unblock(|| {
fs::OpenOptions::new()
.create(true)
.write(true)
.open(capture_path)
})
.await
.map_err(|e| parse_io_error(&e, &path))?;

let mut f = Unblock::new(f);

// TODO: we should respect the input size.
let s = io::copy(&mut r.compat_mut(), &mut f)
let s = io::copy(&mut r, &mut f)
.await
.map_err(|e| parse_io_error(&e, &path))?;

// `std::fs::File`'s errors detected on closing are ignored by
// the implementation of Drop.
// So we need to call `sync_all` to make sure all internal metadata
// have been flushed to fs successfully.
f.sync_all().await.map_err(|e| parse_io_error(&e, &path))?;
// So we need to call `flush` to make sure all data have been flushed
// to fs successfully.
f.flush().await.map_err(|e| parse_io_error(&e, &path))?;

Ok(s as usize)
}

async fn stat(&self, args: &OpStat) -> Result<Metadata> {
let path = PathBuf::from(&self.root).join(&args.path);

let meta = fs::metadata(&path)
let capture_path = path.clone();
let meta = unblock(|| fs::metadata(capture_path))
.await
.map_err(|e| parse_io_error(&e, &path))?;

Expand All @@ -152,8 +174,9 @@ impl Accessor for Backend {
async fn delete(&self, args: &OpDelete) -> Result<()> {
let path = PathBuf::from(&self.root).join(&args.path);

let capture_path = path.clone();
// PathBuf.is_dir() is not free, call metadata directly instead.
let meta = fs::metadata(&path).await;
let meta = unblock(|| fs::metadata(capture_path)).await;

if let Err(err) = &meta {
if err.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -165,9 +188,11 @@ impl Accessor for Backend {
let meta = meta.ok().unwrap();

let f = if meta.is_dir() {
fs::remove_dir(&path).await
let capture_path = path.clone();
unblock(|| fs::remove_dir(capture_path)).await
} else {
fs::remove_file(&path).await
let capture_path = path.clone();
unblock(|| fs::remove_file(capture_path)).await
};

f.map_err(|e| parse_io_error(&e, &path))
Expand Down