11package goad
22
33import (
4- "bytes"
5- "context"
6- "encoding/json"
74 "errors"
85 "fmt"
9- "io"
106 "math"
11- "os"
127 "strconv"
138 "strings"
149 "time"
1510
1611 "github.com/aws/aws-sdk-go/aws"
17- "github.com/aws/aws-sdk-go/aws/session"
18- "github.com/aws/aws-sdk-go/service/lambda"
19- "github.com/docker/docker/api/types"
20- "github.com/docker/docker/api/types/container"
21- "github.com/docker/docker/api/types/network"
22- "github.com/docker/docker/client"
2312 "github.com/goadapp/goad/infrastructure"
13+ "github.com/goadapp/goad/infrastructure/aws"
14+ "github.com/goadapp/goad/infrastructure/docker"
2415 "github.com/goadapp/goad/queue"
25- "github.com/goadapp/goad/version"
2616)
2717
2818// TestConfig type
@@ -41,11 +31,6 @@ type TestConfig struct {
4131 RunDocker bool
4232}
4333
44- type invokeArgs struct {
45- File string `json:"file"`
46- Args []string `json:"args"`
47- }
48-
4934const nano = 1000000000
5035
5136var supportedRegions = []string {
@@ -80,9 +65,9 @@ func (t *Test) Start() (<-chan queue.RegionsAggData, func()) {
8065 awsConfig := aws .NewConfig ().WithRegion (t .Config .Regions [0 ])
8166
8267 if t .Config .RunDocker {
83- t .infra = infrastructure .NewDockerInfrastructure ()
68+ t .infra = dockerinfra .NewDockerInfrastructure ()
8469 } else {
85- t .infra = infrastructure .New (t .Config .Regions , awsConfig )
70+ t .infra = awsinfra .New (t .Config .Regions , awsConfig )
8671 }
8772 teardown , err := t .infra .Setup ()
8873 handleErr (err )
@@ -130,18 +115,12 @@ func (t *Test) invokeLambdas(awsConfig *aws.Config, queueURL string) {
130115 }
131116 args = append (args , fmt .Sprintf ("%s" , c .URL ))
132117
133- invokeargs := invokeArgs {
118+ invokeargs := infrastructure. InvokeArgs {
134119 File : "./goad-lambda" ,
135120 Args : args ,
136121 }
137122
138- config := aws .NewConfig ().WithRegion (region )
139-
140- if t .Config .RunDocker {
141- go runAsDockerContainer (queueURL , invokeargs )
142- } else {
143- go invokeLambda (config , invokeargs )
144- }
123+ go t .infra .Run (invokeargs )
145124 }
146125}
147126
@@ -151,75 +130,6 @@ func handleErr(err error) {
151130 }
152131}
153132
154- func DockerPullLambdaImage () {
155- DockerPullImage ("lambci/lambda" )
156- }
157-
158- func DockerPullRabbitMQImage () {
159- DockerPullImage ("rabbitmq:3" )
160- }
161-
162- func DockerPullImage (imageName string ) {
163- ctx := context .Background ()
164- cli , err := client .NewEnvClient ()
165- handleErr (err )
166-
167- // Pull the image from dockerhub.
168- out , err := cli .ImagePull (ctx , imageName , types.ImagePullOptions {})
169- handleErr (err )
170- defer out .Close ()
171- io .Copy (os .Stdout , out )
172- }
173-
174- func runAsDockerContainer (queueURL string , args invokeArgs ) {
175- ctx := context .Background ()
176- cli , err := client .NewEnvClient ()
177- handleErr (err )
178- rabbitmqURL := fmt .Sprintf ("RABBITMQ=%s" , queueURL )
179-
180- // Create container to execute lambda
181- resp , err := cli .ContainerCreate (ctx , & container.Config {
182- Image : "lambci/lambda" ,
183- Cmd : append ([]string {"index.handler" }, ToJSONString (args )),
184- Volumes : map [string ]struct {}{
185- "/var/task" : struct {}{},
186- },
187- Env : []string {rabbitmqURL },
188- }, & container.HostConfig {
189- AutoRemove : true ,
190- Binds : []string {os .ExpandEnv ("${PWD}/data/lambda:/var/task:ro" )},
191- }, & network.NetworkingConfig {
192- EndpointsConfig : map [string ]* network.EndpointSettings {
193- "goad-bridge" : & network.EndpointSettings {},
194- },
195- }, "" )
196- handleErr (err )
197-
198- // run container
199- err = cli .ContainerStart (ctx , resp .ID , types.ContainerStartOptions {})
200- handleErr (err )
201- }
202-
203- func ToJSONString (args interface {}) string {
204- b , err := json .Marshal (args )
205- handleErr (err )
206- return string (b [:])
207- }
208- func toJSONReadSeeker (args interface {}) io.ReadSeeker {
209- j , err := json .Marshal (args )
210- handleErr (err )
211- return bytes .NewReader (j )
212- }
213-
214- func invokeLambda (awsConfig * aws.Config , args invokeArgs ) {
215- svc := lambda .New (session .New (), awsConfig )
216-
217- svc .InvokeAsync (& lambda.InvokeAsyncInput {
218- FunctionName : aws .String ("goad:" + version .LambdaVersion ()),
219- InvokeArgs : toJSONReadSeeker (args ),
220- })
221- }
222-
223133func numberOfLambdas (concurrency int , numRegions int ) int {
224134 if numRegions > int (concurrency ) {
225135 return int (concurrency )
0 commit comments