author    Dave Henderson <dhenderson@gmail.com>  2024-01-22 09:06:33 -0500
committer GitHub <noreply@github.com>            2024-01-22 09:06:33 -0500
commit    0ac3aa24bf2e4ada9c26fd9ef5b7f0ae8c6b6cfb (patch)
tree      9a95f27eec1e77ef8bfefcb2810f7e41681627a5 /data/datasource_blob.go
parent    f837061f953bda1e8b42095c6dba0496de11d993 (diff)
Use go-fsimpl to read from datasources (#1336)
* Use go-fsimpl to read from datasources
  Signed-off-by: Dave Henderson <dhenderson@gmail.com>
* trying to fix windows bug
  Signed-off-by: Dave Henderson <dhenderson@gmail.com>
* attempts to fix some of the path madness
  Signed-off-by: Dave Henderson <dhenderson@gmail.com>
* remove 'HOME' from expected env vars
  Signed-off-by: Dave Henderson <dhenderson@gmail.com>
* more tweaks
  Signed-off-by: Dave Henderson <dhenderson@gmail.com>
* lint fix
  Signed-off-by: Dave Henderson <dhenderson@gmail.com>
---------
Signed-off-by: Dave Henderson <dhenderson@gmail.com>
Diffstat (limited to 'data/datasource_blob.go')
-rw-r--r--  data/datasource_blob.go  173
1 file changed, 0 insertions(+), 173 deletions(-)
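
For context, this commit replaces the gocloud.dev-based blob reader deleted below with go-fsimpl's filesystem abstraction. A minimal sketch of reading a blob-backed datasource through go-fsimpl is shown here; the bucket URL and key are placeholders, and the Lookup/ReadFile flow is an illustration based on go-fsimpl's fs.FS interface, not gomplate's actual wiring in this commit.

package main

import (
	"fmt"
	"io/fs"

	"github.com/hairyhenderson/go-fsimpl"
	"github.com/hairyhenderson/go-fsimpl/blobfs"
)

func main() {
	// Register the blob filesystem provider (s3://, gs://, azblob://) on a mux.
	mux := fsimpl.NewMux()
	mux.Add(blobfs.FS)

	// Look up a filesystem rooted at the bucket URL (placeholder bucket name).
	fsys, err := mux.Lookup("s3://my-bucket?region=us-east-1")
	if err != nil {
		panic(err)
	}

	// Read a single key; the blob object is exposed as a regular fs.FS file.
	b, err := fs.ReadFile(fsys, "config/app.json")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}

With this model, listing and media-type detection come from the fs.FS and fs.ReadDirFS interfaces rather than the hand-rolled helpers in the file removed below.
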
diff --git a/data/datasource_blob.go b/data/datasource_blob.go
deleted file mode 100644
index 1dac584a..00000000
--- a/data/datasource_blob.go
+++ /dev/null
@@ -1,173 +0,0 @@
-package data
-
-import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "mime"
- "net/url"
- "path"
- "strings"
-
- gaws "github.com/hairyhenderson/gomplate/v4/aws"
- "github.com/hairyhenderson/gomplate/v4/env"
-
- "gocloud.dev/blob"
- "gocloud.dev/blob/gcsblob"
- "gocloud.dev/blob/s3blob"
- "gocloud.dev/gcp"
-)
-
-func readBlob(ctx context.Context, source *Source, args ...string) (output []byte, err error) {
- if len(args) >= 2 {
- return nil, fmt.Errorf("maximum two arguments to blob datasource: alias, extraPath")
- }
-
- key := source.URL.Path
- if len(args) == 1 {
- key = path.Join(key, args[0])
- }
-
- opener, err := newOpener(ctx, source.URL)
- if err != nil {
- return nil, err
- }
-
- mux := blob.URLMux{}
- mux.RegisterBucket(source.URL.Scheme, opener)
-
- u := blobURL(source.URL)
- bucket, err := mux.OpenBucket(ctx, u)
- if err != nil {
- return nil, err
- }
- defer bucket.Close()
-
- var r func(context.Context, *blob.Bucket, string) (string, []byte, error)
- if strings.HasSuffix(key, "/") {
- r = listBucket
- } else {
- r = getBlob
- }
-
- mediaType, data, err := r(ctx, bucket, key)
- if mediaType != "" {
- source.mediaType = mediaType
- }
- return data, err
-}
-
-// create the correct kind of blob.BucketURLOpener for the given URL
-func newOpener(ctx context.Context, u *url.URL) (opener blob.BucketURLOpener, err error) {
- switch u.Scheme {
- case "s3":
- // set up a "regular" gomplate AWS SDK session
- sess := gaws.SDKSession()
- // see https://gocloud.dev/concepts/urls/#muxes
- opener = &s3blob.URLOpener{ConfigProvider: sess}
- case "gs":
- creds, err := gcp.DefaultCredentials(ctx)
- if err != nil {
- return nil, fmt.Errorf("failed to retrieve GCP credentials: %w", err)
- }
-
- client, err := gcp.NewHTTPClient(
- gcp.DefaultTransport(),
- gcp.CredentialsTokenSource(creds))
- if err != nil {
- return nil, fmt.Errorf("failed to create GCP HTTP client: %w", err)
- }
- opener = &gcsblob.URLOpener{
- Client: client,
- }
- }
- return opener, nil
-}
-
-func getBlob(ctx context.Context, bucket *blob.Bucket, key string) (mediaType string, data []byte, err error) {
- key = strings.TrimPrefix(key, "/")
- attr, err := bucket.Attributes(ctx, key)
- if err != nil {
- return "", nil, fmt.Errorf("failed to retrieve attributes for %s: %w", key, err)
- }
- if attr.ContentType != "" {
- mt, _, e := mime.ParseMediaType(attr.ContentType)
- if e != nil {
- return "", nil, e
- }
- mediaType = mt
- }
- data, err = bucket.ReadAll(ctx, key)
- if err != nil {
- return "", nil, fmt.Errorf("failed to read %s: %w", key, err)
- }
- return mediaType, data, nil
-}
-
-// calls the bucket listing API, returning a JSON Array
-func listBucket(ctx context.Context, bucket *blob.Bucket, path string) (mediaType string, data []byte, err error) {
- path = strings.TrimPrefix(path, "/")
- opts := &blob.ListOptions{
- Prefix: path,
- Delimiter: "/",
- }
- li := bucket.List(opts)
- keys := []string{}
- for {
- obj, err := li.Next(ctx)
- if err == io.EOF {
- break
- }
- if err != nil {
- return "", nil, err
- }
- keys = append(keys, strings.TrimPrefix(obj.Key, path))
- }
-
- var buf bytes.Buffer
- enc := json.NewEncoder(&buf)
- if err := enc.Encode(keys); err != nil {
- return "", nil, err
- }
- b := buf.Bytes()
- // chop off the newline added by the json encoder
- data = b[:len(b)-1]
- return jsonArrayMimetype, data, nil
-}
-
-// copy/sanitize the URL for the Go CDK - it doesn't like params it can't parse
-func blobURL(u *url.URL) string {
- out := cloneURL(u)
- q := out.Query()
-
- for param := range q {
- switch u.Scheme {
- case "s3":
- switch param {
- case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
- default:
- q.Del(param)
- }
- case "gs":
- switch param {
- case "access_id", "private_key_path":
- default:
- q.Del(param)
- }
- }
- }
-
- if u.Scheme == "s3" {
- // handle AWS_S3_ENDPOINT env var
- endpoint := env.Getenv("AWS_S3_ENDPOINT")
- if endpoint != "" {
- q.Set("endpoint", endpoint)
- }
- }
-
- out.RawQuery = q.Encode()
-
- return out.String()
-}
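
For reference, the behaviour the deleted readBlob/getBlob path provided can be reproduced directly against gocloud.dev/blob. This is a minimal sketch rather than gomplate code: the bucket URL and key are placeholders, and error handling is reduced to log.Fatal.

package main

import (
	"context"
	"fmt"
	"log"

	"gocloud.dev/blob"
	_ "gocloud.dev/blob/s3blob" // registers the s3:// scheme with the default URL mux
)

func main() {
	ctx := context.Background()

	// Open the bucket by URL (placeholder bucket name and region).
	bucket, err := blob.OpenBucket(ctx, "s3://my-bucket?region=us-east-1")
	if err != nil {
		log.Fatal(err)
	}
	defer bucket.Close()

	// Equivalent of the deleted getBlob: fetch attributes for the media type,
	// then read the object body.
	key := "config/app.json"
	attr, err := bucket.Attributes(ctx, key)
	if err != nil {
		log.Fatal(err)
	}

	data, err := bucket.ReadAll(ctx, key)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("content-type=%s, %d bytes\n", attr.ContentType, len(data))
}
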