Skip to content

Commit 86f1482

Browse files
feat(bin/oli): support tee (#6194)
* init tee * pass the test * support pipe * rm source * support -a
1 parent 38ebddc commit 86f1482

File tree

4 files changed

+254
-0
lines changed

4 files changed

+254
-0
lines changed

bin/oli/src/commands/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod ls;
2424
pub mod mv;
2525
pub mod rm;
2626
pub mod stat;
27+
pub mod tee;
2728

2829
#[derive(Debug, clap::Subcommand)]
2930
pub enum OliSubcommand {
@@ -34,6 +35,7 @@ pub enum OliSubcommand {
3435
Rm(rm::RmCmd),
3536
Stat(stat::StatCmd),
3637
Mv(mv::MoveCmd),
38+
Tee(tee::TeeCmd),
3739
}
3840

3941
impl OliSubcommand {
@@ -46,6 +48,7 @@ impl OliSubcommand {
4648
Self::Rm(cmd) => cmd.run(),
4749
Self::Stat(cmd) => cmd.run(),
4850
Self::Mv(cmd) => cmd.run(),
51+
Self::Tee(cmd) => cmd.run(),
4952
}
5053
}
5154
}

bin/oli/src/commands/tee.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::config::Config;
19+
use crate::make_tokio_runtime;
20+
use crate::params::config::ConfigParams;
21+
use anyhow::Result;
22+
use futures::AsyncWriteExt;
23+
use tokio::io::AsyncReadExt as TokioAsyncReadExt;
24+
use tokio::io::AsyncWriteExt as TokioAsyncWriteExt;
25+
#[derive(Debug, clap::Parser)]
26+
#[command(
27+
name = "tee",
28+
about = "Read from standard input and write to destination and stdout",
29+
disable_version_flag = true
30+
)]
31+
pub struct TeeCmd {
32+
#[command(flatten)]
33+
pub config_params: ConfigParams,
34+
#[arg()]
35+
pub destination: String,
36+
37+
#[arg(short, long, help = "Append to the given FILEs, do not overwrite")]
38+
pub append: bool,
39+
}
40+
41+
impl TeeCmd {
42+
pub fn run(self) -> Result<()> {
43+
make_tokio_runtime(1).block_on(self.do_run())
44+
}
45+
46+
async fn do_run(self) -> Result<()> {
47+
let cfg = Config::load(&self.config_params.config)?;
48+
49+
let (dst_op, dst_path) = cfg.parse_location(&self.destination)?;
50+
51+
let mut writer = if self.append {
52+
dst_op
53+
.writer_with(&dst_path)
54+
.append(true)
55+
.await?
56+
.into_futures_async_write()
57+
} else {
58+
dst_op.writer(&dst_path).await?.into_futures_async_write()
59+
};
60+
let mut stdout = tokio::io::stdout();
61+
62+
let mut buf = vec![0; 8 * 1024 * 1024]; // 8MB buffer
63+
64+
let mut stdin = tokio::io::stdin();
65+
loop {
66+
let n = stdin.read(&mut buf).await?;
67+
if n == 0 {
68+
break;
69+
}
70+
71+
// Write to destination
72+
writer.write_all(&buf[..n]).await?;
73+
// Write to stdout
74+
stdout.write_all(&buf[..n]).await?;
75+
}
76+
77+
writer.close().await?;
78+
stdout.flush().await?;
79+
80+
Ok(())
81+
}
82+
}

bin/oli/tests/integration/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ mod ls;
2626
mod mv;
2727
mod rm;
2828
mod stat;
29+
mod tee;
2930

3031
pub mod test_utils;

bin/oli/tests/integration/tee.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::test_utils::*;
19+
use anyhow::Result;
20+
use std::fs;
21+
use std::io::Write;
22+
use tempfile::TempDir;
23+
24+
#[tokio::test]
25+
async fn test_tee_destination_already_exists() -> Result<()> {
26+
let temp_dir = TempDir::new()?;
27+
let dest_path = temp_dir.path().join("dest.txt");
28+
29+
let source_content = "Source content";
30+
let initial_dest_content = "Initial dest content";
31+
32+
fs::write(&dest_path, initial_dest_content)?;
33+
34+
let mut cmd = oli();
35+
cmd.arg("tee").arg(dest_path.to_str().unwrap());
36+
37+
cmd.stdin(std::process::Stdio::piped());
38+
cmd.stdout(std::process::Stdio::piped());
39+
let mut child = cmd.spawn()?;
40+
let mut stdin = child.stdin.take().expect("Failed to open stdin");
41+
42+
let content_to_write = source_content.to_string();
43+
std::thread::spawn(move || {
44+
stdin
45+
.write_all(content_to_write.as_bytes())
46+
.expect("Failed to write to stdin");
47+
});
48+
49+
let output = child.wait_with_output()?;
50+
assert!(output.status.success());
51+
52+
let stdout_output = String::from_utf8(output.stdout.clone())?;
53+
assert_eq!(stdout_output, source_content);
54+
let dest_content_after_tee = fs::read_to_string(&dest_path)?;
55+
assert_eq!(dest_content_after_tee, source_content);
56+
57+
Ok(())
58+
}
59+
60+
#[tokio::test]
61+
async fn test_tee_stdin() -> Result<()> {
62+
let temp_dir = TempDir::new()?;
63+
let dest_path = temp_dir.path().join("dest_stdin.txt");
64+
65+
let test_content = "Hello from stdin!";
66+
67+
let mut cmd = oli();
68+
cmd.arg("tee").arg(&dest_path);
69+
70+
cmd.stdin(std::process::Stdio::piped());
71+
cmd.stdout(std::process::Stdio::piped());
72+
let mut child = cmd.spawn()?;
73+
let mut stdin = child.stdin.take().expect("Failed to open stdin");
74+
75+
let content_to_write = test_content.to_string();
76+
std::thread::spawn(move || {
77+
stdin
78+
.write_all(content_to_write.as_bytes())
79+
.expect("Failed to write to stdin");
80+
});
81+
82+
let output = child.wait_with_output()?;
83+
84+
assert!(output.status.success());
85+
86+
let stdout_output = String::from_utf8(output.stdout.clone())?;
87+
assert_eq!(stdout_output, test_content);
88+
let dest_content = fs::read_to_string(&dest_path)?;
89+
assert_eq!(dest_content, test_content);
90+
91+
Ok(())
92+
}
93+
94+
#[test]
95+
fn test_tee_non_existing_file() -> Result<()> {
96+
let temp_dir = TempDir::new()?;
97+
let dst_path = temp_dir.path().join("non_existing_file.txt");
98+
let dst_path_str = dst_path.as_os_str().to_str().unwrap();
99+
100+
let mut cmd: assert_cmd::Command = std::process::Command::cargo_bin("oli")?.into();
101+
cmd.args(["tee", dst_path_str]);
102+
cmd.write_stdin("Hello, world!");
103+
cmd.assert().success();
104+
105+
let content = fs::read_to_string(dst_path)?;
106+
assert_eq!(content, "Hello, world!");
107+
108+
Ok(())
109+
}
110+
111+
#[test]
112+
fn test_tee_append_succeed() -> Result<()> {
113+
let temp_dir = TempDir::new()?;
114+
let dst_path = temp_dir.path().join("test_append.txt");
115+
let dst_path_str = dst_path.as_os_str().to_str().unwrap();
116+
117+
// Initial content
118+
fs::write(&dst_path, "Hello, ")?;
119+
120+
let mut cmd: assert_cmd::Command = std::process::Command::cargo_bin("oli")?.into();
121+
cmd.args(["tee", "-a", dst_path_str]);
122+
cmd.write_stdin("world!");
123+
cmd.assert().success();
124+
125+
let content = fs::read_to_string(dst_path)?;
126+
assert_eq!(content, "Hello, world!");
127+
128+
Ok(())
129+
}
130+
131+
#[test]
132+
fn test_tee_append_file_not_found() -> Result<()> {
133+
let temp_dir = TempDir::new()?;
134+
let file_path = temp_dir.path().join("test_append_not_found.txt");
135+
let file_path_str = file_path.to_str().unwrap();
136+
137+
let mut cmd: assert_cmd::Command = std::process::Command::cargo_bin("oli")?.into();
138+
cmd.arg("tee")
139+
.arg("-a")
140+
.arg(file_path_str)
141+
.write_stdin("append data")
142+
.assert()
143+
.success();
144+
145+
let content = fs::read_to_string(file_path)?;
146+
assert_eq!(content, "append data");
147+
148+
Ok(())
149+
}
150+
151+
#[test]
152+
fn test_tee_overwrite_existing_file() -> Result<()> {
153+
let temp_dir = TempDir::new()?;
154+
let file_path = temp_dir.path().join("test_overwrite.txt");
155+
let file_path_str = file_path.to_str().unwrap();
156+
157+
// Create an existing file with some content
158+
fs::write(&file_path, "initial data")?;
159+
160+
let mut cmd: assert_cmd::Command = std::process::Command::cargo_bin("oli")?.into();
161+
cmd.arg("tee").arg(file_path_str).write_stdin("new data");
162+
cmd.assert().success();
163+
164+
let content = fs::read_to_string(file_path)?;
165+
assert_eq!(content, "new data");
166+
167+
Ok(())
168+
}

0 commit comments

Comments
 (0)