Skip to content

services: Add full memory support #134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# memory
OPENDAL_MEMORY_TEST=on
# fs
OPENDAL_FS_TEST=false
OPENDAL_FS_ROOT=/path/to/dir
Expand Down
27 changes: 27 additions & 0 deletions .github/workflows/service_test_memory.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Service Test Memory

on: [ push, pull_request ]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
cancel-in-progress: true

jobs:
memory:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os:
- ubuntu-latest
- macos-11
steps:
- uses: actions/checkout@v2

- uses: Swatinem/rust-cache@v1

- name: Test
shell: bash
run: cargo test memory
env:
RUST_BACKTRACE: full
OPENDAL_MEMORY_TEST: on
1 change: 1 addition & 0 deletions benches/ops/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub fn services() -> Vec<(&'static str, Option<Arc<dyn Accessor>>)> {
vec![
("fs", fs::new().await.expect("init fs")),
("s3", s3::new().await.expect("init s3")),
("memory", memory::new().await.expect("init memory")),
]
})
}
Expand Down
33 changes: 33 additions & 0 deletions opendal_test/src/services/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::sync::Arc;

use opendal::error::Result;
use opendal::services::memory;
use opendal::Accessor;

/// In order to test memory service, please set the following environment variables:
///
/// - `OPENDAL_MEMORY_TEST=on`: set to `on` to enable the test.
pub async fn new() -> Result<Option<Arc<dyn Accessor>>> {
dotenv::from_filename(".env").ok();

if env::var("OPENDAL_MEMORY_TEST").is_err() || env::var("OPENDAL_MEMORY_TEST").unwrap() != "on"
{
return Ok(None);
}

Ok(Some(memory::Backend::build().finish().await?))
}
1 change: 1 addition & 0 deletions opendal_test/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod fs;
pub mod memory;
pub mod s3;
194 changes: 173 additions & 21 deletions src/services/memory/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,80 +15,192 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use futures::io;
use futures::TryStreamExt;

use crate::error::Error;
use crate::error::Kind;
use crate::error::Result;
use crate::object::BoxedObjectStream;
use crate::ops::OpDelete;
use crate::ops::OpList;
use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::Accessor;
use crate::BoxedAsyncReader;
use crate::Metadata;
use crate::Object;
use crate::ObjectMode;

#[derive(Default)]
pub struct Builder {
data: HashMap<String, Bytes>,
}
pub struct Builder {}

impl Builder {
pub fn add_bytes(&mut self, key: &str, data: Bytes) -> &mut Self {
self.data.insert(key.to_string(), data);
self
}

pub async fn finish(&mut self) -> Result<Arc<dyn Accessor>> {
Ok(Arc::new(Backend {
data: self.data.clone(),
}))
Ok(Arc::new(Backend::default()))
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct Backend {
data: HashMap<String, Bytes>,
inner: Arc<Mutex<HashMap<String, bytes::Bytes>>>,
}

impl Backend {
pub fn build() -> Builder {
Builder::default()
}

// normalize_path removes all internal `//` inside path.
pub(crate) fn normalize_path(path: &str) -> String {
let has_trailing = path.ends_with('/');

let mut p = path
.split('/')
.filter(|v| !v.is_empty())
.collect::<Vec<&str>>()
.join("/");

if has_trailing && !p.eq("/") {
p.push('/')
}

p
}
}

#[async_trait]
impl Accessor for Backend {
async fn read(&self, args: &OpRead) -> Result<BoxedAsyncReader> {
let data = self.data.get(&args.path).ok_or_else(|| Error::Object {
let path = Backend::normalize_path(&args.path);

let map = self.inner.lock().expect("lock poisoned");

let data = map.get(&path).ok_or_else(|| Error::Object {
kind: Kind::ObjectNotExist,
op: "read",
path: args.path.to_string(),
path: path.to_string(),
source: anyhow!("key not exists in map"),
})?;

let mut data = data.clone();
if let Some(offset) = args.offset {
if offset >= data.len() as u64 {
return Err(Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: HashMap::from([("offset".to_string(), offset.to_string())]),
source: anyhow!("Offset out of bound {} >= {}", offset, data.len()),
return Err(Error::Object {
kind: Kind::Unexpected,
op: "read",
path: path.to_string(),
source: anyhow!("offset out of bound {} >= {}", offset, data.len()),
});
}
data = data.slice(offset as usize..data.len());
};

if let Some(size) = args.size {
let size = (size as usize).min(data.len());
data = data.slice(0..size);
if size > data.len() as u64 {
return Err(Error::Object {
kind: Kind::Unexpected,
op: "read",
path: path.to_string(),
source: anyhow!("size out of bound {} > {}", size, data.len()),
});
}
data = data.slice(0..size as usize);
};

let r: BoxedAsyncReader = Box::new(BytesStream(data).into_async_read());
Ok(r)
}
async fn write(&self, mut r: BoxedAsyncReader, args: &OpWrite) -> Result<usize> {
let path = Backend::normalize_path(&args.path);

let bs = vec![0; args.size as usize];
let mut cursor = io::Cursor::new(bs);
let n = io::copy(&mut r, &mut cursor)
.await
.map_err(|e| Error::Object {
kind: Kind::Unexpected,
op: "write",
path: path.clone(),
source: anyhow::Error::from(e),
})?;
if n < args.size {
return Err(Error::Object {
kind: Kind::Unexpected,
op: "write",
path: path.clone(),
source: anyhow!("write short {} M {}", n, args.size),
});
}

let mut map = self.inner.lock().expect("lock poisoned");
map.insert(path.to_string(), Bytes::from(cursor.into_inner()));

Ok(n as usize)
}
async fn stat(&self, args: &OpStat) -> Result<Metadata> {
let path = Backend::normalize_path(&args.path);

if path.ends_with('/') {
let mut meta = Metadata::default();
meta.set_path(&path)
.set_mode(ObjectMode::DIR)
.set_content_length(0)
.set_complete();

return Ok(meta);
}

let map = self.inner.lock().expect("lock poisoned");

let data = map.get(&path).ok_or_else(|| Error::Object {
kind: Kind::ObjectNotExist,
op: "stat",
path: path.to_string(),
source: anyhow!("key not exists in map"),
})?;

let mut meta = Metadata::default();
meta.set_path(&path)
.set_mode(ObjectMode::FILE)
.set_content_length(data.len() as u64)
.set_complete();

Ok(meta)
}
async fn delete(&self, args: &OpDelete) -> Result<()> {
let path = Backend::normalize_path(&args.path);

let mut map = self.inner.lock().expect("lock poisoned");
map.remove(&path);

Ok(())
}
async fn list(&self, args: &OpList) -> Result<BoxedObjectStream> {
let path = Backend::normalize_path(&args.path);

let map = self.inner.lock().expect("lock poisoned");

let paths = map
.iter()
.map(|(k, _)| k.clone())
.filter(|k| k.starts_with(&path))
.collect::<Vec<String>>();

Ok(Box::new(EntryStream {
backend: self.clone(),
paths,
idx: 0,
}))
}
}

struct BytesStream(Bytes);
Expand All @@ -109,3 +221,43 @@ impl futures::Stream for BytesStream {
(self.0.len(), Some(self.0.len()))
}
}

struct EntryStream {
backend: Backend,
paths: Vec<String>,
idx: usize,
}

impl futures::Stream for EntryStream {
type Item = Result<Object>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.idx >= self.paths.len() {
return Poll::Ready(None);
}

let idx = self.idx;
self.idx += 1;

let path = self.paths.get(idx).expect("path must valid");

let backend = self.backend.clone();
let map = backend.inner.lock().expect("lock poisoned");

let data = map.get(path);
// If the path is not get, we can skip it safely.
if data.is_none() {
return self.poll_next(cx);
}
let bs = data.expect("object must exist");

let mut o = Object::new(Arc::new(self.backend.clone()), path);
let meta = o.metadata_mut();
meta.set_path(path)
.set_mode(ObjectMode::FILE)
.set_content_length(bs.len() as u64)
.set_complete();

Poll::Ready(Some(Ok(o)))
}
}
32 changes: 0 additions & 32 deletions src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ use std::io::SeekFrom;
use std::str::from_utf8;

use anyhow::Result;
use bytes::Bytes;
use futures::AsyncReadExt;
use futures::AsyncSeekExt;

use crate::services::fs;
use crate::services::memory;
use crate::Operator;

#[tokio::test]
Expand Down Expand Up @@ -155,33 +153,3 @@ async fn test_limited_reader() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_memory_reader() -> Result<()> {
let data = "Hello, world!";
let f = Operator::new(
memory::Backend::build()
.add_bytes("test", Bytes::from(data))
.finish()
.await
.unwrap(),
);
let mut r = f.object("test").limited_reader(5);
let mut buf = vec![];

let n = r.read_to_end(&mut buf).await?;
assert_eq!(n, 5);
assert_eq!("Hello", from_utf8(&buf).unwrap());

let n = r.seek(SeekFrom::Start(0)).await?;
assert_eq!(n, 0);
let n = r.seek(SeekFrom::End(0)).await?;
assert_eq!(n, 5);

// offset out of bound
let mut r = f.object("test").range_reader(100, 5);
let n = r.read_to_end(&mut buf).await;
assert!(n.is_err());

Ok(())
}
Loading