Skip to content

Commit e3d803f

Browse files
Implement batch mode read on SQS
This improves the read/sync speed of the cli by a factor of 10.
1 parent e30335b commit e3d803f

File tree

3 files changed

+30
-20
lines changed

3 files changed

+30
-20
lines changed

infrastructure/aws/aws.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,13 @@ func (infra *AwsInfrastructure) Receive(results chan *result.LambdaResults) {
6262

6363
timeoutStart := time.Now()
6464
for {
65-
lambdaResult := adaptor.Receive()
66-
if lambdaResult != nil {
67-
lambdaAggregate := &data.Lambdas[lambdaResult.RunnerID]
68-
result.AddResult(lambdaAggregate, lambdaResult)
69-
results <- data
65+
lambdaResults := adaptor.Receive()
66+
if lambdaResults != nil {
67+
for _, lambdaResult := range lambdaResults {
68+
lambdaAggregate := &data.Lambdas[lambdaResult.RunnerID]
69+
result.AddResult(lambdaAggregate, lambdaResult)
70+
results <- data
71+
}
7072
if data.AllLambdasFinished() {
7173
break
7274
}

infrastructure/aws/sqsadapter/sqsadapter.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ func getClient(awsConfig *aws.Config) *sqs.SQS {
3838
}
3939

4040
// Receive a result, or timeout in 1 second
41-
func (adaptor Adapter) Receive() *api.RunnerResult {
41+
func (adaptor Adapter) Receive() []*api.RunnerResult {
4242
params := &sqs.ReceiveMessageInput{
4343
QueueUrl: aws.String(adaptor.QueueURL),
44-
MaxNumberOfMessages: aws.Int64(1),
44+
MaxNumberOfMessages: aws.Int64(10),
4545
VisibilityTimeout: aws.Int64(1),
4646
WaitTimeSeconds: aws.Int64(1),
4747
}
@@ -56,25 +56,34 @@ func (adaptor Adapter) Receive() *api.RunnerResult {
5656
return nil
5757
}
5858

59-
item := resp.Messages[0]
59+
items := resp.Messages
60+
results := make([]*api.RunnerResult, 0)
61+
deleteEntries := make([]*sqs.DeleteMessageBatchRequestEntry, 0)
62+
for _, item := range items {
63+
result, jsonerr := resultFromJSON(*item.Body)
64+
if jsonerr != nil {
65+
fmt.Println(err.Error())
66+
return nil
67+
}
68+
deleteEntries = append(deleteEntries, &sqs.DeleteMessageBatchRequestEntry{
69+
Id: aws.String(*item.MessageId),
70+
ReceiptHandle: aws.String(*item.ReceiptHandle),
71+
})
72+
results = append(results, result)
73+
}
6074

61-
deleteParams := &sqs.DeleteMessageInput{
62-
QueueUrl: aws.String(adaptor.QueueURL),
63-
ReceiptHandle: aws.String(*item.ReceiptHandle),
75+
deleteParams := &sqs.DeleteMessageBatchInput{
76+
Entries: deleteEntries,
77+
QueueUrl: aws.String(adaptor.QueueURL),
6478
}
65-
_, delerr := adaptor.Client.DeleteMessage(deleteParams)
79+
_, delerr := adaptor.Client.DeleteMessageBatch(deleteParams)
6680

6781
if delerr != nil {
68-
fmt.Println(err.Error())
82+
fmt.Println(delerr.Error())
6983
return nil
7084
}
7185

72-
result, jsonerr := resultFromJSON(*item.Body)
73-
if jsonerr != nil {
74-
fmt.Println(err.Error())
75-
return nil
76-
}
77-
return result
86+
return results
7887
}
7988

8089
func resultFromJSON(str string) (*api.RunnerResult, error) {

infrastructure/infrastructure.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ func InvokeLambdas(inf Infrastructure) {
6767
}
6868

6969
func Aggregate(i Infrastructure) chan *result.LambdaResults {
70-
fmt.Println("AGGREGATE")
7170
results := make(chan *result.LambdaResults)
7271
go i.Receive(results)
7372
return results

0 commit comments

Comments
 (0)