diff options
| author | Dave Henderson <dhenderson@gmail.com> | 2019-08-17 11:53:32 -0400 |
|---|---|---|
| committer | Dave Henderson <dhenderson@gmail.com> | 2019-10-13 12:03:59 -0400 |
| commit | b2a1fb672bdd6d5e804e22766618cafa334af752 (patch) | |
| tree | 0177dbfe1b80aad8e3b166291efb931cfd6d5728 /data | |
| parent | d0d67b1dd388ee9ae0c4afd750cf1e1f11d3032a (diff) | |
Supporting s3 datasources
Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Diffstat (limited to 'data')
| -rw-r--r-- | data/datasource.go | 1 | ||||
| -rw-r--r-- | data/datasource_blob.go | 126 | ||||
| -rw-r--r-- | data/datasource_blob_test.go | 128 |
3 files changed, 255 insertions, 0 deletions
```diff
diff --git a/data/datasource.go b/data/datasource.go
index ee4229d4..8222f670 100644
--- a/data/datasource.go
+++ b/data/datasource.go
@@ -59,6 +59,7 @@ func (d *Data) registerReaders() {
 	d.sourceReaders["vault"] = readVault
 	d.sourceReaders["vault+http"] = readVault
 	d.sourceReaders["vault+https"] = readVault
+	d.sourceReaders["s3"] = readBlob
 }
 
 // lookupReader - return the reader function for the given scheme
diff --git a/data/datasource_blob.go b/data/datasource_blob.go
new file mode 100644
index 00000000..69a712a4
--- /dev/null
+++ b/data/datasource_blob.go
@@ -0,0 +1,126 @@
+package data
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"io"
+	"mime"
+	"net/url"
+	"path"
+	"strings"
+
+	"gocloud.dev/blob"
+
+	gaws "github.com/hairyhenderson/gomplate/aws"
+	"github.com/hairyhenderson/gomplate/env"
+	"github.com/pkg/errors"
+
+	"gocloud.dev/blob/s3blob"
+)
+
+func readBlob(source *Source, args ...string) (output []byte, err error) {
+	if len(args) >= 2 {
+		return nil, errors.New("Maximum two arguments to s3 datasource: alias, extraPath")
+	}
+
+	ctx := context.TODO()
+
+	key := source.URL.Path
+	if len(args) == 1 {
+		key = path.Join(key, args[0])
+	}
+
+	// set up a "regular" gomplate AWS SDK session
+	sess := gaws.SDKSession()
+	// see https://gocloud.dev/concepts/urls/#muxes
+	opener := &s3blob.URLOpener{ConfigProvider: sess}
+	mux := blob.URLMux{}
+	mux.RegisterBucket(s3blob.Scheme, opener)
+
+	bucket, err := mux.OpenBucket(ctx, blobURL(source.URL))
+	if err != nil {
+		return nil, err
+	}
+	defer bucket.Close()
+
+	var r func(context.Context, *blob.Bucket, string) (string, []byte, error)
+	if strings.HasSuffix(key, "/") {
+		r = listBucket
+	} else {
+		r = getBlob
+	}
+
+	mediaType, data, err := r(ctx, bucket, key)
+	if mediaType != "" {
+		source.mediaType = mediaType
+	}
+	return data, err
+}
+
+func getBlob(ctx context.Context, bucket *blob.Bucket, key string) (mediaType string, data []byte, err error) {
+	attr, err := bucket.Attributes(ctx, key)
+	if err != nil {
+		return "", nil, err
+	}
+	if attr.ContentType != "" {
+		mt, _, e := mime.ParseMediaType(attr.ContentType)
+		if e != nil {
+			return "", nil, e
+		}
+		mediaType = mt
+	}
+	data, err = bucket.ReadAll(ctx, key)
+	return mediaType, data, err
+}
+
+// calls the bucket listing API, returning a JSON Array
+func listBucket(ctx context.Context, bucket *blob.Bucket, path string) (mediaType string, data []byte, err error) {
+	path = strings.TrimPrefix(path, "/")
+	opts := &blob.ListOptions{
+		Prefix:    path,
+		Delimiter: "/",
+	}
+	li := bucket.List(opts)
+	keys := []string{}
+	for {
+		obj, err := li.Next(ctx)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return "", nil, err
+		}
+		keys = append(keys, strings.TrimPrefix(obj.Key, path))
+	}
+
+	var buf bytes.Buffer
+	enc := json.NewEncoder(&buf)
+	if err := enc.Encode(keys); err != nil {
+		return "", nil, err
+	}
+	b := buf.Bytes()
+	// chop off the newline added by the json encoder
+	data = b[:len(b)-1]
+	return jsonArrayMimetype, data, nil
+}
+
+// copy/sanitize the URL for the Go CDK - it doesn't like params it can't parse
+func blobURL(u *url.URL) string {
+	out, _ := url.Parse(u.String())
+	q := out.Query()
+	for param := range q {
+		switch param {
+		case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
+		default:
+			q.Del(param)
+		}
+	}
+	// handle AWS_S3_ENDPOINT env var
+	endpoint := env.Getenv("AWS_S3_ENDPOINT")
+	if endpoint != "" {
+		q.Set("endpoint", endpoint)
+	}
+	out.RawQuery = q.Encode()
+	return out.String()
+}
diff --git a/data/datasource_blob_test.go b/data/datasource_blob_test.go
new file mode 100644
index 00000000..eebcfbf1
--- /dev/null
+++ b/data/datasource_blob_test.go
@@ -0,0 +1,128 @@
+package data
+
+import (
+	"bytes"
+	"net/http/httptest"
+	"net/url"
+	"os"
+	"testing"
+
+	"github.com/johannesboyne/gofakes3"
+	"github.com/johannesboyne/gofakes3/backend/s3mem"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func setupTestBucket(t *testing.T) (*httptest.Server, *url.URL) {
+	backend := s3mem.New()
+	faker := gofakes3.New(backend)
+	ts := httptest.NewServer(faker.Server())
+
+	err := backend.CreateBucket("mybucket")
+	assert.NoError(t, err)
+	c := "hello"
+	err = putFile(backend, "mybucket", "file1", "text/plain", c)
+	assert.NoError(t, err)
+
+	c = `{"value": "goodbye world"}`
+	err = putFile(backend, "mybucket", "file2", "application/json", c)
+	assert.NoError(t, err)
+
+	c = `value: what a world`
+	err = putFile(backend, "mybucket", "file3", "application/yaml", c)
+	assert.NoError(t, err)
+
+	c = `value: out of this world`
+	err = putFile(backend, "mybucket", "dir1/file1", "application/yaml", c)
+	assert.NoError(t, err)
+
+	c = `value: foo`
+	err = putFile(backend, "mybucket", "dir1/file2", "application/yaml", c)
+	assert.NoError(t, err)
+
+	u, _ := url.Parse(ts.URL)
+	return ts, u
+}
+
+func putFile(backend gofakes3.Backend, bucket, file, mime, content string) error {
+	_, err := backend.PutObject(
+		bucket,
+		file,
+		map[string]string{"Content-Type": mime},
+		bytes.NewBufferString(content),
+		int64(len(content)),
+	)
+	return err
+}
+
+func TestReadBlob(t *testing.T) {
+	_, err := readBlob(nil, "foo", "bar")
+	assert.Error(t, err)
+
+	ts, u := setupTestBucket(t)
+	defer ts.Close()
+
+	os.Setenv("AWS_ANON", "true")
+	defer os.Unsetenv("AWS_ANON")
+
+	d, err := NewData([]string{"-d", "data=s3://mybucket/file1?region=us-east-1&disableSSL=true&s3ForcePathStyle=true&type=text/plain&endpoint=" + u.Host}, nil)
+	assert.NoError(t, err)
+
+	var expected interface{}
+	expected = "hello"
+	out, err := d.Datasource("data")
+	assert.NoError(t, err)
+	assert.Equal(t, expected, out)
+
+	os.Unsetenv("AWS_ANON")
+
+	os.Setenv("AWS_ACCESS_KEY_ID", "fake")
+	os.Setenv("AWS_SECRET_ACCESS_KEY", "fake")
+	defer os.Unsetenv("AWS_ACCESS_KEY_ID")
+	defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
+	os.Setenv("AWS_S3_ENDPOINT", u.Host)
+	defer os.Unsetenv("AWS_S3_ENDPOINT")
+
+	d, err = NewData([]string{"-d", "data=s3://mybucket/file2?region=us-east-1&disableSSL=true&s3ForcePathStyle=true"}, nil)
+	assert.NoError(t, err)
+
+	expected = map[string]interface{}{"value": "goodbye world"}
+	out, err = d.Datasource("data")
+	assert.NoError(t, err)
+	assert.Equal(t, expected, out)
+
+	d, err = NewData([]string{"-d", "data=s3://mybucket/?region=us-east-1&disableSSL=true&s3ForcePathStyle=true"}, nil)
+	assert.NoError(t, err)
+
+	expected = []interface{}{"dir1/", "file1", "file2", "file3"}
+	out, err = d.Datasource("data")
+	assert.NoError(t, err)
+	assert.EqualValues(t, expected, out)
+
+	d, err = NewData([]string{"-d", "data=s3://mybucket/dir1/?region=us-east-1&disableSSL=true&s3ForcePathStyle=true"}, nil)
+	assert.NoError(t, err)
+
+	expected = []interface{}{"file1", "file2"}
+	out, err = d.Datasource("data")
+	assert.NoError(t, err)
+	assert.EqualValues(t, expected, out)
+}
+
+func TestBlobURL(t *testing.T) {
+	data := []struct {
+		in       string
+		expected string
+	}{
+		{"s3://foo/bar/baz", "s3://foo/bar/baz"},
+		{"s3://foo/bar/baz?type=hello/world", "s3://foo/bar/baz"},
+		{"s3://foo/bar/baz?region=us-east-1", "s3://foo/bar/baz?region=us-east-1"},
+		{"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disableSSL=true"},
+		{"s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4", "s3://foo/bar/baz?endpoint=1.2.3.4&s3ForcePathStyle=true"},
+	}
+
+	for _, d := range data {
+		u, _ := url.Parse(d.in)
+		out := blobURL(u)
+		assert.Equal(t, d.expected, out)
+	}
+}
```
