Skip to content

Commit f4b4cd6

Browse files
committed
feat: Add IPFS backend (#355)
1 parent a1e5168 commit f4b4cd6

File tree

12 files changed

+602
-1
lines changed

12 files changed

+602
-1
lines changed

.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ OPENDAL_AZBLOB_ACCOUNT_KEY=<account_key>
2020
OPENDAL_HDFS_TEST=false
2121
OPENDAL_HDFS_ROOT=/path/to/dir
2222
OPENDAL_HDFS_NAME_NODE=<name_node>
23+
# ipfs
24+
OPENDAL_IPFS_TEST=false
25+
OPENDAL_IPFS_ROOT=/path/to/dir

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ reqsign = "0.3.0"
7575
serde = { version = "1.0.136", features = ["derive"] }
7676
thiserror = "1.0.30"
7777
time = "0.3.9"
78+
ipfs-api-backend-hyper = { version = "0.4", features = ["with-send-sync", "with-builder"] }
7879

7980
[dev-dependencies]
8081
anyhow = "1.0.56"

examples/ipfs.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::env;
2+
use std::sync::Arc;
3+
4+
use anyhow::Result;
5+
6+
use futures::StreamExt;
7+
use log::info;
8+
use opendal::{services::ipfs, Accessor, Operator};
9+
10+
#[tokio::main]
11+
async fn main() -> Result<()> {
12+
if env::var("RUST_LOG").is_err() {
13+
env::set_var("RUST_LOG", "debug");
14+
}
15+
env_logger::init();
16+
17+
println!(
18+
r#"OpenDAL IPFS Example.
19+
20+
Available Environment Values:
21+
22+
- OPENDAL_IPFS_ROOT: root path in mutable file system, default: /
23+
- OPENDAL_IPFS_ENDPOINT: ipfs endpoint, default: localhost:5001
24+
"#
25+
);
26+
27+
let mut builder = ipfs::Backend::build();
28+
// root must be absolute path in MFS.
29+
builder.root(&env::var("OPENDAL_IPFS_ROOT").unwrap_or_else(|_| "/".to_string()));
30+
builder.endpoint(
31+
&env::var("OPENDAL_IPFS_ENDPOINT").unwrap_or_else(|_| "http://localhost:5001".to_string()),
32+
);
33+
34+
let accessor: Arc<dyn Accessor> = builder.finish().await?;
35+
36+
let op: Operator = Operator::new(accessor);
37+
38+
let path = "/file.txt";
39+
info!("try to write file: {}", &path);
40+
op.object(&path).write("Hello, world!").await?;
41+
info!("write file successful!");
42+
43+
let content = op.object(&path).read().await?;
44+
info!("File content: {}", String::from_utf8_lossy(&content));
45+
46+
let root = "/";
47+
let mut list = op.object(&root).list().await?;
48+
info!("Listing entries in {}", &root);
49+
while let Some(res) = list.next().await {
50+
let item = res?;
51+
info!("Found entry: {}", item.path())
52+
}
53+
54+
Ok(())
55+
}

src/operator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ impl Operator {
133133
Scheme::Http => services::http::Backend::from_iter(it).await?,
134134
Scheme::Memory => services::memory::Backend::build().finish().await?,
135135
Scheme::S3 => services::s3::Backend::from_iter(it).await?,
136+
Scheme::Ipfs => services::ipfs::Backend::from_iter(it).await?,
136137
};
137138

138139
Ok(Self { accessor })

src/scheme.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ pub enum Scheme {
3838
Memory,
3939
/// [s3][crate::services::s3]: AWS S3 alike services.
4040
S3,
41+
/// [ipfs][crate::services::ipfs]: IPFS mutable file system
42+
Ipfs,
4143
}
4244

4345
impl Default for Scheme {
@@ -57,6 +59,7 @@ impl Display for Scheme {
5759
Scheme::Http => write!(f, "http"),
5860
Scheme::Memory => write!(f, "memory"),
5961
Scheme::S3 => write!(f, "s3"),
62+
Scheme::Ipfs => write!(f, "ipfs"),
6063
}
6164
}
6265
}
@@ -75,6 +78,7 @@ impl FromStr for Scheme {
7578
"http" | "https" => Ok(Scheme::Http),
7679
"memory" => Ok(Scheme::Memory),
7780
"s3" => Ok(Scheme::S3),
81+
"ipfs" => Ok(Scheme::Ipfs),
7882
v => Err(other(BackendError::new(
7983
Default::default(),
8084
anyhow!("{} is not supported", v),

src/services/ipfs/backend.rs

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
use super::builder::Builder;
2+
use super::dir_stream::DirStream;
3+
use super::request_writer::{IpfsWriterFuture, RequestWriter};
4+
5+
use crate::ops::{OpCreate, OpDelete, OpList, OpRead, OpStat, OpWrite};
6+
use crate::{
7+
Accessor, AccessorMetadata, BytesReader, BytesWriter, DirStreamer, ObjectMetadata, ObjectMode,
8+
Scheme,
9+
};
10+
11+
use async_trait::async_trait;
12+
use bytes::{Buf, Bytes};
13+
use futures::TryStreamExt;
14+
use std::fmt;
15+
use std::io;
16+
use std::sync::Arc;
17+
18+
use ipfs_api_backend_hyper;
19+
use ipfs_api_backend_hyper::response::FilesLsResponse;
20+
use ipfs_api_backend_hyper::{IpfsApi, IpfsClient};
21+
22+
use futures::channel::mpsc::{self};
23+
use metrics::increment_counter;
24+
use minitrace::trace;
25+
26+
/// Backend for IPFS service
27+
#[derive(Clone)]
28+
pub struct Backend {
29+
root: String,
30+
client: IpfsClient,
31+
}
32+
33+
impl fmt::Debug for Backend {
34+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35+
f.debug_struct("ipfs::Backend")
36+
.field("root", &self.root)
37+
.finish()
38+
}
39+
}
40+
41+
impl Backend {
42+
/// Create a default builder for ipfs.
43+
pub fn build() -> Builder {
44+
Builder::default()
45+
}
46+
47+
pub(crate) fn new(root: String, client: IpfsClient) -> Self {
48+
Self { root, client }
49+
}
50+
51+
pub(crate) async fn from_iter(
52+
it: impl Iterator<Item = (String, String)>,
53+
) -> io::Result<Arc<dyn Accessor>> {
54+
let mut builder = Builder::default();
55+
56+
for (key, val) in it {
57+
let val = val.as_str();
58+
match key.as_ref() {
59+
"root" => builder.root(val),
60+
_ => continue,
61+
};
62+
}
63+
64+
builder.finish().await
65+
}
66+
67+
pub(crate) fn get_abs_path(&self, path: &str) -> String {
68+
if path == self.root {
69+
return path.to_string();
70+
}
71+
72+
format!("{}{}", self.root, path.trim_start_matches(&self.root))
73+
}
74+
75+
#[trace("files_stat")]
76+
pub(crate) async fn files_stat(&self, path: &str) -> io::Result<ObjectMetadata> {
77+
let mut meta = ObjectMetadata::default();
78+
79+
let res = self
80+
.client
81+
.files_stat(path)
82+
.await
83+
.map_err(crate::error::other)?;
84+
85+
let mode: ObjectMode = match res.typ.as_str() {
86+
"file" => ObjectMode::FILE,
87+
"directory" => ObjectMode::DIR,
88+
_ => ObjectMode::Unknown,
89+
};
90+
91+
meta.set_mode(mode).set_content_length(res.size);
92+
93+
Ok(meta)
94+
}
95+
96+
pub(crate) async fn files_create(&self, path: &str) -> io::Result<()> {
97+
self.client
98+
.files_write(path, true, false, io::empty())
99+
.await
100+
.map_err(crate::error::other)
101+
}
102+
103+
pub(crate) async fn files_read(
104+
&self,
105+
path: &str,
106+
offset: Option<i64>,
107+
count: Option<i64>,
108+
) -> io::Result<BytesReader> {
109+
let req = ipfs_api_backend_hyper::request::FilesRead {
110+
path,
111+
offset,
112+
count,
113+
};
114+
let reader = self
115+
.client
116+
.files_read_with_options(req)
117+
.map_err(crate::error::other)
118+
.into_async_read();
119+
Ok(Box::new(reader))
120+
}
121+
122+
pub(crate) async fn files_delete(&self, path: &str) -> io::Result<()> {
123+
self.client
124+
.files_rm(path, false)
125+
.await
126+
.map_err(crate::error::other)
127+
}
128+
129+
pub(crate) async fn files_list(&self, path: &str) -> io::Result<FilesLsResponse> {
130+
self.client
131+
.files_ls(Some(path))
132+
.await
133+
.map_err(crate::error::other)
134+
}
135+
136+
pub(crate) async fn files_write(&self, path: &str, data: Bytes) -> io::Result<()> {
137+
self.client
138+
.files_write(path, true, true, data.reader())
139+
.await
140+
.map_err(crate::error::other)
141+
}
142+
}
143+
144+
#[async_trait]
145+
impl Accessor for Backend {
146+
fn metadata(&self) -> AccessorMetadata {
147+
let mut am = AccessorMetadata::default();
148+
am.set_scheme(Scheme::Ipfs)
149+
.set_root(&self.root)
150+
.set_capabilities(None);
151+
152+
am
153+
}
154+
155+
#[trace("create")]
156+
async fn create(&self, args: &OpCreate) -> io::Result<()> {
157+
increment_counter!("opendal_ipfs_create_requests");
158+
159+
let path = self.get_abs_path(args.path());
160+
self.files_create(&path).await
161+
}
162+
163+
#[trace("read")]
164+
async fn read(&self, args: &OpRead) -> io::Result<BytesReader> {
165+
increment_counter!("opendal_ipfs_read_requests");
166+
167+
let path = self.get_abs_path(args.path());
168+
169+
let offset = args.offset().and_then(|val| i64::try_from(val).ok());
170+
let size = args.size().and_then(|val| i64::try_from(val).ok());
171+
let reader = self.files_read(&path, offset, size).await?;
172+
Ok(reader)
173+
}
174+
175+
#[trace("write")]
176+
async fn write(&self, args: &OpWrite) -> io::Result<BytesWriter> {
177+
increment_counter!("opendal_ipfs_write_requests");
178+
179+
let path = self.get_abs_path(args.path());
180+
181+
let (tx, rx) = mpsc::channel::<Bytes>(0);
182+
183+
let req_fut = IpfsWriterFuture::new(rx, Arc::new(self.clone()), path);
184+
185+
let req_writer = RequestWriter::new(tx, req_fut);
186+
187+
Ok(Box::new(req_writer))
188+
}
189+
190+
#[trace("stat")]
191+
async fn stat(&self, args: &OpStat) -> io::Result<ObjectMetadata> {
192+
increment_counter!("opendal_ipfs_stat_requests");
193+
194+
let path = self.get_abs_path(args.path());
195+
self.files_stat(&path).await
196+
}
197+
198+
#[trace("delete")]
199+
async fn delete(&self, args: &OpDelete) -> io::Result<()> {
200+
increment_counter!("opendal_ipfs_delete_requests");
201+
202+
let path = self.get_abs_path(args.path());
203+
self.files_delete(&path).await
204+
}
205+
206+
#[trace("list")]
207+
async fn list(&self, args: &OpList) -> io::Result<DirStreamer> {
208+
increment_counter!("opendal_ipfs_list_requests");
209+
210+
let path = self.get_abs_path(args.path());
211+
Ok(Box::new(DirStream::new(Arc::new(self.clone()), &path)))
212+
}
213+
}

0 commit comments

Comments
 (0)