Skip to content

feat: Implement operator metadata support #296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::BytesReader;
use crate::BytesWriter;
use crate::Metadata;
use crate::ObjectStreamer;
use crate::Scheme;

/// Underlying trait of all backends for implementors.
///
Expand All @@ -42,6 +43,11 @@ use crate::ObjectStreamer;
/// should handle them based on services requirement.
#[async_trait]
pub trait Accessor: Send + Sync + Debug {
/// Invoke the `metadata` operation to get metadata of accessor.
fn metadata(&self) -> AccessorMetadata {
unimplemented!()
}

/// Invoke the `read` operation on the specified path, returns corresponding
/// [`Metadata`] if operate successful.
///
Expand Down Expand Up @@ -114,6 +120,9 @@ pub trait Accessor: Send + Sync + Debug {
/// `Accessor` for `Arc<dyn Accessor>`.
#[async_trait]
impl<T: Accessor> Accessor for Arc<T> {
fn metadata(&self) -> AccessorMetadata {
self.as_ref().metadata()
}
async fn create(&self, args: &OpCreate) -> Result<()> {
self.as_ref().create(args).await
}
Expand All @@ -133,3 +142,48 @@ impl<T: Accessor> Accessor for Arc<T> {
self.as_ref().list(args).await
}
}

/// Metadata for accessor, users can use this metadata to get information of underlying backend.
#[derive(Clone, Debug, Default)]
pub struct AccessorMetadata {
scheme: Scheme,
root: String,
name: String,
}

impl AccessorMetadata {
/// [`Scheme`] of backend.
pub fn scheme(&self) -> Scheme {
self.scheme
}

pub(crate) fn set_scheme(&mut self, scheme: Scheme) -> &mut Self {
self.scheme = scheme;
self
}

/// Root of backend, will be in format like `/path/to/dir/`
pub fn root(&self) -> &str {
&self.root
}

pub(crate) fn set_root(&mut self, root: &str) -> &mut Self {
self.root = root.to_string();
self
}

/// Name of backend, could be empty if underlying backend doesn't have namespace concept.
///
/// For example:
///
/// - name for `s3` => bucket name
/// - name for `azblob` => container name
pub fn name(&self) -> &str {
&self.name
}

pub(crate) fn set_name(&mut self, name: &str) -> &mut Self {
self.name = name.to_string();
self
}
}
9 changes: 9 additions & 0 deletions src/io_util/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,24 @@ impl From<CompressAlgorithm> for DecompressDecoder {
/// ```
#[derive(Debug)]
pub enum DecompressDecoder {
/// Decoder for [`CompressAlgorithm::Brotli`]
///
/// BrotliDecoder is too large that is 2592 bytes
/// Wrap into box to reduce the total size of the enum
Brotli(Box<BrotliDecoder>),
/// Decoder for [`CompressAlgorithm::Bz2`]
Bz2(BzDecoder),
/// Decoder for [`CompressAlgorithm::Deflate`]
Deflate(DeflateDecoder),
/// Decoder for [`CompressAlgorithm::Gzip`]
Gzip(GzipDecoder),
/// Decoder for [`CompressAlgorithm::Lzma`]
Lzma(LzmaDecoder),
/// Decoder for [`CompressAlgorithm::Xz`]
Xz(XzDecoder),
/// Decoder for [`CompressAlgorithm::Zlib`]
Zlib(ZlibDecoder),
/// Decoder for [`CompressAlgorithm::Zstd`]
Zstd(ZstdDecoder),
}

Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,13 @@
//! }
//! ```

// Make sure all our public APIs have docs.
#![warn(missing_docs)]

// Private module with public types, they will be accessed via `opendal::Xxxx`
mod accessor;
pub use accessor::Accessor;
pub use accessor::AccessorMetadata;

mod io;
pub use io::BytesRead;
Expand Down
6 changes: 6 additions & 0 deletions src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,12 @@ pub struct Metadata {
}

impl Metadata {
/// Whether this object is complete.
///
/// - If complete, this metadata is the full set. And we can't
/// get more information.
/// - If not, this metadata is not the full set. We can use
/// `Object::metadata()` to get more information.
pub fn complete(&self) -> bool {
self.complete
}
Expand Down
25 changes: 24 additions & 1 deletion src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;
use backon::Backoff;

use crate::Accessor;
use crate::AccessorMetadata;
use crate::Layer;
use crate::Object;

Expand Down Expand Up @@ -113,6 +114,29 @@ impl Operator {
self.accessor.clone()
}

/// Get metadata of underlying accessor.
///
/// # Examples
///
/// ```
/// # use std::sync::Arc;
/// # use anyhow::Result;
/// # use opendal::services::fs;
/// # use opendal::services::fs::Builder;
/// use opendal::Operator;
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// # let accessor = fs::Backend::build().finish().await?;
/// let op = Operator::new(accessor);
/// let meta = op.metadata();
/// # Ok(())
/// # }
/// ```
pub fn metadata(&self) -> AccessorMetadata {
self.accessor.metadata()
}

/// Create a new [`Object`][crate::Object] handle to take operations.
pub fn object(&self, path: &str) -> Object {
Object::new(self.inner(), path)
Expand All @@ -134,7 +158,6 @@ impl Operator {
/// # async fn main() -> Result<()> {
/// # let accessor = fs::Backend::build().finish().await?;
/// let op = Operator::new(accessor);
/// // All operations will be retried if the error is retryable
/// op.check().await?;
/// # Ok(())
/// # }
Expand Down
8 changes: 7 additions & 1 deletion src/scheme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::error::other;
use crate::error::BackendError;

/// Backends that OpenDAL supports
#[derive(Clone, Debug, PartialEq)]
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum Scheme {
/// [azblob][crate::services::azblob]: Azure Storage Blob services.
Azblob,
Expand All @@ -35,6 +35,12 @@ pub enum Scheme {
S3,
}

impl Default for Scheme {
fn default() -> Self {
Self::Memory
}
}

impl FromStr for Scheme {
type Err = io::Error;

Expand Down
16 changes: 14 additions & 2 deletions src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use reqsign::services::azure::storage::Signer;
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;

use crate::accessor::AccessorMetadata;
use crate::error::other;
use crate::error::BackendError;
use crate::error::ObjectError;
Expand All @@ -61,6 +62,7 @@ use crate::BytesReader;
use crate::BytesWriter;
use crate::ObjectMode;
use crate::ObjectStreamer;
use crate::Scheme;

const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";

Expand Down Expand Up @@ -234,6 +236,7 @@ pub struct Backend {
}

impl Backend {
/// Create a builder for azblob.
pub fn build() -> Builder {
Builder::default()
}
Expand Down Expand Up @@ -263,7 +266,16 @@ impl Backend {

#[async_trait]
impl Accessor for Backend {
#[trace("read")]
fn metadata(&self) -> AccessorMetadata {
let mut am = AccessorMetadata::default();
am.set_scheme(Scheme::Azblob)
.set_root(&self.root)
.set_name(&self.container);

am
}

#[trace("create")]
async fn create(&self, args: &OpCreate) -> Result<()> {
increment_counter!("opendal_azblob_create_requests");
let p = self.get_abs_path(args.path());
Expand Down Expand Up @@ -728,7 +740,7 @@ async fn parse_error_response_with_body(
}
}

io::Error::new(
Error::new(
kind,
ObjectError::new(
op,
Expand Down
9 changes: 9 additions & 0 deletions src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tokio::fs;

use super::error::parse_io_error;
use super::object_stream::Readdir;
use crate::accessor::AccessorMetadata;
use crate::error::other;
use crate::error::BackendError;
use crate::error::ObjectError;
Expand All @@ -49,6 +50,7 @@ use crate::ops::OpWrite;
use crate::Accessor;
use crate::BytesReader;
use crate::BytesWriter;
use crate::Scheme;

/// Builder for fs backend.
#[derive(Default, Debug)]
Expand Down Expand Up @@ -122,6 +124,13 @@ impl Backend {

#[async_trait]
impl Accessor for Backend {
fn metadata(&self) -> AccessorMetadata {
let mut am = AccessorMetadata::default();
am.set_scheme(Scheme::Fs).set_root(&self.root);

am
}

async fn create(&self, args: &OpCreate) -> Result<()> {
let path = self.get_abs_path(args.path());

Expand Down
11 changes: 10 additions & 1 deletion src/services/hdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::Accessor;
use crate::AccessorMetadata;
use crate::BytesReader;
use crate::BytesWriter;
use crate::Metadata;
use crate::ObjectMode;
use crate::ObjectStreamer;
use crate::Scheme;

/// Builder for hdfs services
#[derive(Debug, Default)]
Expand Down Expand Up @@ -179,7 +181,14 @@ impl Backend {

#[async_trait]
impl Accessor for Backend {
#[trace("read")]
fn metadata(&self) -> AccessorMetadata {
let mut am = AccessorMetadata::default();
am.set_scheme(Scheme::Hdfs).set_root(&self.root);

am
}

#[trace("create")]
async fn create(&self, args: &OpCreate) -> Result<()> {
let path = self.get_abs_path(args.path());

Expand Down
11 changes: 11 additions & 0 deletions src/services/memory/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::Accessor;
use crate::AccessorMetadata;
use crate::BytesReader;
use crate::BytesWriter;
use crate::Metadata;
use crate::Object;
use crate::ObjectMode;
use crate::Scheme;

/// Builder for memory backend
#[derive(Default)]
Expand Down Expand Up @@ -73,6 +75,15 @@ impl Backend {

#[async_trait]
impl Accessor for Backend {
fn metadata(&self) -> AccessorMetadata {
let mut am = AccessorMetadata::default();
am.set_scheme(Scheme::Memory)
.set_root("/")
.set_name("memory");

am
}

#[trace("create")]
async fn create(&self, args: &OpCreate) -> Result<()> {
let path = args.path();
Expand Down
15 changes: 14 additions & 1 deletion src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::Accessor;
use crate::AccessorMetadata;
use crate::BytesReader;
use crate::BytesWriter;
use crate::ObjectMode;
use crate::Scheme;

/// Allow constructing correct region endpoint if user gives a global endpoint.
static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str, &'static str>> = Lazy::new(|| {
Expand Down Expand Up @@ -508,6 +510,7 @@ impl Builder {
}
}

/// Finish the build process and create a new accessor.
pub async fn finish(&mut self) -> Result<Arc<dyn Accessor>> {
info!("backend build started: {:?}", &self);

Expand Down Expand Up @@ -669,6 +672,7 @@ pub struct Backend {
}

impl Backend {
/// Create a new builder for s3.
pub fn build() -> Builder {
Builder::default()
}
Expand Down Expand Up @@ -763,7 +767,16 @@ impl Backend {

#[async_trait]
impl Accessor for Backend {
#[trace("read")]
fn metadata(&self) -> AccessorMetadata {
let mut am = AccessorMetadata::default();
am.set_scheme(Scheme::S3)
.set_root(&self.root)
.set_name(&self.bucket);

am
}

#[trace("create")]
async fn create(&self, args: &OpCreate) -> Result<()> {
increment_counter!("opendal_s3_create_requests");
let p = self.get_abs_path(args.path());
Expand Down
Loading