summaryrefslogtreecommitdiff
path: root/internal/datafs
diff options
context:
space:
mode:
authorDave Henderson <dhenderson@gmail.com>2024-12-16 17:16:44 -0500
committerGitHub <noreply@github.com>2024-12-16 22:16:44 +0000
commit7eb7829dc69830dbe42cbf688a90eca561d75671 (patch)
tree1b50c695c356f5188ef361aede1365d422a07098 /internal/datafs
parent40c38b753553237b19df01ce94e5e6b05e394712 (diff)
fix(fs): fix mergefs bug where files were opened too many times (#2287)
Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Diffstat (limited to 'internal/datafs')
-rw-r--r--internal/datafs/mergefs.go75
-rw-r--r--internal/datafs/mergefs_test.go162
2 files changed, 114 insertions, 123 deletions
diff --git a/internal/datafs/mergefs.go b/internal/datafs/mergefs.go
index b056aff7..dd74eb67 100644
--- a/internal/datafs/mergefs.go
+++ b/internal/datafs/mergefs.go
@@ -122,8 +122,8 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
// unescaped '+' characters to make it simpler to provide types like
// "application/array+json"
overrideType := typeOverrideParam()
- mimeType := u.Query().Get(overrideType)
- mimeType = strings.ReplaceAll(mimeType, " ", "+")
+ mimeTypeHint := u.Query().Get(overrideType)
+ mimeTypeHint = strings.ReplaceAll(mimeTypeHint, " ", "+")
// now that we have the hint, remove it from the URL - we can't have it
// leaking into the filesystem layer
@@ -151,23 +151,6 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
fsys = fsimpl.WithHTTPClientFS(f.httpClient, fsys)
- // find the content type
- fi, err := fs.Stat(fsys, base)
- if err != nil {
- return nil, &fs.PathError{
- Op: "open", Path: name,
- Err: fmt.Errorf("stat merge part %q: %w", part, err),
- }
- }
-
- if fi.ModTime().After(modTime) {
- modTime = fi.ModTime()
- }
-
- if mimeType == "" {
- mimeType = fsimpl.ContentType(fi)
- }
-
f, err := fsys.Open(base)
if err != nil {
return nil, &fs.PathError{
@@ -176,7 +159,7 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
}
}
- subFiles[i] = subFile{f, mimeType}
+ subFiles[i] = subFile{f, mimeTypeHint}
}
return &mergeFile{
@@ -226,18 +209,16 @@ func (f *mergeFile) Read(p []byte) (int, error) {
if f.merged == nil {
f.readMux.Lock()
defer f.readMux.Unlock()
+
// read from all and merge
- data := make([]map[string]interface{}, len(f.subFiles))
+ data := make([]map[string]any, len(f.subFiles))
for i, sf := range f.subFiles {
- b, err := io.ReadAll(sf)
- if err != nil && !errors.Is(err, io.EOF) {
- return 0, fmt.Errorf("readAll: %w", err)
- }
-
- data[i], err = parseMap(sf.contentType, string(b))
+ d, err := f.readSubFile(sf)
if err != nil {
- return 0, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
+ return 0, fmt.Errorf("readSubFile: %w", err)
}
+
+ data[i] = d
}
md, err := mergeData(data)
@@ -253,6 +234,36 @@ func (f *mergeFile) Read(p []byte) (int, error) {
return f.merged.Read(p)
}
+func (f *mergeFile) readSubFile(sf subFile) (map[string]any, error) {
+ // stat for content type and modTime
+ fi, err := sf.Stat()
+ if err != nil {
+ return nil, fmt.Errorf("stat merge part %q: %w", f.name, err)
+ }
+
+ // the merged file's modTime is the most recent of all the sub-files
+ if fi.ModTime().After(f.modTime) {
+ f.modTime = fi.ModTime()
+ }
+
+ // if we haven't been given a content type hint, guess the normal way
+ if sf.contentType == "" {
+ sf.contentType = fsimpl.ContentType(fi)
+ }
+
+ b, err := io.ReadAll(sf)
+ if err != nil && !errors.Is(err, io.EOF) {
+ return nil, fmt.Errorf("readAll: %w", err)
+ }
+
+ sfData, err := parseMap(sf.contentType, string(b))
+ if err != nil {
+ return nil, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
+ }
+
+ return sfData, nil
+}
+
func mergeData(data []map[string]interface{}) ([]byte, error) {
dst := data[0]
data = data[1:]
@@ -269,17 +280,19 @@ func mergeData(data []map[string]interface{}) ([]byte, error) {
return []byte(s), nil
}
-func parseMap(mimeType, data string) (map[string]interface{}, error) {
+func parseMap(mimeType, data string) (map[string]any, error) {
datum, err := parsers.ParseData(mimeType, data)
if err != nil {
return nil, fmt.Errorf("parseData: %w", err)
}
- var m map[string]interface{}
+
+ var m map[string]any
switch datum := datum.(type) {
- case map[string]interface{}:
+ case map[string]any:
m = datum
default:
return nil, fmt.Errorf("unexpected data type '%T' for datasource (type %s); merge: can only merge maps", datum, mimeType)
}
+
return m, nil
}
diff --git a/internal/datafs/mergefs_test.go b/internal/datafs/mergefs_test.go
index 17ffa716..b332722b 100644
--- a/internal/datafs/mergefs_test.go
+++ b/internal/datafs/mergefs_test.go
@@ -2,6 +2,7 @@ package datafs
import (
"context"
+ "fmt"
"io"
"io/fs"
"mime"
@@ -31,19 +32,7 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
yamlContent := "hello: earth\ngoodnight: moon\n"
arrayContent := `["hello", "world"]`
- wd, _ := os.Getwd()
-
- // MapFS doesn't support windows path separators, so we use / exclusively
- // in this test
- vol := filepath.VolumeName(wd)
- if vol != "" && wd != vol {
- wd = wd[len(vol)+1:]
- } else if wd[0] == '/' {
- wd = wd[1:]
- }
- wd = filepath.ToSlash(wd)
-
- t.Logf("wd: %s", wd)
+ wd := wdForTest(t)
fsys := WrapWdFS(fstest.MapFS{
"tmp": {Mode: fs.ModeDir | 0o777},
@@ -90,84 +79,22 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
return fsys
}
-// func TestReadMerge(t *testing.T) {
-// ctx := context.Background()
-
-// jsonContent := `{"hello": "world"}`
-// yamlContent := "hello: earth\ngoodnight: moon\n"
-// arrayContent := `["hello", "world"]`
-
-// mergedContent := "goodnight: moon\nhello: world\n"
-
-// fsys := fstest.MapFS{}
-// fsys["tmp"] = &fstest.MapFile{Mode: fs.ModeDir | 0777}
-// fsys["tmp/jsonfile.json"] = &fstest.MapFile{Data: []byte(jsonContent)}
-// fsys["tmp/array.json"] = &fstest.MapFile{Data: []byte(arrayContent)}
-// fsys["tmp/yamlfile.yaml"] = &fstest.MapFile{Data: []byte(yamlContent)}
-// fsys["tmp/textfile.txt"] = &fstest.MapFile{Data: []byte(`plain text...`)}
-
-// // working dir with volume name trimmed
-// wd, _ := os.Getwd()
-// vol := filepath.VolumeName(wd)
-// wd = wd[len(vol)+1:]
-
-// fsys[path.Join(wd, "jsonfile.json")] = &fstest.MapFile{Data: []byte(jsonContent)}
-// fsys[path.Join(wd, "array.json")] = &fstest.MapFile{Data: []byte(arrayContent)}
-// fsys[path.Join(wd, "yamlfile.yaml")] = &fstest.MapFile{Data: []byte(yamlContent)}
-// fsys[path.Join(wd, "textfile.txt")] = &fstest.MapFile{Data: []byte(`plain text...`)}
-
-// fsmux := fsimpl.NewMux()
-// fsmux.Add(fsimpl.WrappedFSProvider(&fsys, "file"))
-// ctx = datafs.ContextWithFSProvider(ctx, fsmux)
-
-// source := &Source{Alias: "foo", URL: mustParseURL("merge:file:///tmp/jsonfile.json|file:///tmp/yamlfile.yaml")}
-// d := &Data{
-// Sources: map[string]*Source{
-// "foo": source,
-// "bar": {Alias: "bar", URL: mustParseURL("file:///tmp/jsonfile.json")},
-// "baz": {Alias: "baz", URL: mustParseURL("file:///tmp/yamlfile.yaml")},
-// "text": {Alias: "text", URL: mustParseURL("file:///tmp/textfile.txt")},
-// "badscheme": {Alias: "badscheme", URL: mustParseURL("bad:///scheme.json")},
-// "badtype": {Alias: "badtype", URL: mustParseURL("file:///tmp/textfile.txt?type=foo/bar")},
-// "array": {Alias: "array", URL: mustParseURL("file:///tmp/array.json?type=" + url.QueryEscape(jsonArrayMimetype))},
-// },
-// Ctx: ctx,
-// }
-
-// actual, err := d.readMerge(ctx, source)
-// require.NoError(t, err)
-// assert.Equal(t, mergedContent, string(actual))
-
-// source.URL = mustParseURL("merge:bar|baz")
-// actual, err = d.readMerge(ctx, source)
-// require.NoError(t, err)
-// assert.Equal(t, mergedContent, string(actual))
-
-// source.URL = mustParseURL("merge:./jsonfile.json|baz")
-// actual, err = d.readMerge(ctx, source)
-// require.NoError(t, err)
-// assert.Equal(t, mergedContent, string(actual))
-
-// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json")
-// _, err = d.readMerge(ctx, source)
-// require.Error(t, err)
-
-// source.URL = mustParseURL("merge:bogusalias|file:///tmp/jsonfile.json")
-// _, err = d.readMerge(ctx, source)
-// require.Error(t, err)
-
-// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badscheme")
-// _, err = d.readMerge(ctx, source)
-// require.Error(t, err)
-
-// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badtype")
-// _, err = d.readMerge(ctx, source)
-// require.Error(t, err)
-
-// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|array")
-// _, err = d.readMerge(ctx, source)
-// require.Error(t, err)
-// }
+func wdForTest(t *testing.T) string {
+ t.Helper()
+
+ wd, _ := os.Getwd()
+
+ // MapFS doesn't support windows path separators, so we use / exclusively
+ vol := filepath.VolumeName(wd)
+ if vol != "" && wd != vol {
+ wd = wd[len(vol)+1:]
+ } else if wd[0] == '/' {
+ wd = wd[1:]
+ }
+ wd = filepath.ToSlash(wd)
+
+ return wd
+}
func TestMergeData(t *testing.T) {
def := map[string]interface{}{
@@ -228,7 +155,6 @@ func TestMergeData(t *testing.T) {
}
func TestMergeFS_Open(t *testing.T) {
- // u, _ := url.Parse("merge:")
fsys := setupMergeFsys(context.Background(), t)
assert.IsType(t, &mergeFS{}, fsys)
@@ -354,3 +280,55 @@ func TestMergeFS_ReadFile(t *testing.T) {
})
}
}
+
+func TestMergeFS_ReadsSubFilesOnce(t *testing.T) {
+ mergedContent := "goodnight: moon\nhello: world\n"
+
+ wd := wdForTest(t)
+
+ fsys := WrapWdFS(
+ openOnce(&fstest.MapFS{
+ path.Join(wd, "tmp/jsonfile.json"): {Data: []byte(`{"hello": "world"}`)},
+ path.Join(wd, "tmp/yamlfile.yaml"): {Data: []byte("hello: earth\ngoodnight: moon\n")},
+ }))
+
+ mux := fsimpl.NewMux()
+ mux.Add(MergeFS)
+ mux.Add(WrappedFSProvider(fsys, "file", ""))
+
+ ctx := ContextWithFSProvider(context.Background(), mux)
+
+ reg := NewRegistry()
+ reg.Register("jsonfile", config.DataSource{URL: mustParseURL("tmp/jsonfile.json")})
+ reg.Register("yamlfile", config.DataSource{URL: mustParseURL("tmp/yamlfile.yaml")})
+
+ fsys, err := NewMergeFS(mustParseURL("merge:///"))
+ require.NoError(t, err)
+
+ fsys = WithDataSourceRegistryFS(reg, fsys)
+ fsys = fsimpl.WithContextFS(ctx, fsys)
+
+ b, err := fs.ReadFile(fsys, "jsonfile|yamlfile")
+ require.NoError(t, err)
+ assert.Equal(t, mergedContent, string(b))
+}
+
+type openOnceFS struct {
+ fs *fstest.MapFS
+ opened map[string]struct{}
+}
+
+// a filesystem that only allows opening or stating a file once
+func openOnce(fsys *fstest.MapFS) fs.FS {
+ return &openOnceFS{fs: fsys, opened: map[string]struct{}{}}
+}
+
+func (f *openOnceFS) Open(name string) (fs.File, error) {
+ if _, ok := f.opened[name]; ok {
+ return nil, fmt.Errorf("open: %q already opened", name)
+ }
+
+ f.opened[name] = struct{}{}
+
+ return f.fs.Open(name)
+}