Skip to content

Commit 600ed18

Browse files
committed
Implement GPRABBITMQ-29: allow full configuration of listeners.
This commit allows users to configure almost every aspect of their service listeners through a combination of the bean property override mechanism and the rabbitQueue/rabbitSubscribe properties. This includes determining whether a particular service listener is transactional, what message converter is uses, and how many consumers it has. The change involved a fairly significant refactor that made it easy to add automatic reloading of service listeners - so users can modify them on the fly now too!
1 parent 3c3ce20 commit 600ed18

File tree

14 files changed

+409
-84
lines changed

14 files changed

+409
-84
lines changed

RabbitmqGrailsPlugin.groovy

Lines changed: 47 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
import org.codehaus.groovy.grails.commons.GrailsClassUtils as GCU
1+
import org.codehaus.groovy.grails.commons.ServiceArtefactHandler
22
import org.grails.rabbitmq.AutoQueueMessageListenerContainer
33
import org.grails.rabbitmq.RabbitDynamicMethods
4+
import org.grails.rabbitmq.RabbitErrorHandler
45
import org.grails.rabbitmq.RabbitQueueBuilder
6+
import org.grails.rabbitmq.RabbitServiceConfigurer
57
import org.springframework.amqp.core.Binding
68
import org.springframework.amqp.core.Queue
79
import org.springframework.amqp.rabbit.core.RabbitAdmin
@@ -13,7 +15,7 @@ import org.grails.rabbitmq.RabbitConfigurationHolder
1315

1416
class RabbitmqGrailsPlugin {
1517
// the plugin version
16-
def version = "0.3.4-SNAPSHOT"
18+
def version = "0.4-SNAPSHOT"
1719
// the version or versions of Grails the plugin is designed for
1820
def grailsVersion = "1.2 > *"
1921
// the other plugins this plugin depends on
@@ -46,8 +48,6 @@ class RabbitmqGrailsPlugin {
4648
def loadAfter = ['services']
4749
def observe = ['*']
4850

49-
private static LISTENER_CONTAINER_SUFFIX = '_MessageListenerContainer'
50-
5151
def doWithSpring = {
5252

5353
def rabbitmqConfig = application.config.rabbitmq
@@ -69,7 +69,8 @@ class RabbitmqGrailsPlugin {
6969

7070
log.debug "Connecting to rabbitmq ${connectionFactoryUsername}@${connectionFactoryHostname} with ${configHolder.getDefaultConcurrentConsumers()} consumers."
7171

72-
def connectionFactoryClassName = connectionFactoryConfig?.className ?: 'org.springframework.amqp.rabbit.connection.CachingConnectionFactory'
72+
def connectionFactoryClassName = connectionFactoryConfig?.className ?:
73+
'org.springframework.amqp.rabbit.connection.CachingConnectionFactory'
7374
def parentClassLoader = getClass().classLoader
7475
def loader = new GroovyClassLoader(parentClassLoader)
7576
def connectionFactoryClass = loader.loadClass(connectionFactoryClassName)
@@ -87,72 +88,22 @@ class RabbitmqGrailsPlugin {
8788
if (messageConverterBean) messageConverter = ref(messageConverterBean)
8889
}
8990
adm(RabbitAdmin, rabbitMQConnectionFactory)
91+
rabbitErrorHandler(RabbitErrorHandler)
92+
93+
// Add beans to hook up services as AMQP listeners.
9094
Set registeredServices = new HashSet()
91-
application.serviceClasses.each { service ->
92-
def serviceClass = service.clazz
93-
def propertyName = service.propertyName
95+
for(service in application.serviceClasses) {
96+
def serviceConfigurer = new RabbitServiceConfigurer(service, rabbitmqConfig)
97+
if(!serviceConfigurer.isListener() || !configHolder.isServiceEnabled(service)) continue
9498

95-
def transactional = service.transactional
96-
if (!(rabbitmqConfig."${propertyName}".transactional instanceof ConfigObject)) {
97-
transactional = rabbitmqConfig."${propertyName}".transactional as Boolean
99+
def propertyName = service.propertyName
100+
if(!registeredServices.add(propertyName)) {
101+
throw new IllegalArgumentException(
102+
"Unable to initialize rabbitmq listeners properly." +
103+
" More than one service named ${propertyName}.")
98104
}
99105

100-
def rabbitQueue = GCU.getStaticPropertyValue(serviceClass, 'rabbitQueue')
101-
if(rabbitQueue) {
102-
if(configHolder.isServiceEnabled(service)) {
103-
def serviceConcurrentConsumers = configHolder.getServiceConcurrentConsumers(service)
104-
log.info("Setting up rabbitmq listener for ${service.clazz} with ${serviceConcurrentConsumers} consumer(s)")
105-
if(!registeredServices.add(propertyName)){
106-
throw new IllegalArgumentException("Unable to initialize rabbitmq listeners properly. More than one service named ${propertyName}.")
107-
}
108-
109-
"${propertyName}${LISTENER_CONTAINER_SUFFIX}"(SimpleMessageListenerContainer) {
110-
// We manually start the listener once we have attached the
111-
// service in doWithApplicationContext.
112-
autoStartup = false
113-
channelTransacted = transactional
114-
connectionFactory = rabbitMQConnectionFactory
115-
concurrentConsumers = serviceConcurrentConsumers
116-
queueNames = rabbitQueue
117-
}
118-
} else {
119-
log.info("Not listening to ${service.clazz} it is disabled in configuration")
120-
}
121-
}
122-
else {
123-
def rabbitSubscribe = GCU.getStaticPropertyValue(serviceClass, 'rabbitSubscribe')
124-
if (rabbitSubscribe) {
125-
if (!(rabbitSubscribe instanceof CharSequence) && !(rabbitSubscribe instanceof Map)) {
126-
log.error "The 'rabbitSubscribe' property on service ${service.fullName} must be a string or a map"
127-
}
128-
else {
129-
if(configHolder.isServiceEnabled(service)) {
130-
def serviceConcurrentConsumers = configHolder.getServiceConcurrentConsumers(service)
131-
log.info("Setting up rabbitmq listener for ${service.clazz} with ${serviceConcurrentConsumers} consumer(s)")
132-
if(!registeredServices.add(propertyName)){
133-
throw new IllegalArgumentException("Unable to initialize rabbitmq listeners properly. More than one service named ${propertyName}.")
134-
}
135-
"${propertyName}${LISTENER_CONTAINER_SUFFIX}"(AutoQueueMessageListenerContainer) {
136-
// We manually start the listener once we have attached the
137-
// service in doWithApplicationContext.
138-
autoStartup = false
139-
channelTransacted = transactional
140-
connectionFactory = rabbitMQConnectionFactory
141-
concurrentConsumers = serviceConcurrentConsumers
142-
if (rabbitSubscribe instanceof Map) {
143-
exchangeBeanName = "grails.rabbit.exchange.${rabbitSubscribe.name}"
144-
routingKey = rabbitSubscribe.routingKey ?: '#'
145-
}
146-
else {
147-
exchangeBeanName = "grails.rabbit.exchange.${rabbitSubscribe}"
148-
}
149-
}
150-
} else {
151-
log.info("Not listening to ${service.clazz} it is disabled in configuration")
152-
}
153-
}
154-
}
155-
}
106+
serviceConfigurer.configure(delegate)
156107
}
157108

158109
def queuesConfig = application.config.rabbitmq?.queues
@@ -223,16 +174,9 @@ class RabbitmqGrailsPlugin {
223174
}
224175

225176
def doWithApplicationContext = { applicationContext ->
226-
def rabbitTemplate = applicationContext.getBean('rabbitTemplate', RabbitTemplate.class)
227177
def containerBeans = applicationContext.getBeansOfType(SimpleMessageListenerContainer)
228178
containerBeans.each { beanName, bean ->
229-
if(beanName.endsWith(LISTENER_CONTAINER_SUFFIX)) {
230-
def adapter = new MessageListenerAdapter()
231-
def serviceName = beanName - LISTENER_CONTAINER_SUFFIX
232-
adapter.delegate = applicationContext.getBean(serviceName)
233-
adapter.messageConverter = rabbitTemplate.messageConverter
234-
bean.messageListener = adapter
235-
179+
if(isServiceListener(beanName)) {
236180
// Now that the listener is properly configured, we can start it.
237181
bean.start()
238182
}
@@ -242,6 +186,34 @@ class RabbitmqGrailsPlugin {
242186
def onChange = { evt ->
243187
if(evt.source instanceof Class) {
244188
addDynamicMessageSendingMethods ([evt.source], evt.ctx)
189+
190+
// If a service has changed, reload the associated beans
191+
if(isServiceEventSource(application, evt.source)) {
192+
def serviceGrailsClass = application.addArtefact(ServiceArtefactHandler.TYPE, evt.source)
193+
def serviceConfigurer = new RabbitServiceConfigurer(
194+
serviceGrailsClass,
195+
application.config.rabbitmq)
196+
if (serviceConfigurer.isListener()) {
197+
def beans = beans {
198+
serviceConfigurer.configure(delegate)
199+
}
200+
beans.registerBeans(evt.ctx)
201+
startServiceListener(serviceGrailsClass.propertyName, evt.ctx)
202+
}
203+
}
245204
}
246205
}
206+
207+
protected isServiceListener(beanName) {
208+
return beanName.endsWith(RabbitServiceConfigurer.LISTENER_CONTAINER_SUFFIX)
209+
}
210+
211+
protected isServiceEventSource(application, source) {
212+
return application.isArtefactOfType(ServiceArtefactHandler.TYPE, source)
213+
}
214+
215+
protected startServiceListener(servicePropertyName, applicationContext) {
216+
def beanName = servicePropertyName + RabbitServiceConfigurer.LISTENER_CONTAINER_SUFFIX
217+
applicationContext.getBean(beanName).start()
218+
}
247219
}

application.properties

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#Grails Metadata file
2-
#Wed Jan 25 17:24:02 GMT 2012
3-
app.grails.version=2.0.0
2+
#Sun Mar 04 13:14:00 GMT 2012
3+
app.grails.version=2.0.1
44
app.name=rabbitmq
5+
plugins.release=1.0.1
56
plugins.svn=1.0.2

grails-app/conf/BuildConfig.groovy

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,5 @@ grails.project.dependency.resolution = {
3030
}
3131

3232
plugins {
33-
build(":release:1.0.1") {
34-
export = false
35-
}
3633
}
3734
}

grails-app/conf/Config.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
grails.doc.authors = 'Jeff Brown, Peter Ledbrook'
22
grails.doc.license = 'Apache License 2.0'
33
grails.doc.title = 'RabbitMQ Plugin'
4+
grails.doc.'api.org.springframework.amqp'='http://static.springsource.org/spring-amqp/docs/1.0.x/apidocs'
5+
46
grails.views.default.codec="none" // none, html, base64
57
grails.views.gsp.encoding="UTF-8"

src/docs/guide/configuration.gdoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ rabbitmq.connectionfactory.password | The password for connection to the server
2222
rabbitmq.connectionfactory.hostname | The host name of the server | (none)
2323
rabbitmq.connectionfactory.virtualHost | The name of the virtual host to connect to | '/'
2424
rabbitmq.connectionfactory.channelCacheSize | The connection channel cache size | 10
25-
rabbitmq.concurrentConsumers | The number of concurrent consumers to create per message handler. Raising the number is recommendable in order to scale the consumption of messages coming in from a queue. Note that ordering guarantees are lost when multiple consumers are registered. | 1
25+
rabbitmq.concurrentConsumers | The number of concurrent consumers to create per message handler. Raising the number is recommended in order to scale the consumption of messages coming in from a queue. Note that ordering guarantees are lost when multiple consumers are registered. | 1
26+
rabbitmq.disableListening | Disables all service listeners so that they won't receive any messages. | false
2627
{table}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
When you need fine-grained control over your service listeners, you can tap into the power of Spring. Since each service listener is implemented as a set of Spring beans, you can use Grails' [bean property override|http://grails.org/doc/latest/guide/spring.html#propertyOverrideConfiguration] mechanism to provide your own low-level settings.
2+
3+
So how are these beans set up? If a service has either a @rabbitQueue@ or @rabbitSubscribe@ property, then you will have these beans:
4+
5+
* @<serviceName>_MessageListenerContainer@ of type [SimpleMessageListenerContainer|api:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer]
6+
* @<serviceName>RabbitAdapter@ of type [MessageListenerAdapter|api:org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter]
7+
8+
As an example, let's say you have a @MessageStoreService@ like so:
9+
10+
{code}
11+
class MessageStoreService {
12+
static rabbitSubscribe = [exchange: "amq.topic", routingKey: "logs.#"]
13+
...
14+
}
15+
{code}
16+
17+
You can then customise things like the number of concurrent consumers, whether the channel is transacted, what the prefetch count should be, and more! Simply add code like this to your runtime configuration (Config.groovy):
18+
19+
{code}
20+
beans {
21+
messageStoreService_MessageListenerContainer {
22+
channelTransacted = false
23+
concurrentConsumers = 10
24+
prefetchCount = 5
25+
queueNames = ["q1", "q2"] as String[]
26+
}
27+
28+
messageStoreServiceRabbitAdapter {
29+
encoding = "UTF-8"
30+
responseRoutingKey = "replyQueue"
31+
}
32+
}
33+
{code}
34+
35+
This approach works for any property that accepts a basic type. But what about bean references? In this case, you can't use the bean property overrides. Fortunately, the most common bean reference you are likely to want to override, the message converter, has a dedicated configuration option:
36+
37+
{code}
38+
rabbitmq.messageConverterBean = "myCustomMessageConverter"
39+
{code}
40+
41+
This is a global setting that accepts the name of a message converter bean. For the rare occasions that you need to override other bean references, you can declare your own @<serviceName>_MessageListenerContainer@ or @<serviceName>_RabbitAdapter@ beans in resources.groovy.
42+
43+
Finally, you can override some of the global config options on a per-service basis:
44+
45+
{code}
46+
rabbitmq {
47+
services {
48+
messageStoreService {
49+
concurrentConsumers = 50
50+
disableListening = true
51+
}
52+
}
53+
}
54+
{code}
55+
56+
There are many options for customisation and we hope the above will get you started.

src/docs/guide/configuration/configuringExchanges.gdoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ This approach isn't limited to topic exchanges: you can automatically bind queue
6868
# the 'binding' is ignored for fanout exchanges; and
6969
# the headers exchange requires a map of message header names and values for its binding.
7070

71+
{note}
72+
RabbitMQ has several built-in exchanges with names of the form 'amq.*', for example 'amq.direct'. If you want to bind to these, you currently have to declare them with the correct attributes, i.e.
73+
74+
{code}
75+
exchange name: "amq.direct", type: direct, durable: true, autoDelete: false
76+
{code}
77+
{note}
78+
7179
As you can imagine, these few building blocks allow you to configure some pretty complex messaging systems with very little effort. You can tailor the messaging system to your needs rather than tailor your applications to the messaging system.
7280

7381

src/docs/guide/consumingMessages/manualQueueManagement.gdoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,12 @@ class DemoService {
1515

1616
As with the pub/sub model, messages are delivered to the service by invoking the @handleMessage()@ method. That's all there is to it! The real trick is to configure your exchanges and queues with appropriate bindings, as we described in the configuration section.
1717

18+
If you want more say in the configuration of the underlying listener, then you can also specify a map:
19+
20+
{code}
21+
static rabbitQueue = [queues: "someQueueName", channelTransacted: true]
22+
{code}
23+
24+
The "queues" option can either be a simple queue name or a list of queue names. Again, have a look at the [advanced configuration section|guide:advancedConfig] for information about the extra properties you can set here.
25+
1826
One last subject to discuss is the form that the messages take.

src/docs/guide/consumingMessages/messages.gdoc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,23 @@ class DemoService {
3030
{code}
3131

3232
This is a great convenience, but be aware that using serializable Java objects limits the types of client you can interact with. If all the clients you're interested in are using Spring AMQP, then you should be fine, but don't expect Ruby or Python clients to handle @Map@ messages! For production systems, we recommend you use strings and byte arrays.
33+
34+
Sometimes you want access to the raw message, particularly if you want to look at the message headers. If so, just change the signature of the @handleMessage()@ method and add an extra option to your @rabbitQueue@ or @rabbitSubscribe@ property:
35+
36+
{code}
37+
package org.grails.rabbitmq.test
38+
39+
import org.springframework.amqp.core.Message
40+
41+
class DemoService {
42+
static rabbitQueue = [queues: 'someQueueName', messageConverterBean: '']
43+
44+
void handleMessage(Message msg) {
45+
// Do something with the message headers
46+
println "Received message with content type ${msg.contentType};${msg.encoding}"
47+
...
48+
}
49+
}
50+
{code}
51+
52+
As you can see, all you have to do is accept an argument of type @Message@ and add the @messageConverterBean@ option with an empty string as its value. This disables the automatic message conversion, allowing you to interrogate the raw message as required.

src/docs/guide/consumingMessages/pubSub.gdoc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,14 @@ In this example, the service will only receive messages that have a routing key
4040

4141
Under the hood, the plugin creates a temporary, exclusive queue for your service which is removed from the broker when your application shuts down. There is no way for you to control the name of the queue or attach another listener to it, but then that's the point in this case. If you do need more control, then you must manage the queues and their bindings yourself.
4242

43+
The map syntax also allows you to customise the properties of the Spring message listener container and the corresponding listener adapter (see the section on [advanced configuration|guide:advancedConfig] for more details on these). For example,
44+
45+
{code}
46+
static rabbitSubscribe = [
47+
name: 'shares',
48+
routingKey: 'NYSE.GE',
49+
encoding: "ISO-8859-1",
50+
prefetchCount: 1]
51+
{code}
52+
53+
will set the encoding and prefetch count for just this service listener. This technique is also possible for straight queue listeners as well.

0 commit comments

Comments
 (0)