Skip to content

Commit b8099ba

Browse files
committed
issue#93: Sustaining request during specific seconds
1 parent 4fde68d commit b8099ba

File tree

4 files changed

+66
-23
lines changed

4 files changed

+66
-23
lines changed

cli/cli.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"io/ioutil"
8+
"math"
89
"os"
910
"os/signal"
1011
"sort"
@@ -25,6 +26,7 @@ var (
2526
url string
2627
concurrency uint
2728
requests uint
29+
execTimeout uint
2830
timeout uint
2931
regions string
3032
method string
@@ -45,6 +47,7 @@ func main() {
4547
flag.StringVar(&body, "b", "", "HTTP request body")
4648
flag.UintVar(&concurrency, "c", 10, "number of concurrent requests")
4749
flag.UintVar(&requests, "n", 1000, "number of total requests to make")
50+
flag.UintVar(&execTimeout, "N", 0, "Maximum execution time in seconds")
4851
flag.UintVar(&timeout, "t", 15, "request timeout in seconds")
4952
flag.StringVar(&regions, "r", "us-east-1,eu-west-1,ap-northeast-1", "AWS regions to run in (comma separated, no spaces)")
5053
flag.StringVar(&awsProfile, "p", "", "AWS named profile to use")
@@ -67,6 +70,7 @@ func main() {
6770
URL: url,
6871
Concurrency: concurrency,
6972
TotalRequests: requests,
73+
ExecTimeout: execTimeout,
7074
RequestTimeout: time.Duration(timeout) * time.Second,
7175
Regions: strings.Split(regions, ","),
7276
Method: method,
@@ -120,6 +124,7 @@ func start(test *goad.Test, finalResult *queue.RegionsAggData, sigChan chan os.S
120124
}
121125
}()
122126

127+
startTime := time.Now()
123128
firstTime := true
124129
outer:
125130
for {
@@ -150,7 +155,12 @@ outer:
150155
}
151156

152157
y = 0
153-
percentDone := float64(totalReqs) / float64(result.TotalExpectedRequests)
158+
var percentDone float64
159+
if result.TotalExpectedRequests > 0 {
160+
percentDone = float64(totalReqs) / float64(result.TotalExpectedRequests)
161+
} else {
162+
percentDone = math.Min(float64(time.Since(startTime).Seconds()) / float64(test.Config.ExecTimeout), 1.0)
163+
}
154164
drawProgressBar(percentDone, y)
155165

156166
termbox.Flush()

goad.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type TestConfig struct {
2525
URL string
2626
Concurrency uint
2727
TotalRequests uint
28+
ExecTimeout uint
2829
RequestTimeout time.Duration
2930
Regions []string
3031
Method string
@@ -53,7 +54,7 @@ var supportedRegions = []string{
5354

5455
// Test type
5556
type Test struct {
56-
config *TestConfig
57+
Config *TestConfig
5758
infra *infrastructure.Infrastructure
5859
}
5960

@@ -63,22 +64,22 @@ func NewTest(config *TestConfig) (*Test, error) {
6364
if err != nil {
6465
return nil, err
6566
}
66-
return &Test{config: config, infra: nil}, nil
67+
return &Test{Config: config, infra: nil}, nil
6768
}
6869

6970
// Start a test
7071
func (t *Test) Start() <-chan queue.RegionsAggData {
71-
awsConfig := aws.NewConfig().WithRegion(t.config.Regions[0])
72+
awsConfig := aws.NewConfig().WithRegion(t.Config.Regions[0])
7273

73-
if t.config.AwsProfile != "" {
74-
creds := credentials.NewSharedCredentials("", t.config.AwsProfile)
74+
if t.Config.AwsProfile != "" {
75+
creds := credentials.NewSharedCredentials("", t.Config.AwsProfile)
7576
if _, err := creds.Get(); err != nil {
7677
log.Fatal(err)
7778
}
7879
awsConfig.WithCredentials(creds)
7980
}
8081

81-
infra, err := infrastructure.New(t.config.Regions, awsConfig)
82+
infra, err := infrastructure.New(t.Config.Regions, awsConfig)
8283
if err != nil {
8384
log.Fatal(err)
8485
}
@@ -89,7 +90,7 @@ func (t *Test) Start() <-chan queue.RegionsAggData {
8990
results := make(chan queue.RegionsAggData)
9091

9192
go func() {
92-
for result := range queue.Aggregate(awsConfig, infra.QueueURL(), t.config.TotalRequests) {
93+
for result := range queue.Aggregate(awsConfig, infra.QueueURL(), t.Config.TotalRequests) {
9394
results <- result
9495
}
9596
close(results)
@@ -99,25 +100,28 @@ func (t *Test) Start() <-chan queue.RegionsAggData {
99100
}
100101

101102
func (t *Test) invokeLambdas(awsConfig *aws.Config, sqsURL string) {
102-
lambdas := numberOfLambdas(t.config.Concurrency, len(t.config.Regions))
103+
lambdas := numberOfLambdas(t.Config.Concurrency, len(t.Config.Regions))
103104

104105
for i := 0; i < lambdas; i++ {
105-
region := t.config.Regions[i%len(t.config.Regions)]
106-
requests, requestsRemainder := divide(t.config.TotalRequests, lambdas)
107-
concurrency, _ := divide(t.config.Concurrency, lambdas)
106+
region := t.Config.Regions[i%len(t.Config.Regions)]
107+
requests, requestsRemainder := divide(t.Config.TotalRequests, lambdas)
108+
concurrency, _ := divide(t.Config.Concurrency, lambdas)
109+
execTimeout := t.Config.ExecTimeout
108110

109111
if requestsRemainder > 0 && i == lambdas-1 {
110112
requests += requestsRemainder
111113
}
112114

113-
c := t.config
115+
c := t.Config
114116
args := []string{
115117
"-u",
116118
fmt.Sprintf("%s", c.URL),
117119
"-c",
118120
fmt.Sprintf("%s", strconv.Itoa(int(concurrency))),
119121
"-n",
120122
fmt.Sprintf("%s", strconv.Itoa(int(requests))),
123+
"-N",
124+
fmt.Sprintf("%s", strconv.Itoa(int(execTimeout))),
121125
"-s",
122126
fmt.Sprintf("%s", sqsURL),
123127
"-q",
@@ -134,7 +138,7 @@ func (t *Test) invokeLambdas(awsConfig *aws.Config, sqsURL string) {
134138
fmt.Sprintf("%s", c.Body),
135139
}
136140

137-
for _, v := range t.config.Headers {
141+
for _, v := range t.Config.Headers {
138142
args = append(args, "-H", fmt.Sprintf("%s", v))
139143
}
140144

@@ -189,9 +193,12 @@ func (c TestConfig) check() error {
189193
if c.Concurrency < 1 || c.Concurrency > concurrencyLimit {
190194
return fmt.Errorf("Invalid concurrency (use 1 - %d)", concurrencyLimit)
191195
}
192-
if c.TotalRequests < 1 || c.TotalRequests > 2000000 {
196+
if (c.TotalRequests < 1 && c.ExecTimeout <= 0) || c.TotalRequests > 2000000 {
193197
return errors.New("Invalid total requests (use 1 - 2000000)")
194198
}
199+
if c.ExecTimeout > 300 {
200+
return errors.New("Invalid maximum execution time in seconds (use 0 - 300)")
201+
}
195202
if c.RequestTimeout.Nanoseconds() < nano || c.RequestTimeout.Nanoseconds() > nano*100 {
196203
return errors.New("Invalid timeout (1s - 100s)")
197204
}

lambda/lambda.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ import (
1919
"github.com/goadapp/goad/queue"
2020
)
2121

22+
const lambdaTimeout = 295
23+
2224
func main() {
2325

2426
var (
2527
address string
2628
sqsurl string
2729
concurrencycount int
2830
maxRequestCount int
31+
execTimeout int
2932
timeout string
3033
frequency string
3134
awsregion string
@@ -46,10 +49,15 @@ func main() {
4649

4750
flag.IntVar(&concurrencycount, "c", 10, "number of concurrent requests")
4851
flag.IntVar(&maxRequestCount, "n", 1000, "number of total requests to make")
52+
flag.IntVar(&execTimeout, "N", 0, "Maximum execution time in seconds")
4953

5054
flag.Var(&requestHeaders, "H", "List of headers")
5155
flag.Parse()
5256

57+
if execTimeout <= 0 || execTimeout > lambdaTimeout {
58+
execTimeout = lambdaTimeout
59+
}
60+
5361
clientTimeout, _ := time.ParseDuration(timeout)
5462
fmt.Printf("Using a timeout of %s\n", clientTimeout)
5563
reportingFrequency, _ := time.ParseDuration(frequency)
@@ -63,7 +71,7 @@ func main() {
6371
client.Timeout = clientTimeout
6472

6573
fmt.Printf("Will spawn %d workers making %d requests to %s\n", concurrencycount, maxRequestCount, address)
66-
runLoadTest(client, sqsurl, address, maxRequestCount, concurrencycount, awsregion, reportingFrequency, queueRegion, requestMethod, requestBody, requestHeaders)
74+
runLoadTest(client, sqsurl, address, maxRequestCount, execTimeout, concurrencycount, awsregion, reportingFrequency, queueRegion, requestMethod, requestBody, requestHeaders)
6775
}
6876

6977
type RequestResult struct {
@@ -80,7 +88,7 @@ type RequestResult struct {
8088
State string `json:"state"`
8189
}
8290

83-
func runLoadTest(client *http.Client, sqsurl string, url string, totalRequests int, concurrencycount int, awsregion string, reportingFrequency time.Duration, queueRegion string, requestMethod string, requestBody string, requestHeaders []string) {
91+
func runLoadTest(client *http.Client, sqsurl string, url string, totalRequests int, execTimeout int, concurrencycount int, awsregion string, reportingFrequency time.Duration, queueRegion string, requestMethod string, requestBody string, requestHeaders []string) {
8492
awsConfig := aws.NewConfig().WithRegion(queueRegion)
8593
sqsAdaptor := queue.NewSQSAdaptor(awsConfig, sqsurl)
8694
//sqsAdaptor := queue.NewDummyAdaptor(sqsurl)
@@ -102,10 +110,10 @@ func runLoadTest(client *http.Client, sqsurl string, url string, totalRequests i
102110
fmt.Println(" done.\nWaiting for results…")
103111

104112
ticker := time.NewTicker(reportingFrequency)
105-
quit := make(chan struct{})
113+
quit := time.NewTimer(time.Duration(execTimeout) * time.Second)
106114
quitting := false
107115

108-
for requestsSoFar < totalRequests && !quitting {
116+
for (totalRequests == 0 || requestsSoFar < totalRequests) && !quitting {
109117
i := 0
110118

111119
var timeToFirstTotal int64
@@ -120,7 +128,7 @@ func runLoadTest(client *http.Client, sqsurl string, url string, totalRequests i
120128
var totalConnectionError int
121129

122130
resetStats := false
123-
for requestsSoFar < totalRequests && !quitting && !resetStats {
131+
for (totalRequests == 0 || requestsSoFar < totalRequests) && !quitting && !resetStats {
124132
select {
125133
case r := <-ch:
126134
i++
@@ -164,12 +172,14 @@ func runLoadTest(client *http.Client, sqsurl string, url string, totalRequests i
164172
statuses[statusStr]++
165173
}
166174
requestTimeTotal += r.Elapsed
175+
167176
case <-ticker.C:
168177
if i == 0 {
169178
continue
170179
}
171180
resetStats = true
172-
case <-quit:
181+
182+
case <-quit.C:
173183
ticker.Stop()
174184
quitting = true
175185
}
@@ -198,6 +208,7 @@ func runLoadTest(client *http.Client, sqsurl string, url string, totalRequests i
198208
avgRequestTime = 0
199209
}
200210

211+
finished := (totalRequests > 0 && requestsSoFar == totalRequests) || quitting
201212
fatalError := ""
202213
if (totalTimedOut + totalConnectionError) > i/2 {
203214
fatalError = "Over 50% of requests failed, aborting"
@@ -216,6 +227,7 @@ func runLoadTest(client *http.Client, sqsurl string, url string, totalRequests i
216227
fastest,
217228
awsregion,
218229
fatalError,
230+
finished,
219231
}
220232
sqsAdaptor.SendResult(aggData)
221233
}
@@ -225,7 +237,13 @@ func runLoadTest(client *http.Client, sqsurl string, url string, totalRequests i
225237

226238
func fetch(loadTestStartTime time.Time, client *http.Client, address string, requestcount int, jobs <-chan struct{}, ch chan RequestResult, wg *sync.WaitGroup, awsregion string, requestMethod string, requestBody string, requestHeaders []string) {
227239
defer wg.Done()
228-
for _ = range jobs {
240+
for {
241+
if requestcount > 0 {
242+
_, ok := <- jobs
243+
if !ok {
244+
break
245+
}
246+
}
229247
start := time.Now()
230248
req, err := http.NewRequest(requestMethod, address, bytes.NewBufferString(requestBody))
231249
if err != nil {

queue/aggregation.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type AggData struct {
2121
Fastest int64 `json:"fastest"`
2222
Region string `json:"region"`
2323
FatalError string `json:"fatal-error"`
24+
Finished bool `json:"finished"`
2425
}
2526

2627
// RegionsAggData type
@@ -31,12 +32,16 @@ type RegionsAggData struct {
3132

3233
func (d *RegionsAggData) allRequestsReceived() bool {
3334
var requests uint
35+
var finishedCount int
3436

3537
for _, region := range d.Regions {
3638
requests += uint(region.TotalReqs)
39+
if region.Finished {
40+
finishedCount += 1
41+
}
3742
}
3843

39-
return requests == d.TotalExpectedRequests
44+
return requests == d.TotalExpectedRequests || finishedCount == len(d.Regions)
4045
}
4146

4247
func addResult(data *AggData, result *AggData, isFinalSum bool) {
@@ -71,6 +76,9 @@ func addResult(data *AggData, result *AggData, isFinalSum bool) {
7176
if result.Fastest > 0 && (data.Fastest == 0 || result.Fastest < data.Fastest) {
7277
data.Fastest = result.Fastest
7378
}
79+
if result.Finished {
80+
data.Finished = true
81+
}
7482
}
7583

7684
// SumRegionResults adds all the results together

0 commit comments

Comments
 (0)