This repository was archived by the owner on Dec 25, 2022. It is now read-only.

Commit 188aa11
Author: Michael Bahr
Message: speed improvements
1 parent b83a0b2

File tree: 6 files changed (+86 −56 lines)

consumer.py

Lines changed: 9 additions & 1 deletion
@@ -12,6 +12,7 @@


 def publish_to_failure_topic(event, reason):
+    # todo: prepare against failure of publish sns
     print('Event failed: %s' % event)
     if 'failure_topic' in event:
         payload = {
@@ -22,19 +23,23 @@ def publish_to_failure_topic(event, reason):


 def handle(events):
+    received = datetime.utcnow()
     to_be_scheduled = []
     event_wrappers = []
     for event in events:
         print(event)

         if 'date' not in event:
             publish_to_failure_topic(event, 'date is required')
+            print('error.date_required %s' % (json.dumps({'event': event})))
             continue
         if 'payload' not in event:
             publish_to_failure_topic(event, 'payload is required')
+            print('error.payload_required %s' % (json.dumps({'event': event})))
             continue
         if 'target' not in event:
             publish_to_failure_topic(event, 'target is required')
+            print('error.target_required %s' % (json.dumps({'event': event})))
             continue

         event_wrapper = EventWrapper()
@@ -43,14 +48,16 @@ def handle(events):

         if not isinstance(event['payload'], str):
             publish_to_failure_topic(event, 'payload must be a string')
+            print('error.payload_is_not_string %s' % (json.dumps({'event': event})))
             continue

         event_wrapper.payload = event['payload']
         event_wrapper.target = event['target']

         if 'user' not in event:
-            if 'true' == os.environ.get('ENFORCE_USER'):
+            if os.environ.get('ENFORCE_USER'):
                 publish_to_failure_topic(event, 'user is required')
+                print('error.event_has_no_user %s' % (json.dumps({'event': event})))
                 continue
         else:
             event_wrapper.user = event['user']
@@ -59,6 +66,7 @@ def handle(events):
         if has_less_then_ten_minutes(event_wrapper.date):
             to_be_scheduled.append(event_wrapper.id)

+        print('event.consumed %s' % (json.dumps({'id': event_wrapper.id, 'timestamp': str(received), 'user': event_wrapper.user})))
         event_wrappers.append(event_wrapper)

     # we must save before delegating, because the downstream function will access the DB entity
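For context, a minimal event that passes the validation above might look like the following sketch (field names are taken from the checks in handle(); ARNs and values are illustrative):

    event = {
        'date': '2019-09-12T12:00:00',    # required, ISO 8601
        'payload': '{"hello": "world"}',  # required, must already be a string
        'target': 'arn:aws:sns:us-east-1:123456789012:my-target-topic',   # required
        'failure_topic': 'arn:aws:sns:us-east-1:123456789012:my-errors',  # optional
        'user': 'user-42'                 # required whenever ENFORCE_USER is set
    }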

emitter.py

Lines changed: 21 additions & 33 deletions
@@ -1,47 +1,41 @@
 import json
+import time
+from datetime import datetime

-from db_helper import delete_with_retry, save_with_retry
+from db_helper import save_with_retry
 from model import EventWrapper
-
 from sns_client import publish_sns


 def handle(items):
-    # todo: remove the message from the queue?
-    processed_ids = []
     failed_ids = []
     print(f'Processing {len(items)} records')
+
+    # sort the items so that we process the earliest first
+    items.sort(key=lambda x: x['date'])
+
     for item in items:
         event_id = item['id']
-        try:
-            if EventWrapper.count(hash_key=event_id) == 0:
-                # if the event was already deleted from the database, then don't send it again
-                continue
-        except Exception as e:
-            print(e)
-            # if we can't determine if the event was already processed, then we'll send it
-            # to make sure we have at least one delivery
+
+        # the event we received may have been scheduled early
+        scheduled_execution = datetime.fromisoformat(item['date'])
+
+        delay = (scheduled_execution - datetime.utcnow()).total_seconds()
+        # remove another 10ms as there will be a short delay between the emitter, the target sns and its consumer
+        delay -= 0.01
+        # if there is a positive delay then wait until it's time
+        if delay > 0:
+            time.sleep(delay)

         try:
             publish_sns(item['target'], item['payload'])
-            processed_ids.append(event_id)
+            print('event.emitted %s' % (json.dumps({'id': event_id, 'timestamp': str(datetime.utcnow()), 'scheduled': str(scheduled_execution)})))
         except Exception as e:
-            print(e)
+            print(str(e))
             failed_ids.append(event_id)

-    to_delete = []
-    for event_id in processed_ids:
-        try:
-            to_delete.append(EventWrapper.get(hash_key=event_id))
-        except Exception as e:
-            print(f'Skipped {event_id} because it doesn\'t exist anymore')
-            print(e)
-
-    delete_with_retry(to_delete)
-
     failed_items = []
     for event_id in failed_ids:
-        # todo: maybe also emit this to an error topic?
         try:
             event = EventWrapper.get(hash_key=event_id)
             event.status = 'FAILED'
@@ -53,15 +47,9 @@ def handle(items):
                 'error': 'ERROR',
                 'event': event.payload
             }
-            # todo: let the publish_sns/sqs methods do the json dumping themselves if they encounter a non-string
-            # i believe json.dumps("test") results in "test", if that's correct then we can always apply json.dumps()
             publish_sns(event.failure_topic, json.dumps(payload))
         except Exception as e:
-            print(f'Skipped {event_id} because it doesn\'t exist anymore')
-            print(e)
+            print(f'Failure update: Skipped {event_id} because it doesn\'t exist anymore')
+            print(str(e))

     save_with_retry(failed_items)
-
-
-if __name__ == '__main__':
-    print(json.dumps("123"))
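Taken together with the scheduler change below, the timing now works in two stages: SQS delivers the message roughly one second early, and the emitter sleeps off the remainder minus 10 ms. A standalone sketch with illustrative values (not repository code):

    import time
    from datetime import datetime, timedelta

    scheduled = datetime.utcnow() + timedelta(seconds=1)  # message arrived ~1s early
    delay = (scheduled - datetime.utcnow()).total_seconds()
    delay -= 0.01  # shave 10ms for the emitter -> target SNS -> consumer hop
    if delay > 0:
        time.sleep(delay)  # the publish then lands close to the scheduled instant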

event_loader.py

Lines changed: 3 additions & 3 deletions
@@ -47,13 +47,13 @@ def run():
     print('Batched %d entries' % count)


-def load_data(until, last_evaluated_key, limit=5000):
+def load_data(until, last_evaluated_key, limit=5000, status='NEW'):
     if last_evaluated_key is None:
         response = table.query(
             IndexName=os.environ.get('INDEX_NAME'),
             KeyConditionExpression='#status = :st and #date < :until',
             ExpressionAttributeNames={"#status": "status", "#date": "date"},
-            ExpressionAttributeValues={":until": until, ':st': 'NEW'},
+            ExpressionAttributeValues={":until": until, ':st': status},
             Limit=limit,
             ProjectionExpression='id'
         )
@@ -62,7 +62,7 @@ def load_data(until, last_evaluated_key, limit=5000):
             IndexName=os.environ.get('INDEX_NAME'),
             KeyConditionExpression='#status = :st and #date < :until',
             ExpressionAttributeNames={"#status": "status", "#date": "date"},
-            ExpressionAttributeValues={":until": until, ':st': 'NEW'},
+            ExpressionAttributeValues={":until": until, ':st': status},
             Limit=limit,
             ProjectionExpression='id',
             ExclusiveStartKey=last_evaluated_key
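The new status parameter keeps 'NEW' as the default but lets callers page through other states; presumably the cleanupFailed function added to serverless.yml below uses it to re-collect failed events. A hypothetical call:

    # hypothetical usage: 'FAILED' is the status the emitter sets on errors
    page = load_data(until='2019-09-12T12:00:00', last_evaluated_key=None, status='FAILED')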

scheduler.py

Lines changed: 11 additions & 5 deletions
@@ -12,7 +12,6 @@


 def handle(events):
-    now = datetime.utcnow()
     successful_ids = []
     failed_ids = []

@@ -30,18 +29,25 @@ def handle(events):

         events_by_id[event_id] = item

-        delta = datetime.fromisoformat(item.date) - now
+        delta = datetime.fromisoformat(item.date) - datetime.utcnow()
         delay = delta.total_seconds()
         rounded_delay = math.ceil(delay)
         if rounded_delay < 0:
             rounded_delay = 0

+        # schedule the event a second earlier to help with delays in sqs/lambda cold start
+        # the emitter will wait accordingly
+        rounded_delay -= 1
+
+        print(f'ID {event_id} is supposed to emit in {rounded_delay}s which is {delay - rounded_delay}s before target.')
+
         to_be_scheduled.append({
             'Id': event_id,
             'MessageBody': json.dumps({
                 'payload': item.payload,
                 'target': item.target,
-                'id': item.id
+                'id': item.id,
+                'date': item.date
             }),
             'DelaySeconds': rounded_delay
         })
@@ -92,11 +98,11 @@ def send_to_sqs(to_be_scheduled):
         for element in response['Successful']:
             successful_ids.append(element['Id'])
         if 'Failed' in response:
-            print(response['Failed'])
+            print(f'ERROR: Failed to process entry: {response["Failed"]}')
             for element in response['Failed']:
                 failed_ids.append(element['Id'])

     except Exception as e:
-        print(e)
+        print(str(e))
         failed_ids = to_be_scheduled
     return successful_ids, failed_ids
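The arithmetic in the second hunk rounds the delay up, clamps it at zero, then subtracts one second so the message lands early. A standalone sketch of that computation (the helper name is illustrative, not repository code); note that SQS only accepts DelaySeconds in the 0 to 900 range, so an event that is already due would produce -1 here and presumably needs another clamp before SendMessageBatch:

    import math
    from datetime import datetime

    def delay_seconds(date_iso, now=None):
        # mirrors the scheduler hunk above (illustrative helper)
        now = now or datetime.utcnow()
        delay = (datetime.fromisoformat(date_iso) - now).total_seconds()
        rounded = math.ceil(delay)
        if rounded < 0:
            rounded = 0
        # fire a second early; the emitter sleeps off the difference
        rounded -= 1
        return rounded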

serverless.yml

Lines changed: 34 additions & 10 deletions
@@ -5,30 +5,35 @@ provider:
   runtime: python3.7
   stage: ${opt:stage, 'dev'}
   region: ${opt:region, 'us-east-1'}
+  environment:
+    STAGE: "${self:provider.stage}"
+  tags:
+    name: 'aws-scheduler'
   iamRoleStatements:
     - Effect: Allow
       Action:
+        - dynamodb:DescribeTable
+        - dynamodb:DeleteItem
+        - dynamodb:PutItem
         - dynamodb:DescribeTable
         - dynamodb:Query
         - dynamodb:GetItem
         - dynamodb:GetShardIterator
-        - dynamodb:PutItem
-        - dynamodb:DeleteItem
       Resource:
-        - "${self:custom.table.arn}"
         - "${self:custom.table.index.arn}"
-    - Effect: Allow
-      Action:
-        - SQS:SendMessage
-      Resource: "${self:custom.queue.arn}"
+        - "${self:custom.table.arn}"
     - Effect: Allow
       Action:
        - lambda:InvokeFunction
      Resource: { "Fn::Join": [":", ["arn:aws:lambda:${self:provider.region}", { "Ref": "AWS::AccountId" }, "function", "${self:custom.scheduleFunction}" ] ] }
-  environment:
-    STAGE: "${self:provider.stage}"
+    - Effect: Allow
+      Action:
+        - SQS:SendMessage
+      Resource: "${self:custom.queue.arn}"

 custom:
+  lumigo:
+    token: t_e74a5589b5524099bdc04
   inbound:
     name: "scheduler-input-${self:provider.stage}"
     arn: { "Fn::Join": [":", ["arn:aws:sns:${self:provider.region}", { "Ref": "AWS::AccountId" }, "${self:custom.inbound.name}" ] ] }
@@ -60,20 +65,29 @@ functions:
           topicName: "${self:custom.inbound.name}"
     environment:
       SCHEDULE_FUNCTION: "${self:custom.scheduleFunction}"
-      ENFORCE_USER: false
+      ENFORCE_USER: true
+    tags:
+      name: 'aws-scheduler-consumer'
+
   eventLoader:
     handler: handler.event_loader
     events:
       - schedule: rate(1 minute)
     environment:
       SCHEDULE_FUNCTION: "${self:custom.scheduleFunction}"
       INDEX_NAME: "${self:custom.table.index.name}"
+    tags:
+      name: 'aws-scheduler-event-loader'
+
   scheduler:
     handler: handler.scheduler
     environment:
       QUEUE_URL: "${self:custom.queue.url}"
     # when we have to wait for DynamoDB autoscaling we may exceed the default of 6s
     timeout: 30
+    tags:
+      name: 'aws-scheduler-scheduler'
+
   emitter:
     handler: handler.emitter
     events:
@@ -87,6 +101,16 @@ functions:
             - Ref: AWS::Region
             - Ref: AWS::AccountId
             - "${self:custom.queue.name}"
+    timeout: 20
+    tags:
+      name: 'aws-scheduler-emitter'
+  cleanupFailed:
+    handler: cleanup_failed.handle
+    timeout: 300
+    tags:
+      name: 'aws-scheduler-cleanup'
+    environment:
+      INDEX_NAME: "${self:custom.table.index.name}"

 plugins:
   - serverless-python-requirements
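One interaction between this file and consumer.py is worth noting: Serverless passes environment values to the Lambda as strings, and the consumer now tests os.environ.get('ENFORCE_USER') for truthiness, so any non-empty value, including the string 'false', enables enforcement:

    import os

    # environment values from serverless.yml arrive in the Lambda as strings
    os.environ['ENFORCE_USER'] = 'false'
    print(bool(os.environ.get('ENFORCE_USER')))  # True: 'false' is a non-empty string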

setup/init_table.py

Lines changed: 8 additions & 4 deletions
@@ -1,4 +1,5 @@
 import sys
+import time

 import boto3

@@ -11,10 +12,13 @@ def events():
     stage = sys.argv[1]

     name = f'aws-scheduler-events-{stage}'
-    response = client.list_tables()
-    if name in response['TableNames']:
-        print('Table %s already exists. Please delete it first.' % name)
-        return
+    while True:
+        response = client.list_tables()
+        if name in response['TableNames']:
+            print('Table %s already exists. Please delete it first. Waiting 5 seconds until trying again...' % name)
+            time.sleep(5)
+        else:
+            break
     client.create_table(
         TableName=name,
         AttributeDefinitions=[
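For reference, the script takes the stage as its first argument (e.g. python init_table.py dev). boto3 also ships a built-in waiter that could replace the hand-rolled 5-second poll; a minimal sketch under that assumption:

    import sys
    import boto3

    client = boto3.client('dynamodb')
    name = f'aws-scheduler-events-{sys.argv[1]}'
    # polls DescribeTable until the table no longer exists (built-in defaults
    # for interval and max attempts, instead of the explicit 5s loop above)
    client.get_waiter('table_not_exists').wait(TableName=name)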
