Skip to content

Commit cb49672

Browse files
fix(fs): fix mergefs bug where files were opened too many times
Signed-off-by: Dave Henderson <[email protected]>
1 parent cd74bb8 commit cb49672

File tree

3 files changed

+195
-180
lines changed

3 files changed

+195
-180
lines changed

internal/datafs/mergefs.go

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
122122
// unescaped '+' characters to make it simpler to provide types like
123123
// "application/array+json"
124124
overrideType := typeOverrideParam()
125-
mimeType := u.Query().Get(overrideType)
126-
mimeType = strings.ReplaceAll(mimeType, " ", "+")
125+
mimeTypeHint := u.Query().Get(overrideType)
126+
mimeTypeHint = strings.ReplaceAll(mimeTypeHint, " ", "+")
127127

128128
// now that we have the hint, remove it from the URL - we can't have it
129129
// leaking into the filesystem layer
@@ -151,23 +151,6 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
151151

152152
fsys = fsimpl.WithHTTPClientFS(f.httpClient, fsys)
153153

154-
// find the content type
155-
fi, err := fs.Stat(fsys, base)
156-
if err != nil {
157-
return nil, &fs.PathError{
158-
Op: "open", Path: name,
159-
Err: fmt.Errorf("stat merge part %q: %w", part, err),
160-
}
161-
}
162-
163-
if fi.ModTime().After(modTime) {
164-
modTime = fi.ModTime()
165-
}
166-
167-
if mimeType == "" {
168-
mimeType = fsimpl.ContentType(fi)
169-
}
170-
171154
f, err := fsys.Open(base)
172155
if err != nil {
173156
return nil, &fs.PathError{
@@ -176,7 +159,7 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
176159
}
177160
}
178161

179-
subFiles[i] = subFile{f, mimeType}
162+
subFiles[i] = subFile{f, mimeTypeHint}
180163
}
181164

182165
return &mergeFile{
@@ -226,18 +209,16 @@ func (f *mergeFile) Read(p []byte) (int, error) {
226209
if f.merged == nil {
227210
f.readMux.Lock()
228211
defer f.readMux.Unlock()
212+
229213
// read from all and merge
230-
data := make([]map[string]interface{}, len(f.subFiles))
214+
data := make([]map[string]any, len(f.subFiles))
231215
for i, sf := range f.subFiles {
232-
b, err := io.ReadAll(sf)
233-
if err != nil && !errors.Is(err, io.EOF) {
234-
return 0, fmt.Errorf("readAll: %w", err)
235-
}
236-
237-
data[i], err = parseMap(sf.contentType, string(b))
216+
d, err := f.readSubFile(sf)
238217
if err != nil {
239-
return 0, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
218+
return 0, fmt.Errorf("readSubFile: %w", err)
240219
}
220+
221+
data[i] = d
241222
}
242223

243224
md, err := mergeData(data)
@@ -253,6 +234,36 @@ func (f *mergeFile) Read(p []byte) (int, error) {
253234
return f.merged.Read(p)
254235
}
255236

237+
func (f *mergeFile) readSubFile(sf subFile) (map[string]any, error) {
238+
// stat for content type and modTime
239+
fi, err := sf.Stat()
240+
if err != nil {
241+
return nil, fmt.Errorf("stat merge part %q: %w", f.name, err)
242+
}
243+
244+
// the merged file's modTime is the most recent of all the sub-files
245+
if fi.ModTime().After(f.modTime) {
246+
f.modTime = fi.ModTime()
247+
}
248+
249+
// if we haven't been given a content type hint, guess the normal way
250+
if sf.contentType == "" {
251+
sf.contentType = fsimpl.ContentType(fi)
252+
}
253+
254+
b, err := io.ReadAll(sf)
255+
if err != nil && !errors.Is(err, io.EOF) {
256+
return nil, fmt.Errorf("readAll: %w", err)
257+
}
258+
259+
sfData, err := parseMap(sf.contentType, string(b))
260+
if err != nil {
261+
return nil, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
262+
}
263+
264+
return sfData, nil
265+
}
266+
256267
func mergeData(data []map[string]interface{}) ([]byte, error) {
257268
dst := data[0]
258269
data = data[1:]
@@ -269,17 +280,19 @@ func mergeData(data []map[string]interface{}) ([]byte, error) {
269280
return []byte(s), nil
270281
}
271282

272-
func parseMap(mimeType, data string) (map[string]interface{}, error) {
283+
func parseMap(mimeType, data string) (map[string]any, error) {
273284
datum, err := parsers.ParseData(mimeType, data)
274285
if err != nil {
275286
return nil, fmt.Errorf("parseData: %w", err)
276287
}
277-
var m map[string]interface{}
288+
289+
var m map[string]any
278290
switch datum := datum.(type) {
279-
case map[string]interface{}:
291+
case map[string]any:
280292
m = datum
281293
default:
282294
return nil, fmt.Errorf("unexpected data type '%T' for datasource (type %s); merge: can only merge maps", datum, mimeType)
283295
}
296+
284297
return m, nil
285298
}

internal/datafs/mergefs_test.go

Lines changed: 70 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package datafs
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"io/fs"
78
"mime"
@@ -31,19 +32,7 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
3132
yamlContent := "hello: earth\ngoodnight: moon\n"
3233
arrayContent := `["hello", "world"]`
3334

34-
wd, _ := os.Getwd()
35-
36-
// MapFS doesn't support windows path separators, so we use / exclusively
37-
// in this test
38-
vol := filepath.VolumeName(wd)
39-
if vol != "" && wd != vol {
40-
wd = wd[len(vol)+1:]
41-
} else if wd[0] == '/' {
42-
wd = wd[1:]
43-
}
44-
wd = filepath.ToSlash(wd)
45-
46-
t.Logf("wd: %s", wd)
35+
wd := wdForTest(t)
4736

4837
fsys := WrapWdFS(fstest.MapFS{
4938
"tmp": {Mode: fs.ModeDir | 0o777},
@@ -90,84 +79,22 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
9079
return fsys
9180
}
9281

93-
// func TestReadMerge(t *testing.T) {
94-
// ctx := context.Background()
95-
96-
// jsonContent := `{"hello": "world"}`
97-
// yamlContent := "hello: earth\ngoodnight: moon\n"
98-
// arrayContent := `["hello", "world"]`
99-
100-
// mergedContent := "goodnight: moon\nhello: world\n"
101-
102-
// fsys := fstest.MapFS{}
103-
// fsys["tmp"] = &fstest.MapFile{Mode: fs.ModeDir | 0777}
104-
// fsys["tmp/jsonfile.json"] = &fstest.MapFile{Data: []byte(jsonContent)}
105-
// fsys["tmp/array.json"] = &fstest.MapFile{Data: []byte(arrayContent)}
106-
// fsys["tmp/yamlfile.yaml"] = &fstest.MapFile{Data: []byte(yamlContent)}
107-
// fsys["tmp/textfile.txt"] = &fstest.MapFile{Data: []byte(`plain text...`)}
108-
109-
// // workding dir with volume name trimmed
110-
// wd, _ := os.Getwd()
111-
// vol := filepath.VolumeName(wd)
112-
// wd = wd[len(vol)+1:]
113-
114-
// fsys[path.Join(wd, "jsonfile.json")] = &fstest.MapFile{Data: []byte(jsonContent)}
115-
// fsys[path.Join(wd, "array.json")] = &fstest.MapFile{Data: []byte(arrayContent)}
116-
// fsys[path.Join(wd, "yamlfile.yaml")] = &fstest.MapFile{Data: []byte(yamlContent)}
117-
// fsys[path.Join(wd, "textfile.txt")] = &fstest.MapFile{Data: []byte(`plain text...`)}
118-
119-
// fsmux := fsimpl.NewMux()
120-
// fsmux.Add(fsimpl.WrappedFSProvider(&fsys, "file"))
121-
// ctx = datafs.ContextWithFSProvider(ctx, fsmux)
122-
123-
// source := &Source{Alias: "foo", URL: mustParseURL("merge:file:///tmp/jsonfile.json|file:///tmp/yamlfile.yaml")}
124-
// d := &Data{
125-
// Sources: map[string]*Source{
126-
// "foo": source,
127-
// "bar": {Alias: "bar", URL: mustParseURL("file:///tmp/jsonfile.json")},
128-
// "baz": {Alias: "baz", URL: mustParseURL("file:///tmp/yamlfile.yaml")},
129-
// "text": {Alias: "text", URL: mustParseURL("file:///tmp/textfile.txt")},
130-
// "badscheme": {Alias: "badscheme", URL: mustParseURL("bad:///scheme.json")},
131-
// "badtype": {Alias: "badtype", URL: mustParseURL("file:///tmp/textfile.txt?type=foo/bar")},
132-
// "array": {Alias: "array", URL: mustParseURL("file:///tmp/array.json?type=" + url.QueryEscape(jsonArrayMimetype))},
133-
// },
134-
// Ctx: ctx,
135-
// }
136-
137-
// actual, err := d.readMerge(ctx, source)
138-
// require.NoError(t, err)
139-
// assert.Equal(t, mergedContent, string(actual))
140-
141-
// source.URL = mustParseURL("merge:bar|baz")
142-
// actual, err = d.readMerge(ctx, source)
143-
// require.NoError(t, err)
144-
// assert.Equal(t, mergedContent, string(actual))
145-
146-
// source.URL = mustParseURL("merge:./jsonfile.json|baz")
147-
// actual, err = d.readMerge(ctx, source)
148-
// require.NoError(t, err)
149-
// assert.Equal(t, mergedContent, string(actual))
150-
151-
// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json")
152-
// _, err = d.readMerge(ctx, source)
153-
// require.Error(t, err)
154-
155-
// source.URL = mustParseURL("merge:bogusalias|file:///tmp/jsonfile.json")
156-
// _, err = d.readMerge(ctx, source)
157-
// require.Error(t, err)
158-
159-
// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badscheme")
160-
// _, err = d.readMerge(ctx, source)
161-
// require.Error(t, err)
162-
163-
// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badtype")
164-
// _, err = d.readMerge(ctx, source)
165-
// require.Error(t, err)
166-
167-
// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|array")
168-
// _, err = d.readMerge(ctx, source)
169-
// require.Error(t, err)
170-
// }
82+
func wdForTest(t *testing.T) string {
83+
t.Helper()
84+
85+
wd, _ := os.Getwd()
86+
87+
// MapFS doesn't support windows path separators, so we use / exclusively
88+
vol := filepath.VolumeName(wd)
89+
if vol != "" && wd != vol {
90+
wd = wd[len(vol)+1:]
91+
} else if wd[0] == '/' {
92+
wd = wd[1:]
93+
}
94+
wd = filepath.ToSlash(wd)
95+
96+
return wd
97+
}
17198

17299
func TestMergeData(t *testing.T) {
173100
def := map[string]interface{}{
@@ -228,7 +155,6 @@ func TestMergeData(t *testing.T) {
228155
}
229156

230157
func TestMergeFS_Open(t *testing.T) {
231-
// u, _ := url.Parse("merge:")
232158
fsys := setupMergeFsys(context.Background(), t)
233159
assert.IsType(t, &mergeFS{}, fsys)
234160

@@ -354,3 +280,55 @@ func TestMergeFS_ReadFile(t *testing.T) {
354280
})
355281
}
356282
}
283+
284+
func TestMergeFS_ReadsSubFilesOnce(t *testing.T) {
285+
mergedContent := "goodnight: moon\nhello: world\n"
286+
287+
wd := wdForTest(t)
288+
289+
fsys := WrapWdFS(
290+
openOnce(&fstest.MapFS{
291+
path.Join(wd, "tmp/jsonfile.json"): {Data: []byte(`{"hello": "world"}`)},
292+
path.Join(wd, "tmp/yamlfile.yaml"): {Data: []byte("hello: earth\ngoodnight: moon\n")},
293+
}))
294+
295+
mux := fsimpl.NewMux()
296+
mux.Add(MergeFS)
297+
mux.Add(WrappedFSProvider(fsys, "file", ""))
298+
299+
ctx := ContextWithFSProvider(context.Background(), mux)
300+
301+
reg := NewRegistry()
302+
reg.Register("jsonfile", config.DataSource{URL: mustParseURL("tmp/jsonfile.json")})
303+
reg.Register("yamlfile", config.DataSource{URL: mustParseURL("tmp/yamlfile.yaml")})
304+
305+
fsys, err := NewMergeFS(mustParseURL("merge:///"))
306+
require.NoError(t, err)
307+
308+
fsys = WithDataSourceRegistryFS(reg, fsys)
309+
fsys = fsimpl.WithContextFS(ctx, fsys)
310+
311+
b, err := fs.ReadFile(fsys, "jsonfile|yamlfile")
312+
require.NoError(t, err)
313+
assert.Equal(t, mergedContent, string(b))
314+
}
315+
316+
type openOnceFS struct {
317+
fs *fstest.MapFS
318+
opened map[string]struct{}
319+
}
320+
321+
// a filesystem that only allows opening or stating a file once
322+
func openOnce(fsys *fstest.MapFS) fs.FS {
323+
return &openOnceFS{fs: fsys, opened: map[string]struct{}{}}
324+
}
325+
326+
func (f *openOnceFS) Open(name string) (fs.File, error) {
327+
if _, ok := f.opened[name]; ok {
328+
return nil, fmt.Errorf("open: %q already opened", name)
329+
}
330+
331+
f.opened[name] = struct{}{}
332+
333+
return f.fs.Open(name)
334+
}

0 commit comments

Comments
 (0)