Hadoop2源码分析-RPC探索实战详解大数据

1.概述

  在《Hadoop2源码分析-RPC机制初识》博客中,我们对RPC机制有了初步的认识和了解,下面我们对Hadoop V2的RPC机制做进一步探索,在研究Hadoop V2的RPC机制,我们需要掌握相关的Java基础知识,如:Java NIO、动态代理与反射等。本篇博客介绍的内容目录如下所示:

  • Java NIO简述
  • Java NIO实例演示
  • 动态代理与反射简述
  • 动态代理与反射实例演示
  • Hadoop V2 RPC框架使用实例

  下面开始今天的博客介绍。

2.Java NIO简述

  Java NIO又称Java New IO,它替代了Java IO API,提供了与标准IO不同的IO工作方式。Java NIO由一下核心组件组成:

  • Channels:连接通道,即能从通道读取数据,又能写数据到通道。可以异步读写,读写从Buffer开始。
  • Buffers:消息缓冲区,用于和NIO通道进行交互。所谓缓冲区,它是一块可以读写的内存,该内存被封装成NIO的Buffer对象,并提供相应的方法,以便于访问。
  • Selectors:通道管理器,它能检测到Java NIO中多个通道,单独的线程可以管理多个通道,间接的管理多个网络连接。

  下图为Java NIO的工作原理图,如下图所示:

Hadoop2源码分析-RPC探索实战详解大数据

3.Java NIO实例演示

  • NIOServer

  首先,我们来看NIOServer的代码块。代码内容如下所示:

package cn.hadoop.nio; 
 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.Iterator; 
 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
 
import cn.hadoop.conf.ConfigureAPI; 
 
/** 
 * @Date May 8, 2015 
 * 
 * @Author dengjie 
 * 
 * @Note Defined nio server 
 */ 
public class NIOServer { 
 
    private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); 
 
    // The channel manager 
    private Selector selector; 
 
    /** 
     * Get ServerSocket channel and initialize 
     *  
     * 1.Get a ServerSocket channel 
     *  
     * 2.Set channel for non blocking 
     *  
     * 3.The channel corresponding to the ServerSocket binding to port port 
     *  
     * 4.Get a channel manager 
     *  
     * 5.The channel manager and the channel binding, and the channel registered 
     * SelectionKey.OP_ACCEPT event 
     *  
     * @param port 
     * @throws IOException 
     */ 
    public void init(int port) throws IOException { 
        ServerSocketChannel serverChannel = ServerSocketChannel.open(); 
        serverChannel.configureBlocking(false); 
        serverChannel.socket().bind(new InetSocketAddress(port)); 
        this.selector = Selector.open(); 
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); 
    } 
 
    /** 
     * listen selector 
     *  
     * @throws IOException 
     */ 
    public void listen() throws IOException { 
        LOGGER.info("Server has start success"); 
        while (true) { 
            selector.select(); 
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); 
            while (ite.hasNext()) { 
                SelectionKey key = (SelectionKey) ite.next(); 
                ite.remove(); 
                if (key.isAcceptable()) { 
                    ServerSocketChannel server = (ServerSocketChannel) key.channel(); 
                    SocketChannel channel = server.accept(); 
                    channel.configureBlocking(false);// 非阻塞 
                    channel.write(ByteBuffer.wrap(new String("Send test info to client").getBytes())); 
                    channel.register(this.selector, SelectionKey.OP_READ);// 设置读的权限 
                } else if (key.isReadable()) { 
                    read(key); 
                } 
            } 
        } 
    } 
 
    /** 
     * Deal client send event 
     */ 
    public void read(SelectionKey key) throws IOException { 
        SocketChannel channel = (SocketChannel) key.channel(); 
        ByteBuffer buffer = ByteBuffer.allocate(1024); 
        channel.read(buffer); 
        byte[] data = buffer.array(); 
        String info = new String(data).trim(); 
        LOGGER.info("Server receive info : " + info); 
        ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes()); 
        channel.write(outBuffer);// 将消息回送给客户端 
    } 
 
    public static void main(String[] args) { 
        try { 
            NIOServer server = new NIOServer(); 
            server.init(ConfigureAPI.ServerAddress.NIO_PORT); 
            server.listen(); 
        } catch (Exception ex) { 
            ex.printStackTrace(); 
            LOGGER.error("NIOServer main run error,info is " + ex.getMessage()); 
        } 
    } 
}
  • NIOClient

  然后,我们在来看NIOClient的代码块,代码具体内容如下所示:

package cn.hadoop.nio; 
 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.SocketChannel; 
import java.util.Iterator; 
 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
 
import cn.hadoop.conf.ConfigureAPI; 
 
/** 
 * @Date May 8, 2015 
 * 
 * @Author dengjie 
 * 
 * @Note Defined NIO client 
 */ 
public class NIOClient { 
 
    private static final Logger LOGGER = LoggerFactory.getLogger(NIOClient.class); 
 
    private Selector selector; 
 
    /** 
     * Get ServerSocket channel and initialize 
     */ 
    public void init(String ip, int port) throws Exception { 
        SocketChannel channel = SocketChannel.open(); 
        channel.configureBlocking(false); 
        this.selector = Selector.open(); 
        channel.connect(new InetSocketAddress(ip, port)); 
        channel.register(selector, SelectionKey.OP_CONNECT); 
    } 
 
    /** 
     * listen selector 
     */ 
    public void listen() throws Exception { 
        while (true) { 
            selector.select(); 
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); 
            while (ite.hasNext()) { 
                SelectionKey key = (SelectionKey) ite.next(); 
                ite.remove(); 
                if (key.isConnectable()) { 
                    SocketChannel channel = (SocketChannel) key.channel(); 
                    if (channel.isConnectionPending()) { 
                        channel.finishConnect(); 
                    } 
                    channel.configureBlocking(false);// 非阻塞 
 
                    channel.write(ByteBuffer.wrap(new String("Send test info to server").getBytes())); 
                    channel.register(this.selector, SelectionKey.OP_READ); 
                } else if (key.isReadable()) { 
                    read(key); 
                } 
 
            } 
 
        } 
    } 
 
    /** 
     * Deal client send event 
     */ 
    public void read(SelectionKey key) throws IOException { 
        SocketChannel channel = (SocketChannel) key.channel(); 
        ByteBuffer buffer = ByteBuffer.allocate(1024); 
        channel.read(buffer); 
        byte[] data = buffer.array(); 
        String info = new String(data).trim(); 
        LOGGER.info("Client receive info : " + info); 
        ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes()); 
        channel.write(outBuffer); 
    } 
 
    public static void main(String[] args) { 
        try { 
            NIOClient client = new NIOClient(); 
            client.init(ConfigureAPI.ServerAddress.NIO_IP, ConfigureAPI.ServerAddress.NIO_PORT); 
            client.listen(); 
        } catch (Exception ex) { 
            ex.printStackTrace(); 
            LOGGER.error("NIOClient main run has error,info is " + ex.getMessage()); 
        } 
    } 
}
  • ConfigureAPI

  下面给出ConfigureAPI类的代码,内容如下所示:

package cn.hadoop.conf; 
 
/** 
 * @Date May 7, 2015 
 * 
 * @Author dengjie 
 * 
 * @Note Defined rpc info 
 */ 
public class ConfigureAPI { 
 
    public interface VersionID { 
        public static final long RPC_VERSION = 7788L; 
    } 
 
    public interface ServerAddress { 
        public static final int NIO_PORT = 8888; 
        public static final String NIO_IP = "127.0.0.1"; 
    } 
 
}

4.动态代理和反射简述

  在Java中,动态代理主要用来做方法的增强,可以在不修改源码的情况下,增强一些方法。另外,还有一个作用就是做远程调用,比如现在有Java接口,该接口的实现部署在非本地服务器上,在编写客户端代码时,由于没法直接生成该对象,这个时候就需要考虑使用动态代理了。

  而反射,利用了Class类作为反射实例化对象的基本应用,对于一个实例化对象而言,它需要调用类中的构造方法,属性和一般方法,这些操作都可以通过反射机制来完成。下面我们用一个实例来理解这些理论。

5.动态代理和反射实例演示

5.1动态代理

  • JProxy
package cn.java.base; 
 
import java.lang.reflect.InvocationHandler; 
import java.lang.reflect.Method; 
import java.lang.reflect.Proxy; 
 
/** 
 * @Date May 7, 2015 
 *  
 * @Author dengjie 
 */ 
public class JProxy { 
     
    public static void main(String[] args) { 
        JInvocationHandler ji = new JInvocationHandler(); 
        Subject sub = (Subject) ji.bind(new RealSubject()); 
        System.out.println(sub.say("dengjie", 25)); 
    } 
 
} 
 
interface Subject { 
    public String say(String name, int age); 
} 
 
class RealSubject implements Subject { 
 
    @Override 
    public String say(String name, int age) { 
        return name + "," + age; 
    } 
 
} 
 
class JInvocationHandler implements InvocationHandler { 
 
    private Object object = null; 
 
    public Object bind(Object object) { 
        this.object = object; 
        return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), this); 
    } 
 
    @Override 
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 
        Object tmp = method.invoke(this.object, args); 
        return tmp; 
    } 
 
}

5.2反射

  • JReflect
package cn.java.base; 
 
/** 
 * @Date May 7, 2015 
 *  
 * @Author dengjie 
 */ 
public class JReflect { 
    public static void main(String[] args) { 
        Fruit f = Factory.getInstance(Orange.class.getName()); 
        if (f != null) { 
            f.eat(); 
        } 
    } 
} 
 
interface Fruit { 
    public abstract void eat(); 
} 
 
class Apple implements Fruit { 
 
    @Override 
    public void eat() { 
        System.out.println("apple"); 
    } 
 
} 
 
class Orange implements Fruit { 
 
    @Override 
    public void eat() { 
        System.out.println("orange"); 
    } 
 
} 
 
class Factory { 
    public static Fruit getInstance(String className) { 
        Fruit f = null; 
        try { 
            f = (Fruit) Class.forName(className).newInstance(); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
        return f; 
    } 
}

6.Hadoop V2 RPC框架使用实例

  本实例主要演示通过Hadoop V2的RPC框架实现一个计算两个整数的Add和Sub,服务接口为 CaculateService ,继承于 VersionedProtocol ,具体代码如下所示:

  • CaculateService
package cn.hadoop.service; 
 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.ipc.ProtocolInfo; 
import org.apache.hadoop.ipc.VersionedProtocol; 
 
import cn.hadoop.conf.ConfigureAPI; 
 
/** 
 * @Date May 7, 2015 
 * 
 * @Author dengjie 
 * 
 * @Note Data calculate service interface 
 */ 
@ProtocolInfo(protocolName = "", protocolVersion = ConfigureAPI.VersionID.RPC_VERSION) 
public interface CaculateService extends VersionedProtocol { 
 
    // defined add function 
    public IntWritable add(IntWritable arg1, IntWritable arg2); 
 
    // defined sub function 
    public IntWritable sub(IntWritable arg1, IntWritable arg2); 
 
}

  注意,本工程使用的是Hadoop-2.6.0版本,这里CaculateService接口需要加入注解,来声明版本号。

  CaculateServiceImpl类实现CaculateService接口。代码如下所示:

  • CaculateServiceImpl
package cn.hadoop.service.impl; 
 
import java.io.IOException; 
 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.ipc.ProtocolSignature; 
 
import cn.hadoop.conf.ConfigureAPI; 
import cn.hadoop.service.CaculateService; 
 
/** 
 * @Date May 7, 2015 
 * 
 * @Author dengjie 
 * 
 * @Note Implements CaculateService class 
 */ 
public class CaculateServiceImpl implements CaculateService { 
 
    public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException { 
        return this.getProtocolSignature(arg0, arg1, arg2); 
    } 
 
    /** 
     * Check the corresponding version 
     */ 
    public long getProtocolVersion(String arg0, long arg1) throws IOException { 
        return ConfigureAPI.VersionID.RPC_VERSION; 
    } 
 
    /** 
     * Add nums 
     */ 
    public IntWritable add(IntWritable arg1, IntWritable arg2) { 
        return new IntWritable(arg1.get() + arg2.get()); 
    } 
 
    /** 
     * Sub nums 
     */ 
    public IntWritable sub(IntWritable arg1, IntWritable arg2) { 
        return new IntWritable(arg1.get() - arg2.get()); 
    } 
 
}

  CaculateServer服务类,对外提供服务,具体代码如下所示:

  • CaculateServer
package cn.hadoop.rpc; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.ipc.RPC; 
import org.apache.hadoop.ipc.RPC.Server; 
import org.slf4j.LoggerFactory; 
import org.slf4j.Logger; 
 
import cn.hadoop.service.CaculateService; 
import cn.hadoop.service.impl.CaculateServiceImpl; 
 
/** 
 * @Date May 7, 2015 
 * 
 * @Author dengjie 
 * 
 * @Note Server Main 
 */ 
public class CaculateServer { 
 
    private static final Logger LOGGER = LoggerFactory.getLogger(CaculateServer.class); 
 
    public static final int IPC_PORT = 9090; 
 
    public static void main(String[] args) { 
        try { 
            Server server = new RPC.Builder(new Configuration()).setProtocol(CaculateService.class) 
                    .setBindAddress("127.0.0.1").setPort(IPC_PORT).setInstance(new CaculateServiceImpl()).build(); 
            server.start(); 
            LOGGER.info("CaculateServer has started"); 
            System.in.read(); 
        } catch (Exception ex) { 
            ex.printStackTrace(); 
            LOGGER.error("CaculateServer server error,message is " + ex.getMessage()); 
        } 
    } 
 
}

  注意,在Hadoop V2版本中,获取RPC下的Server对象不能在使用RPC.getServer()方法了,该方法已被移除,取而代之的是使用Builder方法来构建新的Server对象。

  RPCClient客户端类,用于访问Server端,具体代码实现如下所示:

  • RPCClient
package cn.hadoop.rpc; 
 
import java.net.InetSocketAddress; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.ipc.RPC; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
 
import cn.hadoop.service.CaculateService; 
 
/** 
 * @Date May 7, 2015 
 * 
 * @Author dengjie 
 * 
 * @Note RPC Client Main 
 */ 
public class RPCClient { 
 
    private static final Logger LOGGER = LoggerFactory.getLogger(RPCClient.class); 
 
    public static void main(String[] args) { 
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", CaculateServer.IPC_PORT); 
        try { 
            RPC.getProtocolVersion(CaculateService.class); 
            CaculateService service = (CaculateService) RPC.getProxy(CaculateService.class, 
                    RPC.getProtocolVersion(CaculateService.class), addr, new Configuration()); 
            int add = service.add(new IntWritable(2), new IntWritable(3)).get(); 
            int sub = service.sub(new IntWritable(5), new IntWritable(2)).get(); 
            LOGGER.info("2+3=" + add); 
            LOGGER.info("5-2=" + sub); 
        } catch (Exception ex) { 
            ex.printStackTrace(); 
            LOGGER.error("Client has error,info is " + ex.getMessage()); 
        } 
    } 
 
}

  Hadoop V2 RPC服务端截图预览,如下所示:

Hadoop2源码分析-RPC探索实战详解大数据

  Hadoop V2 RPC客户端截图预览,如下所示:

Hadoop2源码分析-RPC探索实战详解大数据

7.总结

  Hadoop V2 RPC框架对Socket通信进行了封装,定义了自己的基类接口VersionProtocol。该框架需要通过网络以序列化的方式传输对象,关于Hadoop V2的序列化可以参考《Hadoop2源码分析-序列化篇》,传统序列化对象较大。框架内部实现了基于Hadoop自己的服务端对象和客户端对象。服务端对象通过new RPC.Builder().builder()的方式来获取,客户端对象通过RPC.getProxy()的方式来获取。并且都需要接受Configuration对象,该对象实现了Hadoop相关文件的配置。

8.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9940.html

(0)
上一篇 2021年7月19日 11:43
下一篇 2021年7月19日 11:43

相关推荐

发表回复

登录后才能评论