2222import com .kaazing .net .ws .amqp .ConnectionEvent ;
2323import com .kaazing .net .ws .amqp .ConnectionListener ;
2424
25+ import javax .net .ssl .SSLHandshakeException ;
26+
2527public class JavaAmqpClientDemo {
2628 private AmqpClient amqpClient ;
2729 private AmqpChannel publishChannel = null ;
@@ -31,10 +33,19 @@ public class JavaAmqpClientDemo {
3133 private final String myConsumerTag = "clientkey" ;
3234 private final String routingKey = "broadcastkey" ;
3335 private final String virtualHost = "/" ;
34- private String login ;
36+ private final String url ;
37+ private final String login ;
38+ private final String password ;
39+
3540
36- public JavaAmqpClientDemo (URI url , String login , String password ) throws InterruptedException {
41+ public JavaAmqpClientDemo (String url , String login , String password ) throws InterruptedException {
42+ this .url = url ;
3743 this .login = login ;
44+ this .password = password ;
45+ }
46+
47+ public void handleConnection () throws InterruptedException {
48+
3849 AmqpClientFactory amqpClientFactory = AmqpClientFactory .createAmqpClientFactory ();
3950 amqpClient = amqpClientFactory .createAmqpClient ();
4051 System .out .println ("CONNECTING: " + url + " " + login + "/" + password );
@@ -62,6 +73,7 @@ public void onConnectionClose(ConnectionEvent e) {
6273 if (consumeChannel != null ) {
6374 consumeChannel .closeChannel (0 , "" , 0 , 0 );
6475 }
76+ System .exit (0 );
6577
6678 }
6779
@@ -70,7 +82,7 @@ public void onConnectionError(ConnectionEvent e) {
7082 System .exit (-1 );
7183 }
7284 });
73- amqpClient .connect (url . toString () , virtualHost , login , password );
85+ amqpClient .connect (url , virtualHost , login , password );
7486 connectionLatch .await (10 , TimeUnit .SECONDS );
7587
7688 final CountDownLatch pubChannelLatch = new CountDownLatch (1 );
@@ -119,7 +131,8 @@ public void onClose(ChannelEvent e) {
119131
120132 @ Override
121133 public void onConsumeBasic (ChannelEvent e ) {
122- System .out .println ("CONSUME FROM QUEUE: " + queueName );
134+ System .out .println ("CONSUME FROM QUEUE: " + queueName +"\n " );
135+ System .out .print ("User input: " );
123136 }
124137
125138 @ Override
@@ -141,15 +154,15 @@ public void onMessage(final ChannelEvent e) {
141154 final Long dt = (Long ) e .getArgument ("deliveryTag" );
142155 final String value = new String (bytes , Charset .forName ("UTF-8" ));
143156
144- System .out .println (">>> MESSAGE RECEIVED: " + value );
157+ System .out .println ("<- MESSAGE RECEIVED: " + value );
145158 AmqpProperties props = e .getAmqpProperties ();
146159 if (props != null ) {
147160 AmqpArguments headers = props .getHeaders ();
148-
161+ System . out . println ( "Amqp properties: " );
149162 if (headers != null ) {
150- System .out .println ("Headers: " + headers .toString ());
163+ System .out .println ("- Headers: " + headers .toString ());
151164 }
152- System .out .println ("Properties " + (String ) props .toString ());
165+ System .out .println ("- Properties " + (String ) props .toString ());
153166
154167 // Acknowledge the message as we passed in a 'false' for
155168 // noAck in AmqpChannel.consumeBasic() call. If the
@@ -160,6 +173,7 @@ public void onMessage(final ChannelEvent e) {
160173 AmqpChannel channel = e .getChannel ();
161174 channel .ackBasic (dt .longValue (), true );
162175 }
176+ System .out .print ("\n User input: " );
163177 }
164178
165179 @ Override
@@ -177,6 +191,11 @@ public void disconnect() {
177191 }
178192
179193 public void sendMessage (String message ) {
194+ // This check needs to be done, otherwise if the user would hit enter without a message,
195+ // nothing would be sent and the program would disconnect and terminate
196+ if (message .equals ("" )){
197+ message = " " ;
198+ }
180199
181200 ByteBuffer buffer = ByteBuffer .allocate (512 );
182201 buffer .put (message .getBytes (Charset .forName ("UTF-8" )));
@@ -201,22 +220,6 @@ public void sendMessage(String message) {
201220 props .setHeaders (customHeaders );
202221
203222 publishChannel .publishBasic (buffer , props , exchangeName , routingKey , false , false );
204- System .out .println ("MESSAGE PUBLISHED: " + message );
223+ System .out .println ("-> MESSAGE PUBLISHED: " + message );
205224 }
206-
207- public static void main (String [] args ) throws InterruptedException , URISyntaxException , IOException {
208- JavaAmqpClientDemo demo = new JavaAmqpClientDemo (new URI ("wss://sandbox.kaazing.net/amqp091" ), "guest" , "guest" );
209- System .out .println ("Kaazing Java AMQP Demo App. Copyright (C) 2016 Kaazing, Inc." );
210- System .out .println ("Type the message to send or <exit> to stop." );
211- BufferedReader console = new BufferedReader (new InputStreamReader (System .in ));
212- while (true ) {
213- String text = console .readLine ();
214- if (text .toLowerCase ().equals ("<exit>" ))
215- break ;
216- // Send as a text
217- demo .sendMessage (text );
218- }
219- demo .disconnect ();
220- }
221-
222225}
0 commit comments