Skip to content

Commit ce924f9

Browse files
authored
feat: List metadata reuse (#577)
* feature: list-metadata-reuse 1. add more fields to DirEntry 2. record fields in HDFS 3. record fields in GCS 4. record fields in S3 5. record fields in Microsoft Cloud Signed-off-by: ClSlaid <[email protected]> * feature: implement extended DirEntry for fs Signed-off-by: ClSlaid <[email protected]> * refactor: make cargo fmt happy Signed-off-by: ClSlaid <[email protected]> Signed-off-by: ClSlaid <[email protected]>
1 parent 19f6af7 commit ce924f9

File tree

7 files changed

+271
-29
lines changed

7 files changed

+271
-29
lines changed

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ mod tests {
168168
assert_eq!(80, size_of::<AccessorMetadata>());
169169
assert_eq!(16, size_of::<Operator>());
170170
assert_eq!(16, size_of::<BatchOperator>());
171-
assert_eq!(48, size_of::<DirEntry>());
171+
assert_eq!(128, size_of::<DirEntry>());
172172
assert_eq!(40, size_of::<Object>());
173173
assert_eq!(80, size_of::<ObjectMetadata>());
174174
assert_eq!(1, size_of::<ObjectMode>());

src/object.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,12 @@ pub struct DirEntry {
908908

909909
mode: ObjectMode,
910910
path: String,
911+
912+
// metadata fields
913+
etag: Option<String>,
914+
content_length: Option<u64>,
915+
content_md5: Option<String>,
916+
last_modified: Option<OffsetDateTime>,
911917
}
912918

913919
impl DirEntry {
@@ -923,6 +929,11 @@ impl DirEntry {
923929
acc,
924930
mode,
925931
path: path.to_string(),
932+
// all set to None
933+
etag: None,
934+
content_length: None,
935+
content_md5: None,
936+
last_modified: None,
926937
}
927938
}
928939

@@ -961,6 +972,46 @@ impl DirEntry {
961972
get_basename(&self.path)
962973
}
963974

975+
/// The ETag string of `DirEntry`'s corresponding object
976+
///
977+
/// `etag` is a prefetched metadata field in `DirEntry`.
978+
///
979+
/// It doesn't mean this metadata field of object doesn't exist if `etag` is `None`.
980+
/// Then you have to call `DirEntry::metadata()` to get the metadata you want.
981+
pub fn etag(&self) -> Option<&str> {
982+
self.etag.as_deref()
983+
}
984+
985+
/// The size of `DirEntry`'s corresponding object
986+
///
987+
/// `content_length` is a prefetched metadata field in `DirEntry`.
988+
///
989+
/// It doesn't mean this metadata field of object doesn't exist if `content_length` is `None`.
990+
/// Then you have to call `DirEntry::metadata()` to get the metadata you want.
991+
pub fn content_length(&self) -> Option<u64> {
992+
self.content_length
993+
}
994+
995+
/// The MD5 message digest of `DirEntry`'s corresponding object
996+
///
997+
/// `content_md5` is a prefetched metadata field in `DirEntry`
998+
///
999+
/// It doesn't mean this metadata field of object doesn't exist if `content_md5` is `None`.
1000+
/// Then you have to call `DirEntry::metadata()` to get the metadata you want.
1001+
pub fn content_md5(&self) -> Option<&str> {
1002+
self.content_md5.as_deref()
1003+
}
1004+
1005+
/// The last modified UTC datetime of `DirEntry`'s corresponding object
1006+
///
1007+
/// `last_modified` is a prefetched metadata field in `DirEntry`
1008+
///
1009+
/// It doesn't mean this metadata field of object doesn't exist if `last_modified` is `None`.
1010+
/// Then you have to call `DirEntry::metadata()` to get the metadata you want.
1011+
pub fn last_modified(&self) -> Option<OffsetDateTime> {
1012+
self.last_modified
1013+
}
1014+
9641015
/// Fetch metadata about this dir entry.
9651016
///
9661017
/// The same with [`Object::metadata()`]
@@ -971,6 +1022,26 @@ impl DirEntry {
9711022
}
9721023
}
9731024

1025+
// implement setters for DirEntry's metadata fields
1026+
impl DirEntry {
1027+
/// record the ETag of `DirEntry`'s corresponding object
1028+
pub(crate) fn set_etag(&mut self, etag: &str) {
1029+
self.etag = Some(etag.to_string())
1030+
}
1031+
/// record the last modified time of `DirEntry`'s corresponding object
1032+
pub(crate) fn set_last_modified(&mut self, last_modified: OffsetDateTime) {
1033+
self.last_modified = Some(last_modified)
1034+
}
1035+
/// record the content length of `DirEntry`'s corresponding object
1036+
pub(crate) fn set_content_length(&mut self, content_length: u64) {
1037+
self.content_length = Some(content_length)
1038+
}
1039+
/// record the content's md5 of `DirEntry`'s corresponding object
1040+
pub(crate) fn set_content_md5(&mut self, content_md5: &str) {
1041+
self.content_md5 = Some(content_md5.to_string())
1042+
}
1043+
}
1044+
9741045
/// DirEntry can convert into object without overhead.
9751046
impl From<DirEntry> for Object {
9761047
fn from(d: DirEntry) -> Self {

src/services/azblob/dir_stream.rs

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14+
use anyhow::anyhow;
1415
use std::future::Future;
1516
use std::io::Result;
1617
use std::pin::Pin;
@@ -25,6 +26,8 @@ use isahc::AsyncReadResponseExt;
2526
use log::debug;
2627
use quick_xml::de;
2728
use serde::Deserialize;
29+
use time::format_description::well_known::Rfc2822;
30+
use time::OffsetDateTime;
2831

2932
use super::error::parse_error;
3033
use super::Backend;
@@ -119,11 +122,15 @@ impl futures::Stream for DirStream {
119122
);
120123

121124
debug!(
122-
"dir object {} got entry, mode: {}, path: {}",
123-
&self.path,
124-
de.mode(),
125-
de.path()
126-
);
125+
"dir object {} got entry, mode: {}, path: {}, content length: {:?}, last modified: {:?}, content_md5: {:?}, etag: {:?}",
126+
&self.path,
127+
de.mode(),
128+
de.path(),
129+
de.content_length(),
130+
de.last_modified(),
131+
de.content_md5(),
132+
de.etag()
133+
);
127134
return Poll::Ready(Some(Ok(de)));
128135
}
129136
};
@@ -140,17 +147,36 @@ impl futures::Stream for DirStream {
140147
continue;
141148
}
142149

143-
let de = DirEntry::new(
150+
let mut de = DirEntry::new(
144151
backend.clone(),
145152
ObjectMode::FILE,
146153
&backend.get_rel_path(&object.name),
147154
);
148155

156+
de.set_etag(object.properties.etag.as_str());
157+
de.set_content_length(object.properties.content_length);
158+
de.set_content_md5(object.properties.content_md5.as_str());
159+
160+
let dt =
161+
OffsetDateTime::parse(object.properties.last_modified.as_str(), &Rfc2822)
162+
.map_err(|e| {
163+
other(ObjectError::new(
164+
"list",
165+
&self.path,
166+
anyhow!("parse last modified RFC2822 datetime: {e:?}"),
167+
))
168+
})?;
169+
de.set_last_modified(dt);
170+
149171
debug!(
150-
"dir object {} got entry, mode: {}, path: {}",
172+
"dir object {} got entry, mode: {}, path: {}, content length: {:?}, last modified: {:?}, content_md5: {:?}, etag: {:?}",
151173
&self.path,
152174
de.mode(),
153175
de.path(),
176+
de.content_length(),
177+
de.last_modified(),
178+
de.content_md5(),
179+
de.etag()
154180
);
155181
return Poll::Ready(Some(Ok(de)));
156182
}
@@ -200,6 +226,11 @@ struct Blob {
200226
struct Properties {
201227
#[serde(rename = "Content-Length")]
202228
content_length: u64,
229+
#[serde(rename = "Last-Modified")]
230+
last_modified: String,
231+
#[serde(rename = "Content-MD5")]
232+
content_md5: String,
233+
etag: String,
203234
}
204235

205236
#[cfg(test)]
@@ -317,6 +348,42 @@ mod tests {
317348
.collect::<Vec<u64>>(),
318349
[3485277, 2471869, 1259677]
319350
);
351+
assert_eq!(
352+
out.blobs
353+
.blob
354+
.iter()
355+
.map(|v| v.properties.content_md5.clone())
356+
.collect::<Vec<String>>(),
357+
[
358+
"llJ/+jOlx5GdA1sL7SdKuw==".to_string(),
359+
"xmgUltSnopLSJOukgCHFtg==".to_string(),
360+
"AxTiFXHwrXKaZC5b7ZRybw==".to_string()
361+
]
362+
);
363+
assert_eq!(
364+
out.blobs
365+
.blob
366+
.iter()
367+
.map(|v| v.properties.last_modified.clone())
368+
.collect::<Vec<String>>(),
369+
[
370+
"Sun, 20 Mar 2022 11:29:03 GMT".to_string(),
371+
"Tue, 29 Mar 2022 01:54:07 GMT".to_string(),
372+
"Sun, 20 Mar 2022 11:31:57 GMT".to_string()
373+
]
374+
);
375+
assert_eq!(
376+
out.blobs
377+
.blob
378+
.iter()
379+
.map(|v| v.properties.etag.clone())
380+
.collect::<Vec<String>>(),
381+
[
382+
"0x8DA0A64D66790C3".to_string(),
383+
"0x8DA112702D88FE4".to_string(),
384+
"0x8DA0A653DC82981".to_string()
385+
]
386+
);
320387
assert_eq!(
321388
out.blobs
322389
.blob_prefix

src/services/fs/dir_stream.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::task::Poll;
2020

2121
use log::debug;
2222
use log::error;
23+
use time::OffsetDateTime;
2324

2425
use super::error::parse_io_error;
2526
use super::Backend;
@@ -65,7 +66,7 @@ impl futures::Stream for DirStream {
6566
// the target file type.
6667
let file_type = de.file_type()?;
6768

68-
let d = if file_type.is_file() {
69+
let mut d = if file_type.is_file() {
6970
DirEntry::new(self.backend.clone(), ObjectMode::FILE, &path)
7071
} else if file_type.is_dir() {
7172
// Make sure we are returning the correct path.
@@ -78,12 +79,26 @@ impl futures::Stream for DirStream {
7879
DirEntry::new(self.backend.clone(), ObjectMode::Unknown, &path)
7980
};
8081

82+
// metadata may not available on all platforms, it's ok not setting it here
83+
if let Ok(metadata) = de.metadata() {
84+
d.set_content_length(metadata.len());
85+
// last_modified is not available in all platforms.
86+
// it's ok not setting it here.
87+
if let Ok(last_modified) = metadata.modified().map(OffsetDateTime::from) {
88+
d.set_last_modified(last_modified);
89+
}
90+
}
91+
8192
debug!(
82-
"dir object {} got entry, mode: {}, path: {}",
83-
&self.path,
84-
d.mode(),
85-
d.path()
86-
);
93+
"dir object {} got entry, mode: {}, path: {}, content length: {:?}, last modified: {:?}, content_md5: {:?}, etag: {:?}",
94+
&self.path,
95+
d.mode(),
96+
d.path(),
97+
d.content_length(),
98+
d.last_modified(),
99+
d.content_md5(),
100+
d.etag()
101+
);
87102
Poll::Ready(Some(Ok(d)))
88103
}
89104
}

0 commit comments

Comments
 (0)