Skip to content

feat: Implement MultipartObject public API #574

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use flagset::flags;
use flagset::FlagSet;

use crate::error::new_unsupported_object_error;
use crate::multipart::ObjectPart;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -161,7 +162,7 @@ pub trait Accessor: Send + Sync + Debug {
}

/// Invoke the `write_multipart` operation on the specified path.
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<u64> {
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<ObjectPart> {
let (_, _) = (args, r);

return Err(new_unsupported_object_error(
Expand Down Expand Up @@ -222,7 +223,7 @@ impl<T: Accessor> Accessor for Arc<T> {
async fn create_multipart(&self, args: &OpCreateMultipart) -> Result<String> {
self.as_ref().create_multipart(args).await
}
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<u64> {
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<ObjectPart> {
self.as_ref().write_multipart(args, r).await
}
async fn complete_multipart(&self, args: &OpCompleteMultipart) -> Result<()> {
Expand Down
5 changes: 3 additions & 2 deletions src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use log::debug;
use log::error;
use log::warn;

use crate::multipart::ObjectPart;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -58,7 +59,7 @@ use crate::Scheme;
///
/// We should migrate to log's kv api after it's ready.
///
/// Tracking issue: https://github.com/rust-lang/log/issues/328
/// Tracking issue: <https://github.com/rust-lang/log/issues/328>
///
/// # Examples
///
Expand Down Expand Up @@ -433,7 +434,7 @@ impl Accessor for LoggingAccessor {
})
}

async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<u64> {
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<ObjectPart> {
debug!(
target: "opendal::services",
"service={} operation={} path={} upload_id={} part_number={:?} size={:?} -> started",
Expand Down
3 changes: 2 additions & 1 deletion src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use metrics::histogram;
use metrics::increment_counter;

use crate::multipart::ObjectPart;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -272,7 +273,7 @@ impl Accessor for MetricsAccessor {
result
}

async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<u64> {
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<ObjectPart> {
increment_counter!(
METRIC_REQUESTS_TOTAL,
LABEL_SERVICE => self.meta.scheme().into_static(),
Expand Down
3 changes: 2 additions & 1 deletion src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use backon::Backoff;
use backon::Retryable;

use crate::multipart::ObjectPart;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -152,7 +153,7 @@ where
.with_error_fn(|e| e.kind() == ErrorKind::Interrupted)
.await
}
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<u64> {
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<ObjectPart> {
// Write can't retry, until can reset this reader.
self.inner.write_multipart(args, r).await
}
Expand Down
3 changes: 2 additions & 1 deletion src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use async_trait::async_trait;

use crate::multipart::ObjectPart;
use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpCreate;
Expand Down Expand Up @@ -111,7 +112,7 @@ impl Accessor for TracingAccessor {
}

#[tracing::instrument(skip(r))]
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<u64> {
async fn write_multipart(&self, args: &OpWriteMultipart, r: BytesReader) -> Result<ObjectPart> {
self.inner.write_multipart(args, r).await
}

Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ pub use object::Object;
pub use object::ObjectMetadata;
pub use object::ObjectMode;

mod multipart;
pub use multipart::ObjectMultipart;
pub use multipart::ObjectPart;

mod scheme;
pub use scheme::Scheme;

Expand Down
120 changes: 120 additions & 0 deletions src/multipart.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Result;
use std::sync::Arc;

use futures::io::Cursor;

use crate::ops::OpAbortMultipart;
use crate::ops::OpCompleteMultipart;
use crate::ops::OpWriteMultipart;
use crate::path::normalize_path;
use crate::Accessor;
use crate::Object;

/// ObjectMultipart represent an ongoing multipart upload.
///
/// # Process
///
/// ```txt
/// create
/// -> write
/// -> complete to build a normal Object
/// -> abort to cancel upload and delete all existing parts
/// ```
///
/// # Notes
///
/// Before [`ObjectMultipart::complete`] has been called, we can't read any content from this multipart object.
pub struct ObjectMultipart {
acc: Arc<dyn Accessor>,
path: String,
upload_id: String,
}

impl ObjectMultipart {
/// Build a new MultipartObject.
pub fn new(acc: Arc<dyn Accessor>, path: &str, upload_id: &str) -> Self {
Self {
acc,
path: normalize_path(path),
upload_id: upload_id.to_string(),
}
}

/// Write a new [`ObjectPart`] with specified part number.
pub async fn write(&self, part_number: usize, bs: impl Into<Vec<u8>>) -> Result<ObjectPart> {
let bs = bs.into();

let op = OpWriteMultipart::new(&self.path, &self.upload_id, part_number, bs.len() as u64)?;
let r = Cursor::new(bs);
let part = self.acc.write_multipart(&op, Box::new(r)).await?;
Ok(part)
}

/// Complete multipart uploads with specified parts.
///
/// # Notes
///
/// - This operation will complete and finish this upload.
/// - This operation will concat input parts to build a new object.
/// - Input parts order is **SENSITIVE**, please make sure the order is correct.
pub async fn complete(&self, parts: Vec<ObjectPart>) -> Result<Object> {
let op = OpCompleteMultipart::new(&self.path, &self.upload_id, parts)?;
self.acc.complete_multipart(&op).await?;

Ok(Object::new(self.acc.clone(), &self.path))
}

/// Abort multipart uploads.
///
/// # Notes
///
/// - This operation will cancel this upload.
/// - This operation will remove all parts that already uploaded.
/// - This operation will return `succeeded` even when object or upload_id not exist.
pub async fn abort(&self) -> Result<()> {
let op = OpAbortMultipart::new(&self.path, &self.upload_id)?;
self.acc.abort_multipart(&op).await
}
}

/// ObjectPart is generated by `write_multipart` operation, carries
/// required information for `complete_multipart`.
#[derive(Debug, Clone, Default)]
pub struct ObjectPart {
part_number: usize,
etag: String,
}

impl ObjectPart {
/// Create a new part
pub fn new(part_number: usize, etag: &str) -> Self {
Self {
part_number,
etag: etag.to_string(),
}
}

/// Get part_number from part.
pub fn part_number(&self) -> usize {
self.part_number
}

/// Get etag from part.
pub fn etag(&self) -> &str {
&self.etag
}
}
16 changes: 15 additions & 1 deletion src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ use crate::io_util::CompressAlgorithm;
#[cfg(feature = "compress")]
use crate::io_util::DecompressReader;
use crate::io_util::SeekableReader;
use crate::multipart::ObjectMultipart;
use crate::ops::BytesRange;
use crate::ops::OpCreate;
use crate::ops::OpCreateMultipart;
use crate::ops::OpDelete;
use crate::ops::OpList;
use crate::ops::OpPresign;
Expand Down Expand Up @@ -525,7 +527,7 @@ impl Object {
let bs = bs.into();

let op = OpWrite::new(self.path(), bs.len() as u64)?;
let r = io::Cursor::new(bs);
let r = Cursor::new(bs);
let _ = self.acc.write(&op, Box::new(r)).await?;
Ok(())
}
Expand Down Expand Up @@ -750,6 +752,18 @@ impl Object {

self.acc.presign(&op)
}

/// Construct a multipart with existing upload id.
pub fn to_multipart(&self, upload_id: &str) -> ObjectMultipart {
ObjectMultipart::new(self.acc.clone(), &self.path, upload_id)
}

/// Create a new multipart for current path.
pub async fn create_multipart(&self) -> Result<ObjectMultipart> {
let op = OpCreateMultipart::new(self.path())?;
let upload_id = self.acc.create_multipart(&op).await?;
Ok(self.to_multipart(&upload_id))
}
}

/// Metadata carries all object metadata.
Expand Down
1 change: 0 additions & 1 deletion src/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub use op_multipart::OpAbortMultipart;
pub use op_multipart::OpCompleteMultipart;
pub use op_multipart::OpCreateMultipart;
pub use op_multipart::OpWriteMultipart;
pub use op_multipart::Part;

mod bytes_range;
pub use bytes_range::BytesRange;
35 changes: 4 additions & 31 deletions src/ops/op_multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use anyhow::anyhow;

use crate::error::other;
use crate::error::ObjectError;
use crate::multipart::ObjectPart;
use crate::ops::Operation;

/// Args for `create_multipart` operation.
Expand Down Expand Up @@ -112,14 +113,14 @@ impl OpWriteMultipart {
pub struct OpCompleteMultipart {
path: String,
upload_id: String,
parts: Vec<Part>,
parts: Vec<ObjectPart>,
}

impl OpCompleteMultipart {
/// Create a new `OpCompleteMultipart`.
///
/// If input path is not a file path, an error will be returned.
pub fn new(path: &str, upload_id: &str, parts: Vec<Part>) -> Result<Self> {
pub fn new(path: &str, upload_id: &str, parts: Vec<ObjectPart>) -> Result<Self> {
if path.ends_with('/') {
return Err(other(ObjectError::new(
Operation::CompleteMultipart,
Expand All @@ -146,7 +147,7 @@ impl OpCompleteMultipart {
}

/// Get parts from option.
pub fn parts(&self) -> &[Part] {
pub fn parts(&self) -> &[ObjectPart] {
&self.parts
}
}
Expand Down Expand Up @@ -189,31 +190,3 @@ impl OpAbortMultipart {
&self.upload_id
}
}

/// Part that generated by `write_multipart` operation, carries
/// required information for `complete_multipart`.
#[derive(Debug, Clone, Default)]
pub struct Part {
part_number: usize,
etag: String,
}

impl Part {
/// Create a new part
pub fn new(part_number: usize, etag: &str) -> Self {
Self {
part_number,
etag: etag.to_string(),
}
}

/// Get part_number from part.
pub fn part_number(&self) -> usize {
self.part_number
}

/// Get etag from part.
pub fn etag(&self) -> &str {
&self.etag
}
}
2 changes: 1 addition & 1 deletion src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct Builder {
/// bucket name
bucket: String,
/// endpoint URI of GCS service,
/// default is "https://storage.googleapis.com"
/// default is `https://storage.googleapis.com`
endpoint: Option<String>,

/// credential string for GCS service
Expand Down
9 changes: 6 additions & 3 deletions src/services/obs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use http::header::CONTENT_LENGTH;
use http::Request;
use http::StatusCode;
use http::Uri;
use http::{Request, StatusCode};
use isahc::{AsyncBody, AsyncReadResponseExt};
use isahc::AsyncBody;
use isahc::AsyncReadResponseExt;
use log::debug;
use log::info;
use reqsign::services::huaweicloud::obs::Signer;
Expand All @@ -32,15 +34,16 @@ use super::error::parse_error;
use crate::error::other;
use crate::error::BackendError;
use crate::error::ObjectError;
use crate::http_util::new_request_build_error;
use crate::http_util::new_request_send_error;
use crate::http_util::new_request_sign_error;
use crate::http_util::new_response_consume_error;
use crate::http_util::parse_content_length;
use crate::http_util::parse_error_response;
use crate::http_util::parse_etag;
use crate::http_util::parse_last_modified;
use crate::http_util::percent_encode_path;
use crate::http_util::HttpClient;
use crate::http_util::{new_request_build_error, new_response_consume_error};
use crate::io_util::unshared_reader;
use crate::ops::BytesRange;
use crate::ops::OpCreate;
Expand Down
Loading