2828import javax .net .SocketFactory ;
2929import javax .security .auth .login .LoginException ;
3030
31+ import org .apache .commons .logging .*;
32+
3133import org .apache .hadoop .conf .Configuration ;
3234import org .apache .hadoop .io .Writable ;
3335import org .apache .hadoop .security .UserGroupInformation ;
4143 * does not give cross-language wire compatibility, since the Hadoop RPC wire
4244 * format is non-standard, but it does permit use of Avro's protocol versioning
4345 * features for inter-Java RPCs. */
44- public class AvroRpc {
46+ class AvroRpcEngine implements RpcEngine {
47+ private static final Log LOG = LogFactory .getLog (RPC .class );
48+
4549 private static int VERSION = 0 ;
4650
51+ // the implementation we tunnel through
52+ private static final RpcEngine ENGINE = new WritableRpcEngine ();
53+
4754 /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
4855 private static interface TunnelProtocol extends VersionedProtocol {
4956 /** All Avro methods and responses go through this. */
@@ -91,8 +98,9 @@ public ClientTransceiver(InetSocketAddress addr,
9198 UserGroupInformation ticket ,
9299 Configuration conf , SocketFactory factory )
93100 throws IOException {
94- this .tunnel = (TunnelProtocol )RPC .getProxy (TunnelProtocol .class , VERSION ,
95- addr , ticket , conf , factory );
101+ this .tunnel =
102+ (TunnelProtocol )ENGINE .getProxy (TunnelProtocol .class , VERSION ,
103+ addr , ticket , conf , factory );
96104 this .remote = addr ;
97105 }
98106
@@ -111,44 +119,48 @@ public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
111119 throw new UnsupportedOperationException ();
112120 }
113121
114- public void close () throws IOException {}
122+ public void close () throws IOException {
123+ ENGINE .stopProxy (tunnel );
124+ }
115125 }
116-
126+
117127 /** Construct a client-side proxy object that implements the named protocol,
118128 * talking to a server at the named address. */
119- public static Object getProxy (Class <?> protocol ,
120- InetSocketAddress addr ,
121- Configuration conf )
129+ public Object getProxy (Class protocol , long clientVersion ,
130+ InetSocketAddress addr , UserGroupInformation ticket ,
131+ Configuration conf , SocketFactory factory )
122132 throws IOException {
123- UserGroupInformation ugi = null ;
133+ return Proxy .newProxyInstance
134+ (protocol .getClassLoader (),
135+ new Class [] { protocol },
136+ new Invoker (protocol , addr , ticket , conf , factory ));
137+ }
138+
139+ /** Stop this proxy. */
140+ public void stopProxy (Object proxy ) {
124141 try {
125- ugi = UserGroupInformation . login ( conf );
126- } catch (LoginException le ) {
127- throw new RuntimeException ( "Couldn't login!" );
142+ (( Invoker ) Proxy . getInvocationHandler ( proxy )). close ( );
143+ } catch (IOException e ) {
144+ LOG . warn ( "Error while stopping " + proxy , e );
128145 }
129- return getProxy (protocol , addr , ugi , conf ,
130- NetUtils .getDefaultSocketFactory (conf ));
131146 }
132147
133- /** Construct a client-side proxy object that implements the named protocol,
134- * talking to a server at the named address. */
135- public static Object getProxy
136- (final Class <?> protocol , final InetSocketAddress addr ,
137- final UserGroupInformation ticket ,
138- final Configuration conf , final SocketFactory factory )
139- throws IOException {
140-
141- return Proxy .newProxyInstance
142- (protocol .getClassLoader (), new Class [] { protocol },
143- new InvocationHandler () {
144- public Object invoke (Object proxy , Method method , Object [] args )
145- throws Throwable {
146- return new ReflectRequestor
147- (protocol ,
148- new ClientTransceiver (addr , ticket , conf , factory ))
149- .invoke (proxy , method , args );
150- }
151- });
148+ private static class Invoker implements InvocationHandler , Closeable {
149+ private final ClientTransceiver tx ;
150+ private final ReflectRequestor requestor ;
151+ public Invoker (Class <?> protocol , InetSocketAddress addr ,
152+ UserGroupInformation ticket , Configuration conf ,
153+ SocketFactory factory ) throws IOException {
154+ this .tx = new ClientTransceiver (addr , ticket , conf , factory );
155+ this .requestor = new ReflectRequestor (protocol , tx );
156+ }
157+ @ Override public Object invoke (Object proxy , Method method , Object [] args )
158+ throws Throwable {
159+ return requestor .invoke (proxy , method , args );
160+ }
161+ public void close () throws IOException {
162+ tx .close ();
163+ }
152164 }
153165
154166 /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
@@ -170,24 +182,20 @@ public BufferListWritable call(final BufferListWritable request)
170182 }
171183 }
172184
173- /** Construct a server for a protocol implementation instance listening on a
174- * port and address. */
175- public static Server getServer (Object impl , String bindAddress , int port ,
176- Configuration conf )
177- throws IOException {
178- return RPC .getServer (new TunnelResponder (impl .getClass (), impl ),
179- bindAddress , port , conf );
180-
185+ public Object [] call (Method method , Object [][] params ,
186+ InetSocketAddress [] addrs , UserGroupInformation ticket ,
187+ Configuration conf ) throws IOException {
188+ throw new UnsupportedOperationException ();
181189 }
182190
183191 /** Construct a server for a protocol implementation instance listening on a
184192 * port and address. */
185- public static RPC .Server getServer (Object impl , String bindAddress , int port ,
186- int numHandlers , boolean verbose ,
187- Configuration conf )
188- throws IOException {
189- return RPC . getServer ( new TunnelResponder (impl . getClass () , impl ),
190- bindAddress , port , numHandlers , verbose , conf );
193+ public RPC .Server getServer (Class iface , Object impl , String bindAddress ,
194+ int port , int numHandlers , boolean verbose ,
195+ Configuration conf ) throws IOException {
196+ return ENGINE . getServer ( TunnelProtocol . class ,
197+ new TunnelResponder (iface , impl ),
198+ bindAddress , port , numHandlers , verbose , conf );
191199 }
192200
193201}
0 commit comments