diff options
Diffstat (limited to 'internal/datafs')
| -rw-r--r-- | internal/datafs/mergefs.go | 75 | ||||
| -rw-r--r-- | internal/datafs/mergefs_test.go | 162 |
2 files changed, 114 insertions, 123 deletions
diff --git a/internal/datafs/mergefs.go b/internal/datafs/mergefs.go index b056aff7..dd74eb67 100644 --- a/internal/datafs/mergefs.go +++ b/internal/datafs/mergefs.go @@ -122,8 +122,8 @@ func (f *mergeFS) Open(name string) (fs.File, error) { // unescaped '+' characters to make it simpler to provide types like // "application/array+json" overrideType := typeOverrideParam() - mimeType := u.Query().Get(overrideType) - mimeType = strings.ReplaceAll(mimeType, " ", "+") + mimeTypeHint := u.Query().Get(overrideType) + mimeTypeHint = strings.ReplaceAll(mimeTypeHint, " ", "+") // now that we have the hint, remove it from the URL - we can't have it // leaking into the filesystem layer @@ -151,23 +151,6 @@ func (f *mergeFS) Open(name string) (fs.File, error) { fsys = fsimpl.WithHTTPClientFS(f.httpClient, fsys) - // find the content type - fi, err := fs.Stat(fsys, base) - if err != nil { - return nil, &fs.PathError{ - Op: "open", Path: name, - Err: fmt.Errorf("stat merge part %q: %w", part, err), - } - } - - if fi.ModTime().After(modTime) { - modTime = fi.ModTime() - } - - if mimeType == "" { - mimeType = fsimpl.ContentType(fi) - } - f, err := fsys.Open(base) if err != nil { return nil, &fs.PathError{ @@ -176,7 +159,7 @@ func (f *mergeFS) Open(name string) (fs.File, error) { } } - subFiles[i] = subFile{f, mimeType} + subFiles[i] = subFile{f, mimeTypeHint} } return &mergeFile{ @@ -226,18 +209,16 @@ func (f *mergeFile) Read(p []byte) (int, error) { if f.merged == nil { f.readMux.Lock() defer f.readMux.Unlock() + // read from all and merge - data := make([]map[string]interface{}, len(f.subFiles)) + data := make([]map[string]any, len(f.subFiles)) for i, sf := range f.subFiles { - b, err := io.ReadAll(sf) - if err != nil && !errors.Is(err, io.EOF) { - return 0, fmt.Errorf("readAll: %w", err) - } - - data[i], err = parseMap(sf.contentType, string(b)) + d, err := f.readSubFile(sf) if err != nil { - return 0, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err) + return 0, fmt.Errorf("readSubFile: %w", err) } + + data[i] = d } md, err := mergeData(data) @@ -253,6 +234,36 @@ func (f *mergeFile) Read(p []byte) (int, error) { return f.merged.Read(p) } +func (f *mergeFile) readSubFile(sf subFile) (map[string]any, error) { + // stat for content type and modTime + fi, err := sf.Stat() + if err != nil { + return nil, fmt.Errorf("stat merge part %q: %w", f.name, err) + } + + // the merged file's modTime is the most recent of all the sub-files + if fi.ModTime().After(f.modTime) { + f.modTime = fi.ModTime() + } + + // if we haven't been given a content type hint, guess the normal way + if sf.contentType == "" { + sf.contentType = fsimpl.ContentType(fi) + } + + b, err := io.ReadAll(sf) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("readAll: %w", err) + } + + sfData, err := parseMap(sf.contentType, string(b)) + if err != nil { + return nil, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err) + } + + return sfData, nil +} + func mergeData(data []map[string]interface{}) ([]byte, error) { dst := data[0] data = data[1:] @@ -269,17 +280,19 @@ func mergeData(data []map[string]interface{}) ([]byte, error) { return []byte(s), nil } -func parseMap(mimeType, data string) (map[string]interface{}, error) { +func parseMap(mimeType, data string) (map[string]any, error) { datum, err := parsers.ParseData(mimeType, data) if err != nil { return nil, fmt.Errorf("parseData: %w", err) } - var m map[string]interface{} + + var m map[string]any switch datum := datum.(type) { - case map[string]interface{}: + case map[string]any: m = datum default: return nil, fmt.Errorf("unexpected data type '%T' for datasource (type %s); merge: can only merge maps", datum, mimeType) } + return m, nil } diff --git a/internal/datafs/mergefs_test.go b/internal/datafs/mergefs_test.go index 17ffa716..b332722b 100644 --- a/internal/datafs/mergefs_test.go +++ b/internal/datafs/mergefs_test.go @@ -2,6 +2,7 @@ package datafs import ( "context" + "fmt" "io" "io/fs" "mime" @@ -31,19 +32,7 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS { yamlContent := "hello: earth\ngoodnight: moon\n" arrayContent := `["hello", "world"]` - wd, _ := os.Getwd() - - // MapFS doesn't support windows path separators, so we use / exclusively - // in this test - vol := filepath.VolumeName(wd) - if vol != "" && wd != vol { - wd = wd[len(vol)+1:] - } else if wd[0] == '/' { - wd = wd[1:] - } - wd = filepath.ToSlash(wd) - - t.Logf("wd: %s", wd) + wd := wdForTest(t) fsys := WrapWdFS(fstest.MapFS{ "tmp": {Mode: fs.ModeDir | 0o777}, @@ -90,84 +79,22 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS { return fsys } -// func TestReadMerge(t *testing.T) { -// ctx := context.Background() - -// jsonContent := `{"hello": "world"}` -// yamlContent := "hello: earth\ngoodnight: moon\n" -// arrayContent := `["hello", "world"]` - -// mergedContent := "goodnight: moon\nhello: world\n" - -// fsys := fstest.MapFS{} -// fsys["tmp"] = &fstest.MapFile{Mode: fs.ModeDir | 0777} -// fsys["tmp/jsonfile.json"] = &fstest.MapFile{Data: []byte(jsonContent)} -// fsys["tmp/array.json"] = &fstest.MapFile{Data: []byte(arrayContent)} -// fsys["tmp/yamlfile.yaml"] = &fstest.MapFile{Data: []byte(yamlContent)} -// fsys["tmp/textfile.txt"] = &fstest.MapFile{Data: []byte(`plain text...`)} - -// // workding dir with volume name trimmed -// wd, _ := os.Getwd() -// vol := filepath.VolumeName(wd) -// wd = wd[len(vol)+1:] - -// fsys[path.Join(wd, "jsonfile.json")] = &fstest.MapFile{Data: []byte(jsonContent)} -// fsys[path.Join(wd, "array.json")] = &fstest.MapFile{Data: []byte(arrayContent)} -// fsys[path.Join(wd, "yamlfile.yaml")] = &fstest.MapFile{Data: []byte(yamlContent)} -// fsys[path.Join(wd, "textfile.txt")] = &fstest.MapFile{Data: []byte(`plain text...`)} - -// fsmux := fsimpl.NewMux() -// fsmux.Add(fsimpl.WrappedFSProvider(&fsys, "file")) -// ctx = datafs.ContextWithFSProvider(ctx, fsmux) - -// source := &Source{Alias: "foo", URL: mustParseURL("merge:file:///tmp/jsonfile.json|file:///tmp/yamlfile.yaml")} -// d := &Data{ -// Sources: map[string]*Source{ -// "foo": source, -// "bar": {Alias: "bar", URL: mustParseURL("file:///tmp/jsonfile.json")}, -// "baz": {Alias: "baz", URL: mustParseURL("file:///tmp/yamlfile.yaml")}, -// "text": {Alias: "text", URL: mustParseURL("file:///tmp/textfile.txt")}, -// "badscheme": {Alias: "badscheme", URL: mustParseURL("bad:///scheme.json")}, -// "badtype": {Alias: "badtype", URL: mustParseURL("file:///tmp/textfile.txt?type=foo/bar")}, -// "array": {Alias: "array", URL: mustParseURL("file:///tmp/array.json?type=" + url.QueryEscape(jsonArrayMimetype))}, -// }, -// Ctx: ctx, -// } - -// actual, err := d.readMerge(ctx, source) -// require.NoError(t, err) -// assert.Equal(t, mergedContent, string(actual)) - -// source.URL = mustParseURL("merge:bar|baz") -// actual, err = d.readMerge(ctx, source) -// require.NoError(t, err) -// assert.Equal(t, mergedContent, string(actual)) - -// source.URL = mustParseURL("merge:./jsonfile.json|baz") -// actual, err = d.readMerge(ctx, source) -// require.NoError(t, err) -// assert.Equal(t, mergedContent, string(actual)) - -// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json") -// _, err = d.readMerge(ctx, source) -// require.Error(t, err) - -// source.URL = mustParseURL("merge:bogusalias|file:///tmp/jsonfile.json") -// _, err = d.readMerge(ctx, source) -// require.Error(t, err) - -// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badscheme") -// _, err = d.readMerge(ctx, source) -// require.Error(t, err) - -// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badtype") -// _, err = d.readMerge(ctx, source) -// require.Error(t, err) - -// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|array") -// _, err = d.readMerge(ctx, source) -// require.Error(t, err) -// } +func wdForTest(t *testing.T) string { + t.Helper() + + wd, _ := os.Getwd() + + // MapFS doesn't support windows path separators, so we use / exclusively + vol := filepath.VolumeName(wd) + if vol != "" && wd != vol { + wd = wd[len(vol)+1:] + } else if wd[0] == '/' { + wd = wd[1:] + } + wd = filepath.ToSlash(wd) + + return wd +} func TestMergeData(t *testing.T) { def := map[string]interface{}{ @@ -228,7 +155,6 @@ func TestMergeData(t *testing.T) { } func TestMergeFS_Open(t *testing.T) { - // u, _ := url.Parse("merge:") fsys := setupMergeFsys(context.Background(), t) assert.IsType(t, &mergeFS{}, fsys) @@ -354,3 +280,55 @@ func TestMergeFS_ReadFile(t *testing.T) { }) } } + +func TestMergeFS_ReadsSubFilesOnce(t *testing.T) { + mergedContent := "goodnight: moon\nhello: world\n" + + wd := wdForTest(t) + + fsys := WrapWdFS( + openOnce(&fstest.MapFS{ + path.Join(wd, "tmp/jsonfile.json"): {Data: []byte(`{"hello": "world"}`)}, + path.Join(wd, "tmp/yamlfile.yaml"): {Data: []byte("hello: earth\ngoodnight: moon\n")}, + })) + + mux := fsimpl.NewMux() + mux.Add(MergeFS) + mux.Add(WrappedFSProvider(fsys, "file", "")) + + ctx := ContextWithFSProvider(context.Background(), mux) + + reg := NewRegistry() + reg.Register("jsonfile", config.DataSource{URL: mustParseURL("tmp/jsonfile.json")}) + reg.Register("yamlfile", config.DataSource{URL: mustParseURL("tmp/yamlfile.yaml")}) + + fsys, err := NewMergeFS(mustParseURL("merge:///")) + require.NoError(t, err) + + fsys = WithDataSourceRegistryFS(reg, fsys) + fsys = fsimpl.WithContextFS(ctx, fsys) + + b, err := fs.ReadFile(fsys, "jsonfile|yamlfile") + require.NoError(t, err) + assert.Equal(t, mergedContent, string(b)) +} + +type openOnceFS struct { + fs *fstest.MapFS + opened map[string]struct{} +} + +// a filesystem that only allows opening or stating a file once +func openOnce(fsys *fstest.MapFS) fs.FS { + return &openOnceFS{fs: fsys, opened: map[string]struct{}{}} +} + +func (f *openOnceFS) Open(name string) (fs.File, error) { + if _, ok := f.opened[name]; ok { + return nil, fmt.Errorf("open: %q already opened", name) + } + + f.opened[name] = struct{}{} + + return f.fs.Open(name) +} |
