summaryrefslogtreecommitdiff
path: root/data
diff options
context:
space:
mode:
authorDave Henderson <dhenderson@gmail.com>2019-08-17 11:53:32 -0400
committerDave Henderson <dhenderson@gmail.com>2019-10-13 12:03:59 -0400
commitb2a1fb672bdd6d5e804e22766618cafa334af752 (patch)
tree0177dbfe1b80aad8e3b166291efb931cfd6d5728 /data
parentd0d67b1dd388ee9ae0c4afd750cf1e1f11d3032a (diff)
Supporting s3 datasources
Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Diffstat (limited to 'data')
-rw-r--r--data/datasource.go1
-rw-r--r--data/datasource_blob.go126
-rw-r--r--data/datasource_blob_test.go128
3 files changed, 255 insertions, 0 deletions
diff --git a/data/datasource.go b/data/datasource.go
index ee4229d4..8222f670 100644
--- a/data/datasource.go
+++ b/data/datasource.go
@@ -59,6 +59,7 @@ func (d *Data) registerReaders() {
d.sourceReaders["vault"] = readVault
d.sourceReaders["vault+http"] = readVault
d.sourceReaders["vault+https"] = readVault
+ d.sourceReaders["s3"] = readBlob
}
// lookupReader - return the reader function for the given scheme
diff --git a/data/datasource_blob.go b/data/datasource_blob.go
new file mode 100644
index 00000000..69a712a4
--- /dev/null
+++ b/data/datasource_blob.go
@@ -0,0 +1,126 @@
+package data
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "io"
+ "mime"
+ "net/url"
+ "path"
+ "strings"
+
+ "gocloud.dev/blob"
+
+ gaws "github.com/hairyhenderson/gomplate/aws"
+ "github.com/hairyhenderson/gomplate/env"
+ "github.com/pkg/errors"
+
+ "gocloud.dev/blob/s3blob"
+)
+
+func readBlob(source *Source, args ...string) (output []byte, err error) {
+ if len(args) >= 2 {
+ return nil, errors.New("Maximum two arguments to s3 datasource: alias, extraPath")
+ }
+
+ ctx := context.TODO()
+
+ key := source.URL.Path
+ if len(args) == 1 {
+ key = path.Join(key, args[0])
+ }
+
+ // set up a "regular" gomplate AWS SDK session
+ sess := gaws.SDKSession()
+ // see https://gocloud.dev/concepts/urls/#muxes
+ opener := &s3blob.URLOpener{ConfigProvider: sess}
+ mux := blob.URLMux{}
+ mux.RegisterBucket(s3blob.Scheme, opener)
+
+ bucket, err := mux.OpenBucket(ctx, blobURL(source.URL))
+ if err != nil {
+ return nil, err
+ }
+ defer bucket.Close()
+
+ var r func(context.Context, *blob.Bucket, string) (string, []byte, error)
+ if strings.HasSuffix(key, "/") {
+ r = listBucket
+ } else {
+ r = getBlob
+ }
+
+ mediaType, data, err := r(ctx, bucket, key)
+ if mediaType != "" {
+ source.mediaType = mediaType
+ }
+ return data, err
+}
+
+func getBlob(ctx context.Context, bucket *blob.Bucket, key string) (mediaType string, data []byte, err error) {
+ attr, err := bucket.Attributes(ctx, key)
+ if err != nil {
+ return "", nil, err
+ }
+ if attr.ContentType != "" {
+ mt, _, e := mime.ParseMediaType(attr.ContentType)
+ if e != nil {
+ return "", nil, e
+ }
+ mediaType = mt
+ }
+ data, err = bucket.ReadAll(ctx, key)
+ return mediaType, data, err
+}
+
+// calls the bucket listing API, returning a JSON Array
+func listBucket(ctx context.Context, bucket *blob.Bucket, path string) (mediaType string, data []byte, err error) {
+ path = strings.TrimPrefix(path, "/")
+ opts := &blob.ListOptions{
+ Prefix: path,
+ Delimiter: "/",
+ }
+ li := bucket.List(opts)
+ keys := []string{}
+ for {
+ obj, err := li.Next(ctx)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return "", nil, err
+ }
+ keys = append(keys, strings.TrimPrefix(obj.Key, path))
+ }
+
+ var buf bytes.Buffer
+ enc := json.NewEncoder(&buf)
+ if err := enc.Encode(keys); err != nil {
+ return "", nil, err
+ }
+ b := buf.Bytes()
+ // chop off the newline added by the json encoder
+ data = b[:len(b)-1]
+ return jsonArrayMimetype, data, nil
+}
+
+// copy/sanitize the URL for the Go CDK - it doesn't like params it can't parse
+func blobURL(u *url.URL) string {
+ out, _ := url.Parse(u.String())
+ q := out.Query()
+ for param := range q {
+ switch param {
+ case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
+ default:
+ q.Del(param)
+ }
+ }
+ // handle AWS_S3_ENDPOINT env var
+ endpoint := env.Getenv("AWS_S3_ENDPOINT")
+ if endpoint != "" {
+ q.Set("endpoint", endpoint)
+ }
+ out.RawQuery = q.Encode()
+ return out.String()
+}
diff --git a/data/datasource_blob_test.go b/data/datasource_blob_test.go
new file mode 100644
index 00000000..eebcfbf1
--- /dev/null
+++ b/data/datasource_blob_test.go
@@ -0,0 +1,128 @@
+package data
+
+import (
+ "bytes"
+ "net/http/httptest"
+ "net/url"
+ "os"
+ "testing"
+
+ "github.com/johannesboyne/gofakes3"
+ "github.com/johannesboyne/gofakes3/backend/s3mem"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func setupTestBucket(t *testing.T) (*httptest.Server, *url.URL) {
+ backend := s3mem.New()
+ faker := gofakes3.New(backend)
+ ts := httptest.NewServer(faker.Server())
+
+ err := backend.CreateBucket("mybucket")
+ assert.NoError(t, err)
+ c := "hello"
+ err = putFile(backend, "mybucket", "file1", "text/plain", c)
+ assert.NoError(t, err)
+
+ c = `{"value": "goodbye world"}`
+ err = putFile(backend, "mybucket", "file2", "application/json", c)
+ assert.NoError(t, err)
+
+ c = `value: what a world`
+ err = putFile(backend, "mybucket", "file3", "application/yaml", c)
+ assert.NoError(t, err)
+
+ c = `value: out of this world`
+ err = putFile(backend, "mybucket", "dir1/file1", "application/yaml", c)
+ assert.NoError(t, err)
+
+ c = `value: foo`
+ err = putFile(backend, "mybucket", "dir1/file2", "application/yaml", c)
+ assert.NoError(t, err)
+
+ u, _ := url.Parse(ts.URL)
+ return ts, u
+}
+
+func putFile(backend gofakes3.Backend, bucket, file, mime, content string) error {
+ _, err := backend.PutObject(
+ bucket,
+ file,
+ map[string]string{"Content-Type": mime},
+ bytes.NewBufferString(content),
+ int64(len(content)),
+ )
+ return err
+}
+
// TestReadBlob exercises the s3 datasource end-to-end against the fake
// in-memory S3 server: argument validation, single-object reads (plain
// text and JSON), and directory-style listings at the bucket root and
// inside a prefix. NOTE: the env-var setup is order-sensitive — the
// first read uses anonymous credentials with the endpoint in the URL,
// the later ones use static fake credentials plus AWS_S3_ENDPOINT.
func TestReadBlob(t *testing.T) {
	// more than one extra arg must be rejected before any network access
	_, err := readBlob(nil, "foo", "bar")
	assert.Error(t, err)

	ts, u := setupTestBucket(t)
	defer ts.Close()

	// anonymous credentials; endpoint is passed as a URL query param
	os.Setenv("AWS_ANON", "true")
	defer os.Unsetenv("AWS_ANON")

	d, err := NewData([]string{"-d", "data=s3://mybucket/file1?region=us-east-1&disableSSL=true&s3ForcePathStyle=true&type=text/plain&endpoint=" + u.Host}, nil)
	assert.NoError(t, err)

	var expected interface{}
	expected = "hello"
	out, err := d.Datasource("data")
	assert.NoError(t, err)
	assert.Equal(t, expected, out)

	// switch to static fake credentials + AWS_S3_ENDPOINT for the rest
	os.Unsetenv("AWS_ANON")

	os.Setenv("AWS_ACCESS_KEY_ID", "fake")
	os.Setenv("AWS_SECRET_ACCESS_KEY", "fake")
	defer os.Unsetenv("AWS_ACCESS_KEY_ID")
	defer os.Unsetenv("AWS_SECRET_ACCESS_KEY")
	os.Setenv("AWS_S3_ENDPOINT", u.Host)
	defer os.Unsetenv("AWS_S3_ENDPOINT")

	// JSON object is decoded into a map by the datasource layer
	d, err = NewData([]string{"-d", "data=s3://mybucket/file2?region=us-east-1&disableSSL=true&s3ForcePathStyle=true"}, nil)
	assert.NoError(t, err)

	expected = map[string]interface{}{"value": "goodbye world"}
	out, err = d.Datasource("data")
	assert.NoError(t, err)
	assert.Equal(t, expected, out)

	// trailing slash: list the bucket root - dirs keep a trailing "/"
	d, err = NewData([]string{"-d", "data=s3://mybucket/?region=us-east-1&disableSSL=true&s3ForcePathStyle=true"}, nil)
	assert.NoError(t, err)

	expected = []interface{}{"dir1/", "file1", "file2", "file3"}
	out, err = d.Datasource("data")
	assert.NoError(t, err)
	assert.EqualValues(t, expected, out)

	// listing inside a prefix strips that prefix from the keys
	d, err = NewData([]string{"-d", "data=s3://mybucket/dir1/?region=us-east-1&disableSSL=true&s3ForcePathStyle=true"}, nil)
	assert.NoError(t, err)

	expected = []interface{}{"file1", "file2"}
	out, err = d.Datasource("data")
	assert.NoError(t, err)
	assert.EqualValues(t, expected, out)
}
+
+func TestBlobURL(t *testing.T) {
+ data := []struct {
+ in string
+ expected string
+ }{
+ {"s3://foo/bar/baz", "s3://foo/bar/baz"},
+ {"s3://foo/bar/baz?type=hello/world", "s3://foo/bar/baz"},
+ {"s3://foo/bar/baz?region=us-east-1", "s3://foo/bar/baz?region=us-east-1"},
+ {"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disableSSL=true"},
+ {"s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4", "s3://foo/bar/baz?endpoint=1.2.3.4&s3ForcePathStyle=true"},
+ }
+
+ for _, d := range data {
+ u, _ := url.Parse(d.in)
+ out := blobURL(u)
+ assert.Equal(t, d.expected, out)
+ }
+}