Skip to content

Commit efe0b8a

Browse files
authored
bench: Add parallel_read bench (#100)
1 parent 61c483c commit efe0b8a

File tree

1 file changed

+42
-10
lines changed

1 file changed

+42
-10
lines changed

benches/ops/ops.rs

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,11 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::io::SeekFrom;
16-
1715
use criterion::BenchmarkId;
1816
use criterion::Criterion;
17+
use futures::future::join_all;
1918
use futures::io;
2019
use futures::io::BufReader;
21-
use futures::AsyncSeekExt;
2220
use opendal::Operator;
2321
use opendal_test::services::fs;
2422
use opendal_test::services::s3;
@@ -71,13 +69,15 @@ pub fn bench(c: &mut Criterion) {
7169
.iter(|| bench_buf_read(input.0.clone(), input.1))
7270
},
7371
);
72+
73+
group.throughput(criterion::Throughput::Bytes((TOTAL_SIZE / 2) as u64));
7474
group.bench_with_input(
75-
BenchmarkId::new("seek_read", &path),
75+
BenchmarkId::new("range_read", &path),
7676
&(op.clone(), &path),
7777
|b, input| {
78-
let pos = rng.gen_range(0..TOTAL_SIZE - BATCH_SIZE) as u64;
78+
let pos = rng.gen_range(0..(TOTAL_SIZE / 2) as u64) as u64;
7979
b.to_async(&runtime)
80-
.iter(|| bench_seek_read(input.0.clone(), input.1, pos))
80+
.iter(|| bench_range_read(input.0.clone(), input.1, pos))
8181
},
8282
);
8383
group.throughput(criterion::Throughput::Bytes((TOTAL_SIZE / 2) as u64));
@@ -99,6 +99,40 @@ pub fn bench(c: &mut Criterion) {
9999
.iter(|| bench_write(input.0.clone(), input.1, input.2.clone()))
100100
},
101101
);
102+
103+
// runtime
104+
const RUNTIME_THREAD: usize = 4;
105+
let mut builder = tokio::runtime::Builder::new_multi_thread();
106+
builder.enable_all().worker_threads(RUNTIME_THREAD);
107+
108+
let runtime = builder.build().unwrap();
109+
for parallel in [2, 4, 6, 8, 10, 12, 16] {
110+
group.throughput(criterion::Throughput::Bytes(
111+
parallel as u64 * TOTAL_SIZE as u64 / 2,
112+
));
113+
group.bench_with_input(
114+
BenchmarkId::new(&format!("parallel_read_{}", parallel), &path),
115+
&(op.clone(), &path, content.clone()),
116+
|b, input| {
117+
let pos = rng.gen_range(0..(TOTAL_SIZE / 2) as u64) as u64;
118+
b.to_async(&runtime).iter(|| {
119+
let futures = (0..parallel)
120+
.map(|_| async {
121+
bench_range_read(input.0.clone(), input.1, pos).await;
122+
let mut d = 0;
123+
// mock same little cpu work
124+
for c in pos..pos + 100u64 {
125+
d += c & 0x1f1f1f1f + c % 256;
126+
}
127+
let _ = d;
128+
})
129+
.collect::<Vec<_>>();
130+
join_all(futures)
131+
})
132+
},
133+
);
134+
}
135+
102136
group.finish();
103137
}
104138
}
@@ -115,10 +149,8 @@ pub async fn bench_read(op: Operator, path: &str) {
115149
io::copy(&mut r, &mut io::sink()).await.unwrap();
116150
}
117151

118-
pub async fn bench_seek_read(op: Operator, path: &str, pos: u64) {
119-
let mut r = op.object(path).limited_reader(TOTAL_SIZE as u64);
120-
r.seek(SeekFrom::Start(pos)).await.expect("seek");
121-
r.seek(SeekFrom::Start(0)).await.expect("seek");
152+
pub async fn bench_range_read(op: Operator, path: &str, pos: u64) {
153+
let mut r = op.object(path).range_reader(pos, (TOTAL_SIZE / 2) as u64);
122154
io::copy(&mut r, &mut io::sink()).await.unwrap();
123155
}
124156

0 commit comments

Comments
 (0)