1717 */
1818package org .apache .hadoop .portmap ;
1919
20- import java .util .HashMap ;
20+ import java .util .concurrent . ConcurrentHashMap ;
2121
2222import org .apache .commons .logging .Log ;
2323import org .apache .commons .logging .LogFactory ;
4040import org .jboss .netty .handler .timeout .IdleStateAwareChannelUpstreamHandler ;
4141import org .jboss .netty .handler .timeout .IdleStateEvent ;
4242
43- final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler implements PortmapInterface {
43+ final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
4444 static final int PROGRAM = 100000 ;
4545 static final int VERSION = 2 ;
46+
47+ static final int PMAPPROC_NULL = 0 ;
48+ static final int PMAPPROC_SET = 1 ;
49+ static final int PMAPPROC_UNSET = 2 ;
50+ static final int PMAPPROC_GETPORT = 3 ;
51+ static final int PMAPPROC_DUMP = 4 ;
52+ static final int PMAPPROC_GETVERSADDR = 9 ;
53+
4654 private static final Log LOG = LogFactory .getLog (RpcProgramPortmap .class );
4755
48- /** Map synchronized usis monitor lock of this instance */
49- private final HashMap <String , PortmapMapping > map ;
56+ private final ConcurrentHashMap <String , PortmapMapping > map = new ConcurrentHashMap <String , PortmapMapping >();
5057
5158 /** ChannelGroup that remembers all active channels for gracefully shutdown. */
5259 private final ChannelGroup allChannels ;
5360
5461 RpcProgramPortmap (ChannelGroup allChannels ) {
5562 this .allChannels = allChannels ;
56- map = new HashMap <String , PortmapMapping >(256 );
5763 PortmapMapping m = new PortmapMapping (PROGRAM , VERSION ,
5864 PortmapMapping .TRANSPORT_TCP , RpcProgram .RPCB_PORT );
5965 PortmapMapping m1 = new PortmapMapping (PROGRAM , VERSION ,
6066 PortmapMapping .TRANSPORT_UDP , RpcProgram .RPCB_PORT );
6167 map .put (PortmapMapping .key (m ), m );
6268 map .put (PortmapMapping .key (m1 ), m1 );
6369 }
64-
65- @ Override
66- public XDR nullOp (int xid , XDR in , XDR out ) {
70+
71+ /**
72+ * This procedure does no work. By convention, procedure zero of any protocol
73+ * takes no parameters and returns no results.
74+ */
75+ private XDR nullOp (int xid , XDR in , XDR out ) {
6776 return PortmapResponse .voidReply (out , xid );
6877 }
6978
70- @ Override
71- public XDR set (int xid , XDR in , XDR out ) {
79+ /**
80+ * When a program first becomes available on a machine, it registers itself
81+ * with the port mapper program on the same machine. The program passes its
82+ * program number "prog", version number "vers", transport protocol number
83+ * "prot", and the port "port" on which it awaits service request. The
84+ * procedure returns a boolean reply whose value is "TRUE" if the procedure
85+ * successfully established the mapping and "FALSE" otherwise. The procedure
86+ * refuses to establish a mapping if one already exists for the tuple
87+ * "(prog, vers, prot)".
88+ */
89+ private XDR set (int xid , XDR in , XDR out ) {
7290 PortmapMapping mapping = PortmapRequest .mapping (in );
7391 String key = PortmapMapping .key (mapping );
7492 if (LOG .isDebugEnabled ()) {
7593 LOG .debug ("Portmap set key=" + key );
7694 }
7795
78- PortmapMapping value = null ;
79- synchronized (this ) {
80- map .put (key , mapping );
81- value = map .get (key );
82- }
83- return PortmapResponse .intReply (out , xid , value .getPort ());
96+ map .put (key , mapping );
97+ return PortmapResponse .intReply (out , xid , mapping .getPort ());
8498 }
8599
86- @ Override
87- public synchronized XDR unset (int xid , XDR in , XDR out ) {
100+ /**
101+ * When a program becomes unavailable, it should unregister itself with the
102+ * port mapper program on the same machine. The parameters and results have
103+ * meanings identical to those of "PMAPPROC_SET". The protocol and port number
104+ * fields of the argument are ignored.
105+ */
106+ private XDR unset (int xid , XDR in , XDR out ) {
88107 PortmapMapping mapping = PortmapRequest .mapping (in );
89- synchronized (this ) {
90- map .remove (PortmapMapping .key (mapping ));
91- }
108+ String key = PortmapMapping .key (mapping );
109+
110+ if (LOG .isDebugEnabled ())
111+ LOG .debug ("Portmap remove key=" + key );
112+
113+ map .remove (key );
92114 return PortmapResponse .booleanReply (out , xid , true );
93115 }
94116
95- @ Override
96- public synchronized XDR getport (int xid , XDR in , XDR out ) {
117+ /**
118+ * Given a program number "prog", version number "vers", and transport
119+ * protocol number "prot", this procedure returns the port number on which the
120+ * program is awaiting call requests. A port value of zeros means the program
121+ * has not been registered. The "port" field of the argument is ignored.
122+ */
123+ private XDR getport (int xid , XDR in , XDR out ) {
97124 PortmapMapping mapping = PortmapRequest .mapping (in );
98125 String key = PortmapMapping .key (mapping );
99126 if (LOG .isDebugEnabled ()) {
100127 LOG .debug ("Portmap GETPORT key=" + key + " " + mapping );
101128 }
102- PortmapMapping value = null ;
103- synchronized (this ) {
104- value = map .get (key );
105- }
129+ PortmapMapping value = map .get (key );
106130 int res = 0 ;
107131 if (value != null ) {
108132 res = value .getPort ();
@@ -115,13 +139,13 @@ public synchronized XDR getport(int xid, XDR in, XDR out) {
115139 return PortmapResponse .intReply (out , xid , res );
116140 }
117141
118- @ Override
119- public synchronized XDR dump ( int xid , XDR in , XDR out ) {
120- PortmapMapping [] pmapList = null ;
121- synchronized ( this ) {
122- pmapList = new PortmapMapping [ map . values (). size ()];
123- map . values (). toArray ( pmapList );
124- }
142+ /**
143+ * This procedure enumerates all entries in the port mapper's database. The
144+ * procedure takes no parameters and returns a list of program, version,
145+ * protocol, and port values.
146+ */
147+ private XDR dump ( int xid , XDR in , XDR out ) {
148+ PortmapMapping [] pmapList = map . values (). toArray ( new PortmapMapping [ 0 ]);
125149 return PortmapResponse .pmapList (out , xid , pmapList );
126150 }
127151
@@ -131,23 +155,23 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
131155
132156 RpcInfo info = (RpcInfo ) e .getMessage ();
133157 RpcCall rpcCall = (RpcCall ) info .header ();
134- final Procedure portmapProc = Procedure . fromValue ( rpcCall .getProcedure () );
158+ final int portmapProc = rpcCall .getProcedure ();
135159 int xid = rpcCall .getXid ();
136160 XDR in = new XDR (info .data ().toByteBuffer ().asReadOnlyBuffer (),
137161 XDR .State .READING );
138162 XDR out = new XDR ();
139163
140- if (portmapProc == Procedure . PMAPPROC_NULL ) {
164+ if (portmapProc == PMAPPROC_NULL ) {
141165 out = nullOp (xid , in , out );
142- } else if (portmapProc == Procedure . PMAPPROC_SET ) {
166+ } else if (portmapProc == PMAPPROC_SET ) {
143167 out = set (xid , in , out );
144- } else if (portmapProc == Procedure . PMAPPROC_UNSET ) {
168+ } else if (portmapProc == PMAPPROC_UNSET ) {
145169 out = unset (xid , in , out );
146- } else if (portmapProc == Procedure . PMAPPROC_DUMP ) {
170+ } else if (portmapProc == PMAPPROC_DUMP ) {
147171 out = dump (xid , in , out );
148- } else if (portmapProc == Procedure . PMAPPROC_GETPORT ) {
172+ } else if (portmapProc == PMAPPROC_GETPORT ) {
149173 out = getport (xid , in , out );
150- } else if (portmapProc == Procedure . PMAPPROC_GETVERSADDR ) {
174+ } else if (portmapProc == PMAPPROC_GETVERSADDR ) {
151175 out = getport (xid , in , out );
152176 } else {
153177 LOG .info ("PortmapHandler unknown rpc procedure=" + portmapProc );
@@ -161,7 +185,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
161185 RpcResponse rsp = new RpcResponse (buf , info .remoteAddress ());
162186 RpcUtil .sendRpcResponse (ctx , rsp );
163187 }
164-
188+
165189 @ Override
166190 public void channelOpen (ChannelHandlerContext ctx , ChannelStateEvent e )
167191 throws Exception {
0 commit comments