summaryrefslogtreecommitdiff
path: root/internal/tests/integration/datasources_blob_test.go
blob: 9cfcfd2904ccae1a44dc56e62ceed07ab732b285 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package integration

import (
	"bytes"
	"net/http/httptest"
	"os"
	"testing"

	"github.com/johannesboyne/gofakes3"
	"github.com/johannesboyne/gofakes3/backend/s3mem"
	"github.com/stretchr/testify/require"
)

// setupDatasourcesBlobTest starts an in-memory fake S3 server (gofakes3 backed
// by s3mem) and seeds bucket "mybucket" with the objects used by the S3 tests
// in this file:
//
//   - foo.json and bar.json (application/json)
//   - a/b/c/hello.txt and a/b/c/goodbye.txt (text/plain)
//   - a/b/c/d (application/yaml, stored without a file extension)
//
// The server is closed automatically via t.Cleanup. Any failure while seeding
// aborts the calling test.
func setupDatasourcesBlobTest(t *testing.T) *httptest.Server {
	// Report require failures at the caller's line, not inside this helper.
	t.Helper()

	backend := s3mem.New()
	s3 := gofakes3.New(backend)

	srv := httptest.NewServer(s3.Server())
	t.Cleanup(srv.Close)

	const bucket = "mybucket"
	require.NoError(t, backend.CreateBucket(bucket))

	objects := []struct {
		key, contentType, body string
	}{
		{"foo.json", "application/json", `{"value":"json", "name":"foo"}`},
		{"bar.json", "application/json", `{"value":"json", "name":"bar"}`},
		{"a/b/c/hello.txt", "text/plain", `hello world`},
		{"a/b/c/goodbye.txt", "text/plain", `goodbye world`},
		{"a/b/c/d", "application/yaml", "a: foo\nb: bar\nc:\n  cc: yay for yaml\n"},
	}
	for _, obj := range objects {
		meta := map[string]string{"Content-Type": obj.contentType}
		_, err := backend.PutObject(bucket, obj.key, meta,
			bytes.NewBufferString(obj.body), int64(len(obj.body)))
		require.NoError(t, err, "seeding object %q", obj.key)
	}

	return srv
}

// TestDatasources_Blob_S3Datasource exercises s3:// datasources three ways:
// anonymously against a real public AWS bucket (reading a CSV object), and
// against the local fake S3 server with the endpoint supplied either as a URL
// query parameter or through the AWS_S3_ENDPOINT environment variable.
func TestDatasources_Blob_S3Datasource(t *testing.T) {
	// Real, public bucket: read a CSV object anonymously and print two
	// header/value pairs taken from its first two rows.
	publicURL := "data=s3://noaa-bathymetry-pds/csv/2022/03/02/20220302_056e577c7cd8323fdd8a04d3812cf78e_pointData.csv?region=us-east-1&type=text/csv"
	tmpl := `{{ index (index .data 0) 6 }}: {{ index (index .data 1) 6 }}
{{ index (index .data 0) 5 }}: {{ index (index .data 1) 5 }}`
	stdout, stderr, err := cmd(t, "-c", publicURL, "-i", tmpl).
		withEnv("AWS_ANON", "true").
		withEnv("AWS_TIMEOUT", "5000").
		run()
	assertSuccess(t, stdout, stderr, err, `PLATFORM_NAME: Ramform Hyperion
TIME: 2022-03-01T22:00:04.000Z`)

	srv := setupDatasourcesBlobTest(t)
	fakeEndpoint := srv.Listener.Addr().String()

	// Fake server, endpoint passed in the datasource URL, static credentials.
	stdout, stderr, err = cmd(t,
		"-c", "data=s3://mybucket/foo.json?region=us-east-1&disableSSL=true&endpoint="+fakeEndpoint+"&s3ForcePathStyle=true",
		"-i", "{{ .data.value }}").
		withEnv("AWS_ACCESS_KEY_ID", "YOUR-ACCESSKEYID").
		withEnv("AWS_SECRET_ACCESS_KEY", "YOUR-SECRETACCESSKEY").
		run()
	assertSuccess(t, stdout, stderr, err, "json")

	// Fake server, endpoint passed via AWS_S3_ENDPOINT, anonymous access.
	stdout, stderr, err = cmd(t,
		"-c", "data=s3://mybucket/foo.json?region=us-east-1&disableSSL=true&s3ForcePathStyle=true",
		"-i", "{{ .data.value }}").
		withEnv("AWS_ANON", "true").
		withEnv("AWS_S3_ENDPOINT", fakeEndpoint).
		run()
	assertSuccess(t, stdout, stderr, err, "json")
}

// TestDatasources_Blob_S3Directory checks that an s3:// URL ending in "/"
// yields a listing of object names, both on a real public bucket and on the
// local fake S3 server.
func TestDatasources_Blob_S3Directory(t *testing.T) {
	// The real bucket below comes from https://registry.opendata.aws and is
	// public; it replaced ryft-public-sample-data after access to that bucket
	// was disabled. Its content doesn't matter - we only need something on a
	// _real_ S3 bucket that we can read and parse.
	stdout, stderr, err := cmd(t,
		"-c", "data=s3://noaa-bathymetry-pds/csv/2022/03/02/?region=us-east-1",
		"-i", "{{ index .data 0 }}").
		withEnv("AWS_ANON", "true").
		withEnv("AWS_TIMEOUT", "15000").
		run()
	assertSuccess(t, stdout, stderr, err, "20220302_056e577c7cd8323fdd8a04d3812cf78e_pointData.csv")

	srv := setupDatasourcesBlobTest(t)
	dirURL := "data=s3://mybucket/a/b/c/?region=us-east-1&disableSSL=true&endpoint=" +
		srv.Listener.Addr().String() + "&s3ForcePathStyle=true"

	// The fake bucket holds three objects under a/b/c/; the listing is
	// expected in lexical order.
	stdout, stderr, err = cmd(t, "-c", dirURL, "-i", "{{ .data }}").
		withEnv("AWS_ACCESS_KEY_ID", "YOUR-ACCESSKEYID").
		withEnv("AWS_SECRET_ACCESS_KEY", "YOUR-SECRETACCESSKEY").
		run()
	assertSuccess(t, stdout, stderr, err, "[d goodbye.txt hello.txt]")
}

// TestDatasources_Blob_S3MIMETypes checks that an object with no file extension
// is parsed according to its stored Content-Type. The object a/b/c/d is seeded
// as application/yaml, so its nested fields are addressable from the template.
func TestDatasources_Blob_S3MIMETypes(t *testing.T) {
	srv := setupDatasourcesBlobTest(t)
	endpoint := srv.Listener.Addr().String()

	stdout, stderr, err := cmd(t,
		"-c", "data=s3://mybucket/a/b/c/d?region=us-east-1&disableSSL=true&endpoint="+endpoint+"&s3ForcePathStyle=true",
		"-i", "{{ .data.c.cc }}").
		withEnv("AWS_ANON", "true").
		run()
	assertSuccess(t, stdout, stderr, err, "yay for yaml")
}

// TestDatasources_Blob_GCSDatasource reads a single public object from Google
// Cloud Storage as a text/plain datasource and checks its length. It is skipped
// unless GOOGLE_APPLICATION_CREDENTIALS is set, because it only works when
// authenticated with Google Cloud.
func TestDatasources_Blob_GCSDatasource(t *testing.T) {
	// this only works if we're authed with GCS
	creds := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
	if creds == "" {
		// t.Skip stops the test via runtime.Goexit; no return is needed.
		t.Skip("Not configured to authenticate with Google Cloud - skipping")
	}

	o, e, err := cmd(t, "-c", "data=gs://gcp-public-data-landsat/LT08/01/015/013/LT08_L1GT_015013_20130315_20170310_01_T2/LT08_L1GT_015013_20130315_20170310_01_T2_MTL.txt?type=text/plain",
		"-i", "{{ len .data }}").
		withEnv("GOOGLE_APPLICATION_CREDENTIALS", creds).run()
	assertSuccess(t, o, e, err, "3672")
}

// TestDatasources_Blob_GCSDirectory lists the root of a public GCS bucket and
// checks that the listing contains index.csv.gz. It is skipped unless
// GOOGLE_APPLICATION_CREDENTIALS is set.
func TestDatasources_Blob_GCSDirectory(t *testing.T) {
	// this only works if we're likely to be authed with GCS
	creds := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
	if creds == "" {
		// t.Skip stops the test via runtime.Goexit; no return is needed.
		t.Skip("Not configured to authenticate with Google Cloud - skipping")
	}

	o, e, err := cmd(t, "-c", "data=gs://gcp-public-data-landsat/",
		"-i", "{{ coll.Has .data `index.csv.gz` }}").
		withEnv("GOOGLE_APPLICATION_CREDENTIALS", creds).run()
	assertSuccess(t, o, e, err, "true")
}