Skip to content

Commit 024801e

Browse files
Include code review feedback
* add missing region in SQS data * allow execution by time rather than quantity of requirements * add test for stresstestTimeout * fix remainingRequestCount negative termination check * add test execution to Makefile * update golang versions to 1.8 in Dockerfile and Godeps
1 parent 5c7009c commit 024801e

File tree

7 files changed

+78
-52
lines changed

7 files changed

+78
-52
lines changed

Dockerfile

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
FROM golang:1.5
1+
FROM golang:1.8
22

33
RUN apt-get update
44
RUN apt-get install -y zip
5-
ADD . /go/src/github.com/gophergala2016/goad
6-
WORKDIR /go/src/github.com/gophergala2016/goad
7-
RUN go get -u github.com/jteeuwen/go-bindata/...
8-
RUN make bindata
9-
RUN go build -o /go/bin/goad-api webapi/webapi.go
5+
ADD . /go/src/github.com/goadapp/goad
6+
WORKDIR /go/src/github.com/goadapp/goad
7+
RUN go get -u github.com/jteeuwen/go-bindata/...
8+
RUN make linux
9+
# RUN go build -o /go/bin/goad-api webapi/webapi.go
1010

1111
CMD ["/go/bin/goad-api", "-addr", ":8080"]
1212
EXPOSE 8080

Godeps/Godeps.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
all: osx linux windows
22

3+
test:
4+
go test ./...
5+
36
lambda:
47
GOOS=linux GOARCH=amd64 go build -o data/lambda/goad-lambda ./lambda
58
zip -jr data/lambda data/lambda
@@ -21,6 +24,7 @@ windows: bindata
2124

2225
clean:
2326
rm -rf data/lambda/goad-lambda
27+
rm -rf data/lambda.zip
2428
rm -rf build
2529

2630
all-zip: all

goad.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ var supportedRegions = []string{
5656
type Test struct {
5757
Config *TestConfig
5858
infra *infrastructure.Infrastructure
59+
lambdas int
5960
}
6061

6162
// NewTest returns a configured Test
@@ -85,12 +86,13 @@ func (t *Test) Start() <-chan queue.RegionsAggData {
8586
}
8687

8788
t.infra = infra
89+
t.lambdas = numberOfLambdas(t.Config.Concurrency, len(t.Config.Regions))
8890
t.invokeLambdas(awsConfig, infra.QueueURL())
8991

9092
results := make(chan queue.RegionsAggData)
9193

9294
go func() {
93-
for result := range queue.Aggregate(awsConfig, infra.QueueURL(), t.Config.TotalRequests) {
95+
for result := range queue.Aggregate(awsConfig, infra.QueueURL(), t.Config.TotalRequests, t.lambdas) {
9496
results <- result
9597
}
9698
close(results)
@@ -100,15 +102,13 @@ func (t *Test) Start() <-chan queue.RegionsAggData {
100102
}
101103

102104
func (t *Test) invokeLambdas(awsConfig *aws.Config, sqsURL string) {
103-
lambdas := numberOfLambdas(t.Config.Concurrency, len(t.Config.Regions))
104-
105-
for i := 0; i < lambdas; i++ {
105+
for i := 0; i < t.lambdas; i++ {
106106
region := t.Config.Regions[i%len(t.Config.Regions)]
107-
requests, requestsRemainder := divide(t.Config.TotalRequests, lambdas)
108-
concurrency, _ := divide(t.Config.Concurrency, lambdas)
107+
requests, requestsRemainder := divide(t.Config.TotalRequests, t.lambdas)
108+
concurrency, _ := divide(t.Config.Concurrency, t.lambdas)
109109
execTimeout := t.Config.ExecTimeout
110110

111-
if requestsRemainder > 0 && i == lambdas-1 {
111+
if requestsRemainder > 0 && i == t.lambdas-1 {
112112
requests += requestsRemainder
113113
}
114114

@@ -129,7 +129,7 @@ func (t *Test) invokeLambdas(awsConfig *aws.Config, sqsURL string) {
129129
"-t",
130130
fmt.Sprintf("%s", c.RequestTimeout.String()),
131131
"-f",
132-
fmt.Sprintf("%s", reportingFrequency(lambdas).String()),
132+
fmt.Sprintf("%s", reportingFrequency(t.lambdas).String()),
133133
"-r",
134134
fmt.Sprintf("%s", region),
135135
"-m",
@@ -196,8 +196,8 @@ func (c TestConfig) check() error {
196196
if (c.TotalRequests < 1 && c.ExecTimeout <= 0) || c.TotalRequests > 2000000 {
197197
return errors.New("Invalid total requests (use 1 - 2000000)")
198198
}
199-
if c.ExecTimeout > 300 {
200-
return errors.New("Invalid maximum execution time in seconds (use 0 - 300)")
199+
if c.ExecTimeout > 3600 {
200+
return errors.New("Invalid maximum execution time in seconds (use 0 - 3600)")
201201
}
202202
if c.RequestTimeout.Nanoseconds() < nano || c.RequestTimeout.Nanoseconds() > nano*100 {
203203
return errors.New("Invalid timeout (1s - 100s)")

lambda/lambda.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,16 @@ func NewLambda(s LambdaSettings) *goadLambda {
208208
l := &goadLambda{}
209209
l.Settings = s
210210

211-
l.Metrics = NewRequestMetric()
211+
l.Metrics = NewRequestMetric(s.LambdaRegion)
212212
remainingRequestCount := s.MaxRequestCount - s.CompletedRequestCount
213+
if remainingRequestCount < 0 {
214+
remainingRequestCount = 0
215+
}
213216
l.setupHTTPClientForSelfsignedTLS()
214217
awsSqsConfig := l.setupAwsConfig()
215218
l.setupAwsSqsAdapter(awsSqsConfig)
216219
l.setupJobQueue(remainingRequestCount)
217-
l.results = make(chan requestResult, remainingRequestCount)
220+
l.results = make(chan requestResult)
218221
return l
219222
}
220223

@@ -225,8 +228,10 @@ func setDefaultConcurrencyCount(s *LambdaSettings) {
225228
}
226229

227230
func setLambdaExecTimeout(s *LambdaSettings) {
228-
if s.LambdaExecTimeoutSeconds <= 0 || s.LambdaExecTimeoutSeconds > AWS_MAX_TIMEOUT {
231+
if s.StresstestTimeout <= 0 || s.StresstestTimeout > AWS_MAX_TIMEOUT {
229232
s.LambdaExecTimeoutSeconds = AWS_MAX_TIMEOUT
233+
} else {
234+
s.LambdaExecTimeoutSeconds = s.StresstestTimeout
230235
}
231236
}
232237

@@ -281,9 +286,11 @@ func (l *goadLambda) spawnWorker() {
281286

282287
func work(l *goadLambda) {
283288
for {
284-
_, ok := <-l.jobs
285-
if !ok {
286-
break
289+
if l.Settings.MaxRequestCount > 0 {
290+
_, ok := <-l.jobs
291+
if !ok {
292+
break
293+
}
287294
}
288295
l.results <- fetch(l.HTTPClient, l.Settings.RequestParameters, l.StartTime)
289296
}
@@ -401,9 +408,9 @@ type resultSender interface {
401408
SendResult(queue.AggData)
402409
}
403410

404-
func NewRequestMetric() *requestMetric {
411+
func NewRequestMetric(region string) *requestMetric {
405412
metric := &requestMetric{
406-
aggregatedResults: &queue.AggData{},
413+
aggregatedResults: &queue.AggData{Region: region},
407414
}
408415
metric.resetAndKeepTotalReqs()
409416
return metric
@@ -472,6 +479,7 @@ func (m *requestMetric) resetAndKeepTotalReqs() {
472479
m.requestTimeTotal = 0
473480
m.timeToFirstTotal = 0
474481
m.aggregatedResults = &queue.AggData{
482+
Region: m.aggregatedResults.Region,
475483
Statuses: make(map[string]int),
476484
Fastest: math.MaxInt64,
477485
Finished: false,

lambda/lambda_test.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestMain(m *testing.M) {
3333
}
3434

3535
func TestRequestMetric(t *testing.T) {
36-
metric := NewRequestMetric()
36+
metric := NewRequestMetric("us-east-1")
3737
agg := metric.aggregatedResults
3838
if agg.TotalReqs != 0 {
3939
t.Error("totalRequestsFinished should be initialized with 0")
@@ -67,7 +67,7 @@ func TestRequestMetric(t *testing.T) {
6767
func TestAddRequestStatus(t *testing.T) {
6868
success := 200
6969
successStr := strconv.Itoa(success)
70-
metric := NewRequestMetric()
70+
metric := NewRequestMetric("us-east-1")
7171
result := &requestResult{
7272
Status: success,
7373
}
@@ -86,7 +86,7 @@ func TestAddRequest(t *testing.T) {
8686
elapsedFirst := int64(100)
8787
elapsedLast := int64(300)
8888

89-
metric := NewRequestMetric()
89+
metric := NewRequestMetric("us-east-1")
9090
result := &requestResult{
9191
Time: 400,
9292
ElapsedFirstByte: elapsedFirst,
@@ -193,7 +193,7 @@ func TestAddRequest(t *testing.T) {
193193
}
194194

195195
func TestResetAndKeepTotalReqs(t *testing.T) {
196-
metric := NewRequestMetric()
196+
metric := NewRequestMetric("us-east-1")
197197
agg := metric.aggregatedResults
198198
agg.TotalReqs = 7
199199
metric.firstRequestTime = 123
@@ -242,7 +242,7 @@ func TestMetricsAggregate(t *testing.T) {
242242
elapsedFirst := int64(100)
243243
elapsedLast := int64(300)
244244

245-
metric := NewRequestMetric()
245+
metric := NewRequestMetric("us-east-1")
246246
result := &requestResult{
247247
Time: 10000000,
248248
Elapsed: 10000000,
@@ -337,12 +337,11 @@ func TestQuitOnLambdaTimeout(t *testing.T) {
337337

338338
reportingFrequency := time.Duration(5) * time.Second
339339
settings := LambdaSettings{
340-
MaxRequestCount: 3,
341-
ConcurrencyCount: 1,
342-
ReportingFrequency: reportingFrequency,
343-
StresstestTimeout: 10,
344-
LambdaExecTimeoutSeconds: 1,
345-
LambdaRegion: "us-east-1",
340+
MaxRequestCount: 3,
341+
ConcurrencyCount: 1,
342+
ReportingFrequency: reportingFrequency,
343+
StresstestTimeout: 10,
344+
LambdaRegion: "us-east-1",
346345
}
347346
settings.RequestParameters.URL = urlStr
348347
sender := &TestResultSender{}
@@ -353,6 +352,7 @@ func TestQuitOnLambdaTimeout(t *testing.T) {
353352
function := &lambdaTestFunction{
354353
lambda: lambda,
355354
}
355+
lambda.Settings.LambdaExecTimeoutSeconds = 1
356356
RunOrFailAfterTimout(t, function, 1400)
357357
resLength := len(sender.sentResults)
358358
timeoutRemaining := lambda.Settings.StresstestTimeout
@@ -389,7 +389,7 @@ func TestMetricSendResults(t *testing.T) {
389389
ConnectionError: false,
390390
}
391391

392-
metric := NewRequestMetric()
392+
metric := NewRequestMetric("us-east-1")
393393
sender := &TestResultSender{}
394394

395395
metric.addRequest(result)
@@ -404,24 +404,35 @@ func TestRunLoadTestWithHighConcurrency(t *testing.T) {
404404
server := createAndStartTestServer()
405405
defer server.Stop()
406406

407-
runLoadTestWith(t, 500, 100, 500)
407+
runLoadTestWithDefaultExecTimeout(t, 500, 100, 500)
408408
}
409409

410410
func TestRunLoadTestWithOneRequest(t *testing.T) {
411411
server := createAndStartTestServer()
412412
defer server.Stop()
413413

414-
runLoadTestWith(t, 1, -1, 50)
414+
runLoadTestWithDefaultExecTimeout(t, 1, -1, 50)
415415
}
416416

417417
func TestRunLoadTestWithZeroRequests(t *testing.T) {
418418
server := createAndStartTestServer()
419419
defer server.Stop()
420420

421-
runLoadTestWith(t, 0, 1, 50)
421+
runLoadTestWithDefaultExecTimeout(t, 1, 1, 120)
422+
}
423+
424+
func TestRunWithExecTimeout(t *testing.T) {
425+
server := createAndStartTestServer()
426+
defer server.Stop()
427+
428+
runLoadTestWith(t, 0, 1, 1, 1050)
429+
}
430+
431+
func runLoadTestWithDefaultExecTimeout(t *testing.T, requestCount int, concurrency int, milliseconds int) {
432+
runLoadTestWith(t, requestCount, 0, concurrency, milliseconds)
422433
}
423434

424-
func runLoadTestWith(t *testing.T, requestCount int, concurrency int, milliseconds int) {
435+
func runLoadTestWith(t *testing.T, requestCount int, stresstestTimeout int, concurrency int, milliseconds int) {
425436
if testing.Short() {
426437
t.Skip("skipping test in short mode.")
427438
}
@@ -430,6 +441,7 @@ func runLoadTestWith(t *testing.T, requestCount int, concurrency int, millisecon
430441
MaxRequestCount: requestCount,
431442
ConcurrencyCount: concurrency,
432443
ReportingFrequency: reportingFrequency,
444+
StresstestTimeout: stresstestTimeout,
433445
}
434446
settings.RequestParameters.URL = urlStr
435447
sender := &TestResultSender{}
@@ -447,7 +459,7 @@ func runLoadTestWith(t *testing.T, requestCount int, concurrency int, millisecon
447459
if results.Finished != true {
448460
t.Error("the lambda should have finished it's results")
449461
}
450-
if results.TotalReqs != requestCount {
462+
if results.TotalReqs != requestCount && requestCount > 0 {
451463
t.Errorf("the lambda generated results for %d request, expected %d", results.TotalReqs, requestCount)
452464
}
453465
}

queue/aggregation.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,28 @@ type AggData struct {
2222
Region string `json:"region"`
2323
FatalError string `json:"fatal-error"`
2424
Finished bool `json:"finished"`
25+
FinishedLambdas int `json:"finished-lambdas"`
2526
}
2627

2728
// RegionsAggData type
2829
type RegionsAggData struct {
2930
Regions map[string]AggData
3031
TotalExpectedRequests uint
32+
lambdasByRegion int
3133
}
3234

3335
func (d *RegionsAggData) allRequestsReceived() bool {
3436
var requests uint
35-
var finishedCount int
37+
var finishedRegions int
3638

3739
for _, region := range d.Regions {
3840
requests += uint(region.TotalReqs)
39-
if region.Finished {
40-
finishedCount += 1
41+
if region.FinishedLambdas == d.lambdasByRegion {
42+
finishedRegions += 1
4143
}
4244
}
4345

44-
return requests == d.TotalExpectedRequests || finishedCount == len(d.Regions)
46+
return d.TotalExpectedRequests > 0 && requests == d.TotalExpectedRequests || finishedRegions == len(d.Regions)
4547
}
4648

4749
func addResult(data *AggData, result *AggData, isFinalSum bool) {
@@ -77,7 +79,7 @@ func addResult(data *AggData, result *AggData, isFinalSum bool) {
7779
data.Fastest = result.Fastest
7880
}
7981
if result.Finished {
80-
data.Finished = true
82+
data.FinishedLambdas += 1
8183
}
8284
}
8385

@@ -92,15 +94,15 @@ func SumRegionResults(regionData *RegionsAggData) *AggData {
9294
}
9395

9496
// Aggregate listens for results and sends totals, closing the channel when done
95-
func Aggregate(awsConfig *aws.Config, queueURL string, totalExpectedRequests uint) chan RegionsAggData {
97+
func Aggregate(awsConfig *aws.Config, queueURL string, totalExpectedRequests uint, lambdasByRegion int) chan RegionsAggData {
9698
results := make(chan RegionsAggData)
97-
go aggregate(results, awsConfig, queueURL, totalExpectedRequests)
99+
go aggregate(results, awsConfig, queueURL, totalExpectedRequests, lambdasByRegion)
98100
return results
99101
}
100102

101-
func aggregate(results chan RegionsAggData, awsConfig *aws.Config, queueURL string, totalExpectedRequests uint) {
103+
func aggregate(results chan RegionsAggData, awsConfig *aws.Config, queueURL string, totalExpectedRequests uint, lambdasByRegion int) {
102104
defer close(results)
103-
data := RegionsAggData{make(map[string]AggData), totalExpectedRequests}
105+
data := RegionsAggData{make(map[string]AggData), totalExpectedRequests, lambdasByRegion}
104106

105107
adaptor := NewSQSAdaptor(awsConfig, queueURL)
106108
timeoutStart := time.Now()

0 commit comments

Comments
 (0)