summary refs log tree commit diff
path: root/data
diff options
context:
space:
mode:
author    Dave Henderson <dhenderson@gmail.com>  2019-10-14 21:49:06 -0400
committer Dave Henderson <dhenderson@gmail.com>  2019-10-15 19:17:19 -0400
commit 2304c68127c613b386ef6f2fbc12453313fac51a (patch)
tree   3191a24c14edfa97534f600774d77a035112c409 /data
parent b6d60b450bf4c291d8f0673baee35ef9c0012731 (diff)
Supporting gs (Google Cloud Storage) datasources
Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Diffstat (limited to 'data')
-rw-r--r--  data/datasource.go            |  1
-rw-r--r--  data/datasource_blob.go       | 84
-rw-r--r--  data/datasource_blob_test.go  |  6
3 files changed, 72 insertions, 19 deletions
diff --git a/data/datasource.go b/data/datasource.go
index 8222f670..929dab13 100644
--- a/data/datasource.go
+++ b/data/datasource.go
@@ -60,6 +60,7 @@ func (d *Data) registerReaders() {
d.sourceReaders["vault+http"] = readVault
d.sourceReaders["vault+https"] = readVault
d.sourceReaders["s3"] = readBlob
+ d.sourceReaders["gs"] = readBlob
}
// lookupReader - return the reader function for the given scheme
diff --git a/data/datasource_blob.go b/data/datasource_blob.go
index 69a712a4..1898270e 100644
--- a/data/datasource_blob.go
+++ b/data/datasource_blob.go
@@ -10,18 +10,19 @@ import (
"path"
"strings"
- "gocloud.dev/blob"
-
gaws "github.com/hairyhenderson/gomplate/aws"
"github.com/hairyhenderson/gomplate/env"
"github.com/pkg/errors"
+ "gocloud.dev/blob"
+ "gocloud.dev/blob/gcsblob"
"gocloud.dev/blob/s3blob"
+ "gocloud.dev/gcp"
)
func readBlob(source *Source, args ...string) (output []byte, err error) {
if len(args) >= 2 {
- return nil, errors.New("Maximum two arguments to s3 datasource: alias, extraPath")
+ return nil, errors.New("Maximum two arguments to blob datasource: alias, extraPath")
}
ctx := context.TODO()
@@ -31,14 +32,16 @@ func readBlob(source *Source, args ...string) (output []byte, err error) {
key = path.Join(key, args[0])
}
- // set up a "regular" gomplate AWS SDK session
- sess := gaws.SDKSession()
- // see https://gocloud.dev/concepts/urls/#muxes
- opener := &s3blob.URLOpener{ConfigProvider: sess}
+ opener, err := newOpener(ctx, source.URL)
+ if err != nil {
+ return nil, err
+ }
+
mux := blob.URLMux{}
- mux.RegisterBucket(s3blob.Scheme, opener)
+ mux.RegisterBucket(source.URL.Scheme, opener)
- bucket, err := mux.OpenBucket(ctx, blobURL(source.URL))
+ u := blobURL(source.URL)
+ bucket, err := mux.OpenBucket(ctx, u)
if err != nil {
return nil, err
}
@@ -58,10 +61,38 @@ func readBlob(source *Source, args ...string) (output []byte, err error) {
return data, err
}
+// create the correct kind of blob.BucketURLOpener for the given URL
+func newOpener(ctx context.Context, u *url.URL) (opener blob.BucketURLOpener, err error) {
+ switch u.Scheme {
+ case "s3":
+ // set up a "regular" gomplate AWS SDK session
+ sess := gaws.SDKSession()
+ // see https://gocloud.dev/concepts/urls/#muxes
+ opener = &s3blob.URLOpener{ConfigProvider: sess}
+ case "gs":
+ creds, err := gcp.DefaultCredentials(ctx)
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to retrieve GCP credentials")
+ }
+
+ client, err := gcp.NewHTTPClient(
+ gcp.DefaultTransport(),
+ gcp.CredentialsTokenSource(creds))
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to create GCP HTTP client")
+ }
+ opener = &gcsblob.URLOpener{
+ Client: client,
+ }
+ }
+ return opener, nil
+}
+
func getBlob(ctx context.Context, bucket *blob.Bucket, key string) (mediaType string, data []byte, err error) {
+ key = strings.TrimPrefix(key, "/")
attr, err := bucket.Attributes(ctx, key)
if err != nil {
- return "", nil, err
+ return "", nil, errors.Wrapf(err, "failed to retrieve attributes for %s", key)
}
if attr.ContentType != "" {
mt, _, e := mime.ParseMediaType(attr.ContentType)
@@ -71,7 +102,7 @@ func getBlob(ctx context.Context, bucket *blob.Bucket, key string) (mediaType st
mediaType = mt
}
data, err = bucket.ReadAll(ctx, key)
- return mediaType, data, err
+ return mediaType, data, errors.Wrapf(err, "failed to read %s", key)
}
// calls the bucket listing API, returning a JSON Array
@@ -109,18 +140,33 @@ func listBucket(ctx context.Context, bucket *blob.Bucket, path string) (mediaTyp
func blobURL(u *url.URL) string {
out, _ := url.Parse(u.String())
q := out.Query()
+
for param := range q {
- switch param {
- case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
- default:
- q.Del(param)
+ switch u.Scheme {
+ case "s3":
+ switch param {
+ case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
+ default:
+ q.Del(param)
+ }
+ case "gs":
+ switch param {
+ case "access_id", "private_key_path":
+ default:
+ q.Del(param)
+ }
}
}
- // handle AWS_S3_ENDPOINT env var
- endpoint := env.Getenv("AWS_S3_ENDPOINT")
- if endpoint != "" {
- q.Set("endpoint", endpoint)
+
+ if u.Scheme == "s3" {
+ // handle AWS_S3_ENDPOINT env var
+ endpoint := env.Getenv("AWS_S3_ENDPOINT")
+ if endpoint != "" {
+ q.Set("endpoint", endpoint)
+ }
}
+
out.RawQuery = q.Encode()
+
return out.String()
}
diff --git a/data/datasource_blob_test.go b/data/datasource_blob_test.go
index eebcfbf1..75d466f1 100644
--- a/data/datasource_blob_test.go
+++ b/data/datasource_blob_test.go
@@ -118,6 +118,12 @@ func TestBlobURL(t *testing.T) {
{"s3://foo/bar/baz?region=us-east-1", "s3://foo/bar/baz?region=us-east-1"},
{"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disableSSL=true"},
{"s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4", "s3://foo/bar/baz?endpoint=1.2.3.4&s3ForcePathStyle=true"},
+ {"gs://foo/bar/baz", "gs://foo/bar/baz"},
+ {"gs://foo/bar/baz?type=foo/bar", "gs://foo/bar/baz"},
+ {"gs://foo/bar/baz?access_id=123", "gs://foo/bar/baz?access_id=123"},
+ {"gs://foo/bar/baz?private_key_path=/foo/bar", "gs://foo/bar/baz?private_key_path=%2Ffoo%2Fbar"},
+ {"gs://foo/bar/baz?private_key_path=key.json&foo=bar", "gs://foo/bar/baz?private_key_path=key.json"},
+ {"gs://foo/bar/baz?private_key_path=key.json&foo=bar&access_id=abcd", "gs://foo/bar/baz?access_id=abcd&private_key_path=key.json"},
}
for _, d := range data {