
Commit 117e26d

Merge pull request #241 from lap1817/feng-pan-synapse
[synapse] enable generator config update from ZK
2 parents 9226826 + 80bbf62 commit 117e26d

10 files changed: +403 additions, -82 deletions


Gemfile.lock

Lines changed: 2 additions & 1 deletion
@@ -1,9 +1,10 @@
 PATH
   remote: .
   specs:
-    synapse (0.14.7)
+    synapse (0.15.0)
       aws-sdk (~> 1.39)
       docker-api (~> 1.7)
+      hashdiff (~> 0.2.3)
       logging (~> 1.8)
       zk (~> 1.9.4)

README.md

Lines changed: 6 additions & 4 deletions
@@ -34,11 +34,11 @@ are proven routing components like [HAProxy](http://haproxy.1wt.eu/) or [NGINX](
 For every external service that your application talks to, we assign a synapse local port on localhost.
 Synapse creates a proxy from the local port to the service, and you reconfigure your application to talk to the proxy.
 
-Under the hood, Synapse sports `service_watcher`s for service discovery and
+Under the hood, Synapse supports `service_watcher`s for service discovery and
 `config_generators` for configuring local state (e.g. load balancer configs)
 based on that service discovery state.
 
-Synapse supports service discovery with with pluggable `service_watcher`s which
+Synapse supports service discovery with pluggable `service_watcher`s which
 take care of signaling to the `config_generators` so that they can react and
 reconfigure to point at available servers on the fly.
 
@@ -183,7 +183,7 @@ relevant routing component. For example if you want to only configure HAProxy and
 not NGINX for a particular service, you would pass ``disabled`` to the `nginx` section
 of that service's watcher config.
 
-* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured
+* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured. If the corresponding `watcher` is defined to use `zookeeper` and the service publishes its `haproxy` config on ZK, the `haproxy` hash can be filled/updated with data from the ZK node.
 * [`nginx`](https://github.com/jolynch/synapse-nginx#service-watcher-config): how will the nginx section for this service be configured. **NOTE** to use this you must have the synapse-nginx [plugin](#plugins) installed.
 
 The services hash may contain the following additional keys:
@@ -221,7 +221,7 @@ Given a `label_filters`: `[{ "label": "cluster", "value": "dev", "condition": "e
 
 ##### Zookeeper #####
 
-This watcher retrieves a list of servers from zookeeper.
+This watcher retrieves a list of servers, and also service config data, from zookeeper.
 It takes the following mandatory arguments:
 
 * `method`: zookeeper
@@ -230,6 +230,8 @@ It takes the following mandatory arguments:
 
 The watcher assumes that each node under `path` represents a service server.
 
+The watcher assumes that the data (if any) stored at the znode `path` is a hash, where each key names a valid `config_generator` (e.g. `haproxy`) and the value is a hash that configures that generator.
+
 The following arguments are optional:
 
 * `decode`: A hash containing configuration for how to decode the data found in zookeeper.
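
For illustration only (this example is not part of the diff), a service's znode might store JSON like the sketch below; the watcher decodes it into a hash whose `haproxy` entry is then handed to the HAProxy config generator. The `server_options` value is a made-up placeholder:

    # Hypothetical znode data: each top-level key names a config_generator,
    # and its value is the hash used to configure that generator.
    require 'json'

    znode_data = '{"haproxy": {"server_options": "check inter 2000 rise 3 fall 2"}}'
    config_for_generator = JSON.parse(znode_data)
    # => {"haproxy"=>{"server_options"=>"check inter 2000 rise 3 fall 2"}}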

lib/synapse/config_generator/haproxy.rb

Lines changed: 114 additions & 58 deletions
@@ -5,6 +5,7 @@
 require 'socket'
 require 'digest/sha1'
 require 'set'
+require 'hashdiff'
 
 class Synapse::ConfigGenerator
   class Haproxy < BaseGenerator
@@ -801,6 +802,8 @@ class Haproxy < BaseGenerator
     # should be enough for anyone right (famous last words)?
     MAX_SERVER_ID = (2**16 - 1).freeze
 
+    attr_reader :server_id_map, :state_cache
+
     def initialize(opts)
       super(opts)
 
@@ -845,8 +848,11 @@ def initialize(opts)
       @backends_cache = {}
       @watcher_revisions = {}
 
-      @state_file_path = @opts['state_file_path']
-      @state_file_ttl = @opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i
+      @state_cache = HaproxyState.new(
+        @opts['state_file_path'],
+        @opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i,
+        self
+      )
 
       # For giving consistent orders, even if they are random
       @server_order_seed = @opts.fetch('server_order_seed', rand(2000)).to_i
@@ -907,15 +913,26 @@ def update_config(watchers)
       end
     end
 
+    def update_state_file(watchers)
+      @state_cache.update_state_file(watchers)
+    end
+
     # generates a new config based on the state of the watchers
    def generate_config(watchers)
       new_config = generate_base_config
       shared_frontend_lines = generate_shared_frontend
 
       watchers.each do |watcher|
         watcher_config = watcher.config_for_generator[name]
-        @watcher_configs[watcher.name] ||= parse_watcher_config(watcher)
-        next if watcher_config['disabled']
+        next if watcher_config.nil? || watcher_config.empty? || watcher_config['disabled']
+        @watcher_configs[watcher.name] = parse_watcher_config(watcher)
+
+        # if watcher_config is changed, trigger restart
+        config_diff = HashDiff.diff(@state_cache.config_for_generator(watcher.name), watcher_config)
+        if !config_diff.empty?
+          log.info "synapse: restart required because config_for_generator changed. before: #{@state_cache.config_for_generator(watcher.name)}, after: #{watcher_config}"
+          @restart_required = true
+        end
 
         regenerate = watcher.revision != @watcher_revisions[watcher.name] ||
                      @frontends_cache[watcher.name].nil? ||
@@ -1051,7 +1068,7 @@ def generate_backend_stanza(watcher, config)
 
       # The ordering here is important. First we add all the backends in the
       # disabled state...
-      seen.fetch(watcher.name, []).each do |backend_name, backend|
+      @state_cache.backends(watcher.name).each do |backend_name, backend|
         backends[backend_name] = backend.merge('enabled' => false)
         # We remember the haproxy_server_id from a previous reload here.
         # Note though that if live servers below define haproxy_server_id
@@ -1308,74 +1325,113 @@ def construct_name(backend)
     ######################################
     # methods for managing the state file
     ######################################
-    def seen
-      # if we don't support the state file, return nothing
-      return {} if @state_file_path.nil?
+    class HaproxyState
+      include Synapse::Logging
 
-      # if we've never needed the backends, now is the time to load them
-      @seen = read_state_file if @seen.nil?
+      # TODO: enable version in the Haproxy Cache File
+      KEY_WATCHER_CONFIG_FOR_GENERATOR = "watcher_config_for_generator"
+      NON_BACKENDS_KEYS = [KEY_WATCHER_CONFIG_FOR_GENERATOR]
 
-      @seen
-    end
+      def initialize(state_file_path, state_file_ttl, haproxy)
+        @state_file_path = state_file_path
+        @state_file_ttl = state_file_ttl
+        @haproxy = haproxy
+      end
 
-    def update_state_file(watchers)
-      # if we don't support the state file, do nothing
-      return if @state_file_path.nil?
-
-      log.info "synapse: writing state file"
-      timestamp = Time.now.to_i
-
-      # Remove stale backends
-      seen.each do |watcher_name, backends|
-        backends.each do |backend_name, backend|
-          ts = backend.fetch('timestamp', 0)
-          delta = (timestamp - ts).abs
-          if delta > @state_file_ttl
-            log.info "synapse: expiring #{backend_name} with age #{delta}"
-            backends.delete(backend_name)
-          end
+      def backends(watcher_name)
+        if seen.key?(watcher_name)
+          seen[watcher_name].select { |section, data| !NON_BACKENDS_KEYS.include?(section) }
+        else
+          {}
         end
       end
 
-      # Remove any services which no longer have any backends
-      seen.reject!{|watcher_name, backends| backends.keys.length == 0}
+      def config_for_generator(watcher_name)
+        cache_config = {}
+        if seen.key?(watcher_name) && seen[watcher_name].key?(KEY_WATCHER_CONFIG_FOR_GENERATOR)
+          cache_config = seen[watcher_name][KEY_WATCHER_CONFIG_FOR_GENERATOR]
+        end
 
-      # Add backends from watchers
-      watchers.each do |watcher|
-        seen[watcher.name] ||= {}
+        cache_config
+      end
 
-        watcher.backends.each do |backend|
-          backend_name = construct_name(backend)
-          data = {
-            'timestamp' => timestamp,
-          }
-          server_id = @server_id_map[watcher.name][backend_name].to_i
-          if server_id && server_id > 0 && server_id <= MAX_SERVER_ID
-            data['haproxy_server_id'] = server_id
+      def update_state_file(watchers)
+        # if we don't support the state file, do nothing
+        return if @state_file_path.nil?
+
+        log.info "synapse: writing state file"
+        timestamp = Time.now.to_i
+
+        # Remove stale backends
+        seen.each do |watcher_name, data|
+          backends(watcher_name).each do |backend_name, backend|
+            ts = backend.fetch('timestamp', 0)
+            delta = (timestamp - ts).abs
+            if delta > @state_file_ttl
+              log.info "synapse: expiring #{backend_name} with age #{delta}"
+              data.delete(backend_name)
+            end
           end
+        end
 
-          seen[watcher.name][backend_name] = data.merge(backend)
+        # Remove any services which no longer have any backends
+        seen.reject!{|watcher_name, data| backends(watcher_name).keys.length == 0}
+
+        # Add backends and config from watchers
+        watchers.each do |watcher|
+          seen[watcher.name] ||= {}
+
+          watcher.backends.each do |backend|
+            backend_name = @haproxy.construct_name(backend)
+            data = {
+              'timestamp' => timestamp,
+            }
+            server_id = @haproxy.server_id_map[watcher.name][backend_name].to_i
+            if server_id && server_id > 0 && server_id <= MAX_SERVER_ID
+              data['haproxy_server_id'] = server_id
+            end
+
+            seen[watcher.name][backend_name] = data.merge(backend)
+          end
+
+          # Add config for generator from watcher
+          if watcher.config_for_generator.key?(@haproxy.name)
+            seen[watcher.name][KEY_WATCHER_CONFIG_FOR_GENERATOR] =
+              watcher.config_for_generator[@haproxy.name]
+          end
         end
+
+        # write the data!
+        write_data_to_state_file(seen)
       end
 
-      # write the data!
-      write_data_to_state_file(seen)
-    end
+      private
 
-    def read_state_file
-      # Some versions of JSON return nil on an empty file ...
-      JSON.load(File.read(@state_file_path)) || {}
-    rescue StandardError => e
-      # It's ok if the state file doesn't exist or contains invalid data
-      # The state file will be rebuilt automatically
-      {}
-    end
+      def seen
+        # if we don't support the state file, return nothing
+        return {} if @state_file_path.nil?
 
-    # we do this atomically so the state file is always consistent
-    def write_data_to_state_file(data)
-      tmp_state_file_path = @state_file_path + ".tmp"
-      File.write(tmp_state_file_path, JSON.pretty_generate(data))
-      FileUtils.mv(tmp_state_file_path, @state_file_path)
+        # if we've never needed the backends, now is the time to load them
+        @seen = read_state_file if @seen.nil?
+
+        @seen
+      end
+
+      def read_state_file
+        # Some versions of JSON return nil on an empty file ...
+        JSON.load(File.read(@state_file_path)) || {}
+      rescue StandardError => e
+        # It's ok if the state file doesn't exist or contains invalid data
+        # The state file will be rebuilt automatically
+        {}
+      end
+
+      # we do this atomically so the state file is always consistent
+      def write_data_to_state_file(data)
+        tmp_state_file_path = @state_file_path + ".tmp"
+        File.write(tmp_state_file_path, JSON.pretty_generate(data))
+        FileUtils.mv(tmp_state_file_path, @state_file_path)
+      end
     end
   end
 end
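
Note on the restart check above: HashDiff comes from the hashdiff gem (constrained to ~> 0.2.3 in the Gemfile), whose `HashDiff.diff(a, b)` returns an array of change tuples and an empty array when the two hashes match. A minimal sketch with hypothetical config values:

    require 'hashdiff'

    old_config = { 'server_options' => 'check inter 2000 rise 3 fall 2' }
    new_config = { 'server_options' => 'check inter 1000 rise 3 fall 2' }

    HashDiff.diff(old_config, old_config)  # => [] (no change, so no restart)
    HashDiff.diff(old_config, new_config)
    # => [["~", "server_options",
    #      "check inter 2000 rise 3 fall 2", "check inter 1000 rise 3 fall 2"]]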

lib/synapse/service_watcher/base.rb

Lines changed: 40 additions & 4 deletions
@@ -1,13 +1,14 @@
 require 'synapse/log'
 require 'set'
+require 'hashdiff'
 
 class Synapse::ServiceWatcher
   class BaseWatcher
     include Synapse::Logging
 
     LEADER_WARN_INTERVAL = 30
 
-    attr_reader :name, :config_for_generator, :revision
+    attr_reader :name, :revision
 
     def initialize(opts={}, synapse)
       super()
@@ -99,6 +100,11 @@ def ping?
       true
     end
 
+    # deep clone the hash to protect its readonly property
+    def config_for_generator
+      Marshal.load(Marshal.dump(@config_for_generator))
+    end
+
     def backends
       filtered = backends_filtered_by_labels
 
@@ -152,7 +158,7 @@ def backends_filtered_by_labels
       end
     end
 
-    def set_backends(new_backends)
+    def set_backends(new_backends, new_config_for_generator = {})
       # Aggregate and deduplicate all potential backend service instances.
       new_backends = (new_backends + @default_servers) if @keep_default_servers
       # Substitute backend_port_override for the provided port
@@ -165,7 +171,20 @@ def set_backends(new_backends)
         [b['host'], b['port'], b.fetch('name', '')]
       }
 
+      backends_updated = update_backends(new_backends)
+      config_updated = update_config_for_generator(new_config_for_generator)
+
+      if backends_updated || config_updated
+        reconfigure!
+        return true
+      else
+        return false
+      end
+    end
+
+    def update_backends(new_backends)
       if new_backends.to_set == @backends.to_set
+        log.info "synapse: backends for service #{@name} do not change."
         return false
       end
 
@@ -192,11 +211,28 @@ def set_backends(new_backends)
         @backends = new_backends
       end
 
-      reconfigure!
-
       return true
     end
 
+    def update_config_for_generator(new_config_for_generator)
+      if new_config_for_generator.empty?
+        log.info "synapse: no config_for_generator data from #{name} for" \
+          " service #{@name}; keep existing config_for_generator: #{@config_for_generator.inspect}"
+        return false
+      else
+        log.info "synapse: discovered config_for_generator for service #{@name}"
+        diff = HashDiff.diff(new_config_for_generator, config_for_generator)
+
+        if diff.empty?
+          log.info "synapse: config_for_generator for service #{@name} does not change."
+          return false
+        else
+          @config_for_generator = new_config_for_generator
+          return true
+        end
+      end
+    end
+
     # Subclasses should not invoke this directly; it's only exposed so that it
     # can be overridden in subclasses.
     def reconfigure!
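
The zookeeper watcher changes that feed this new `set_backends` signature live in other files of this commit and are not shown above. As a hypothetical sketch only, assuming helper names that are not part of the commit, a discovery pass in a zookeeper-backed watcher might look like:

    # Hypothetical sketch: inside a zookeeper-backed watcher's discovery code.
    def discover
      new_backends = read_backends_from_children   # assumed helper: child znodes -> backend hashes
      znode_config = read_config_from_znode_data   # assumed helper: znode data -> {'haproxy' => {...}}

      # set_backends now also takes the per-generator config; reconfigure! fires
      # only when the backend set or the config actually changed.
      set_backends(new_backends, znode_config)
    end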
