Skip to content

feat(bin/oli): support cp to dir #6140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions .github/workflows/ci_bin_oli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ concurrency:
cancel-in-progress: true

jobs:
check_clippy:
check_clippy_and_test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand All @@ -45,7 +45,13 @@ jobs:
need-rocksdb: true
need-protoc: true
github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Cargo clippy
- name: Run sccache-cache
uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad
- name: Cargo clippy && test
working-directory: bin/oli
run: cargo clippy --all-targets --all-features -- -D warnings
env:
SCCACHE_GHA_ENABLED: "true"
RUSTC_WRAPPER: "sccache"
run: |
cargo clippy --all-targets --all-features -- -D warnings
cargo test --all-targets --all-features
129 changes: 115 additions & 14 deletions bin/oli/src/commands/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use anyhow::bail;
use futures::AsyncBufReadExt;
use indicatif::ProgressBar;
use indicatif::ProgressStyle;
use opendal::ErrorKind;
use opendal::Metadata;
use std::path::Path;

use anyhow::Context;
use anyhow::Result;
use futures::AsyncWriteExt;
use futures::TryStreamExt;
Expand Down Expand Up @@ -63,15 +66,53 @@ impl CopyCmd {
/// Synchronous entry point for the copy command: obtains a runtime from
/// `make_tokio_runtime(1)` and blocks on the async `do_run` until it finishes,
/// returning its result unchanged.
pub fn run(self) -> Result<()> {
    let runtime = make_tokio_runtime(1);
    runtime.block_on(self.do_run())
}

async fn do_run(self) -> Result<()> {
let cfg = Config::load(&self.config_params.config)?;

let (src_op, src_path) = cfg.parse_location(&self.source)?;

let (dst_op, dst_path) = cfg.parse_location(&self.destination)?;

let final_dst_path = match dst_op.stat(&dst_path).await {
Ok(dst_meta) if dst_meta.mode().is_dir() => {
if self.recursive {
dst_path.clone()
} else if let Some(filename) = Path::new(&src_path).file_name() {
Path::new(&dst_path)
.join(filename)
.to_string_lossy()
.to_string()
} else {
bail!(
"Cannot copy source '{}' into directory '{}': Source has no filename.",
src_path,
dst_path
);
}
}
Ok(_) => {
// Destination exists but is a file. Overwrite it (non-recursive)
// or bail out right here (recursive).
if self.recursive {
bail!(
"Recursive copy destination '{}' exists but is not a directory.",
dst_path
);
}
dst_path.clone()
}
Err(e) if e.kind() == ErrorKind::NotFound => dst_path.clone(),
Err(e) => {
return Err(e.into());
}
};

if !self.recursive {
let mut dst_w = dst_op.writer(&dst_path).await?.into_futures_async_write();
// Non-recursive copy: Use the final_dst_path directly.
let mut dst_w = dst_op
.writer(&final_dst_path)
.await?
.into_futures_async_write();
let src_meta = src_op.stat(&src_path).await?;
let reader = src_op.reader_with(&src_path).chunk(8 * 1024 * 1024).await?;
let buf_reader = reader
Expand All @@ -84,28 +125,88 @@ impl CopyCmd {
return Ok(());
}

let dst_root = Path::new(&dst_path);
// Recursive copy: Ensure the base destination directory exists or create it.
// Note: final_dst_path here refers to the original dst_path if it was a dir or didn't exist.
match dst_op.stat(&final_dst_path).await {
Ok(meta) if meta.mode().is_dir() => {
// Base destination directory exists.
}
Ok(_) => {
bail!(
"Recursive copy destination '{}' exists but is not a directory.",
final_dst_path
);
}
Err(e) if e.kind() == ErrorKind::NotFound => {
let mut path_to_create = final_dst_path.clone();
if !path_to_create.ends_with('/') {
path_to_create.push('/');
}
dst_op.create_dir(&path_to_create).await?;
}
Err(e) => {
// Another error occurred trying to stat the base destination.
return Err(e.into());
}
}

// Proceed with recursive copy logic. dst_root is the target directory.
let dst_root = Path::new(&final_dst_path);
let mut ds = src_op.lister_with(&src_path).recursive(true).await?;
let prefix = src_path.strip_prefix('/').unwrap_or(src_path.as_str());

while let Some(de) = ds.try_next().await? {
let meta = de.metadata();
let depath = de.path();

// Calculate relative path using Path::strip_prefix
let src_root_path = Path::new(&src_path);
let entry_path = Path::new(depath);
let relative_path = entry_path.strip_prefix(src_root_path).with_context(|| {
format!(
"Internal error: Lister path '{}' does not start with source path '{}'",
depath, src_path
)
})?; // relative_path is a &Path

let current_dst_path_path = dst_root.join(relative_path);
let current_dst_path = current_dst_path_path.to_string_lossy().to_string();

if meta.mode().is_dir() {
let mut dir_path_to_create = current_dst_path.clone();
if !dir_path_to_create.ends_with('/') {
dir_path_to_create.push('/');
}
dst_op.create_dir(&dir_path_to_create).await?;
continue;
}
let depath = de.path();
let fp = depath
.strip_prefix('/')
.unwrap_or(depath)
.strip_prefix(prefix)
.expect("invalid path");
let reader = src_op.reader_with(de.path()).chunk(8 * 1024 * 1024).await?;

// Explicitly stat the source file to get fresh metadata
let fresh_meta = src_op.stat(depath).await.with_context(|| {
format!(
"Failed to stat source file '{}' before recursive copy",
depath
)
})?;

// Use the Path object `current_dst_path_path` to check parent
if let Some(parent_path) = current_dst_path_path.parent() {
if parent_path != dst_root {
let mut parent_dir_string = parent_path.to_string_lossy().into_owned();
if !parent_dir_string.ends_with('/') {
parent_dir_string.push('/');
}
dst_op.create_dir(&parent_dir_string).await?;
}
}

let reader = src_op.reader_with(depath).chunk(8 * 1024 * 1024).await?;
let buf_reader = reader
.into_futures_async_read(0..meta.content_length())
.into_futures_async_read(0..fresh_meta.content_length())
.await?;

let copy_progress = CopyProgress::new(meta, de.path().to_string());
let copy_progress = CopyProgress::new(&fresh_meta, depath.to_string());
let mut writer = dst_op
.writer(&dst_root.join(fp).to_string_lossy())
.writer(&current_dst_path)
.await?
.into_futures_async_write();

Expand Down
105 changes: 98 additions & 7 deletions bin/oli/tests/integration/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,106 @@ async fn test_cp_for_path_in_current_dir() -> Result<()> {
.current_dir(dir.path())
.assert()
.success();
assert_snapshot!(directory_snapshot(dir.path()).with_content(true), @r"
+-------------------------------------+
| Path Type Content |
+=====================================+
| [TEMP_DIR] DIR |
| [TEMP_DIR]/dst.txt FILE hello |
| [TEMP_DIR]/src.txt FILE hello |
+-------------------------------------+
");
Ok(())
}

/// Copies a single file onto a destination that already exists as a directory
/// and snapshots the temp directory; the expected snapshot shows the file
/// inside `dest/` under its original name.
#[tokio::test]
async fn test_cp_file_to_existing_dir() -> Result<()> {
    let tmp = tempdir()?;

    // Lay out `<tmp>/source/` and `<tmp>/dest/` before running the command.
    let src_dir = tmp.path().join("source");
    let dst_dir = tmp.path().join("dest");
    for d in [&src_dir, &dst_dir] {
        fs::create_dir_all(d)?;
    }

    // The only source file: `<tmp>/source/test_file.txt` containing "hello".
    let src_file = src_dir.join("test_file.txt");
    fs::write(&src_file, "hello")?;

    // Plain filesystem paths are passed straight through as CLI arguments.
    let src_arg = src_file.to_str().unwrap();
    let dst_arg = dst_dir.to_str().unwrap();
    oli().arg("cp").arg(src_arg).arg(dst_arg).assert().success();

    assert_snapshot!(directory_snapshot(tmp.path()).with_content(true), @r"
+--------------------------------------------------+
| Path Type Content |
+==================================================+
| [TEMP_DIR] DIR |
| [TEMP_DIR]/dest DIR |
| [TEMP_DIR]/dest/test_file.txt FILE hello |
| [TEMP_DIR]/source DIR |
| [TEMP_DIR]/source/test_file.txt FILE hello |
+--------------------------------------------------+
");
    Ok(())
}

/// Runs `oli cp -r` from `source_root/source_dir` (two files plus a
/// sub-directory holding a third file) to `dest_root/dest_dir`, a path the
/// test itself never creates, then snapshots the whole temp directory.
#[tokio::test]
async fn test_recursive_cp_dir_to_new_dir() -> Result<()> {
let dir = tempdir()?;
let source_base_dir = dir.path().join("source_root");
// `dest_root` is only named here; nothing in this test creates it on disk.
let dest_base_dir = dir.path().join("dest_root");

// Create source directory structure
let source_dir = source_base_dir.join("source_dir");
fs::create_dir_all(&source_dir)?;

let file1_path = source_dir.join("file1.txt");
fs::write(&file1_path, "file1_content")?;

let sub_dir_path = source_dir.join("sub_dir");
fs::create_dir(&sub_dir_path)?;

let file2_path = sub_dir_path.join("file2.txt");
fs::write(&file2_path, "file2_content")?;

let file3_path = source_dir.join("file3.txt");
fs::write(&file3_path, "file3_content")?;

// Define destination path (should not exist yet)
let dest_dir_path = dest_base_dir.join("dest_dir");

// Invoke `oli cp -r <source_dir> <dest_dir_path>` and assert that it succeeds.
oli()
.arg("cp")
.arg("-r")
.arg(source_dir.to_str().unwrap())
.arg(dest_dir_path.to_str().unwrap())
.assert()
.success();
// NOTE(review): the literal below holds two tables. The first (with a
// `Size (bytes)` column, listing only dst.txt/src.txt) does not match the tree
// built above and looks like residue from an older snapshot; confirm and drop
// it. The second table is the layout this test sets up: the source tree plus
// a copy of it under dest_root/dest_dir.
assert_snapshot!(directory_snapshot(dir.path()).with_content(true), @r"
+----------------------------------------------------+
| Path Type Size (bytes) Content |
+====================================================+
| [TEMP_DIR] DIR 128 |
| [TEMP_DIR]/dst.txt FILE 5 hello |
| [TEMP_DIR]/src.txt FILE 5 hello |
+----------------------------------------------------+
+----------------------------------------------------------------------------+
| Path Type Content |
+============================================================================+
| [TEMP_DIR] DIR |
| [TEMP_DIR]/dest_root DIR |
| [TEMP_DIR]/dest_root/dest_dir DIR |
| [TEMP_DIR]/dest_root/dest_dir/file1.txt FILE file1_content |
| [TEMP_DIR]/dest_root/dest_dir/file3.txt FILE file3_content |
| [TEMP_DIR]/dest_root/dest_dir/sub_dir DIR |
| [TEMP_DIR]/dest_root/dest_dir/sub_dir/file2.txt FILE file2_content |
| [TEMP_DIR]/source_root DIR |
| [TEMP_DIR]/source_root/source_dir DIR |
| [TEMP_DIR]/source_root/source_dir/file1.txt FILE file1_content |
| [TEMP_DIR]/source_root/source_dir/file3.txt FILE file3_content |
| [TEMP_DIR]/source_root/source_dir/sub_dir DIR |
| [TEMP_DIR]/source_root/source_dir/sub_dir/file2.txt FILE file2_content |
+----------------------------------------------------------------------------+
");
Ok(())
}
19 changes: 8 additions & 11 deletions bin/oli/tests/integration/ls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,14 @@ async fn test_basic_ls() -> Result<()> {
fs::write(&dst_path_3, expect)?;

let current_dir = dir.path().to_string_lossy().to_string() + "/";
assert_cmd_snapshot!(oli().arg("ls").arg(current_dir), @r"
success: true
exit_code: 0
----- stdout -----
[TEMP_DIR]/
dst_1.txt
dst_2.txt
dst_3.txt

----- stderr -----
");
let t = oli().arg("ls").arg(current_dir).assert().success();
let output = String::from_utf8(t.get_output().stdout.clone())?;
let mut output_list = output
.split("\n")
.filter(|x| !x.starts_with(".tmp") && !x.is_empty())
.collect::<Vec<_>>();
output_list.sort();
assert_eq!(output_list, ["dst_1.txt", "dst_2.txt", "dst_3.txt"]);

Ok(())
}
Loading
Loading