上一篇介绍了Unix系统支持的I/O模型,以及相应的在Java中的实现方式。本篇重点讲解一下Reactor模型原理和实现机制。
I/O多路复用又被称为”事件驱动”,就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),
能够通知程序进行相应的读写操作,技术上是通过调用操作系统的select、pselect、poll、epoll来实现。
与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。
Reactor是一种应用在服务器端的开发模式,目的是提高服务端程序的并发能力,其实就是实现了I/O多路复用这种I/O模型。
图解Reactor模式
为了形象的解释这个模式,我用一个餐厅点餐的例子来直观的说明一下。
传统的多线程方式
每个顾客都有自己的服务团队(线程),在人少的情况下是可以良好的运作的,流程如下:
- 服务员给出菜单,并等待点菜
- 顾客查看菜单,并点菜
- 服务员把菜单交给厨师,厨师照着做菜
- 厨师做好菜后端到餐桌上
如下图所示
现在餐厅的口碑好,顾客人数不断增加,这时服务员就有点处理不过来了。这时老板发现,每个服务员在服务完客人后,都要去休息一下,
因此老板就说,”你们都别休息了,在旁边待命”。这样可能 10 个服务员也来得及服务 20 个顾客了。这也是”线程池”的方式,
通过重用线程来减少线程的创建和销毁时间,从而提高性能。
Reactor方式
但是客人又进一步增加了,仅仅靠剥削服务员的休息时间也没有办法服务这么多客人。老板仔细观察,
发现其实服务员并不是一直在干活的,大部分时间他们只是站在餐桌旁边等客人点菜。
于是老板就对服务员说,客人点菜的时候你们就别傻站着了,先去服务其它客人,有客人点好的时候喊你们再过去。对应于下图:
最后,老板发现根本不需要那么多的服务员,于是裁了一波员,最终甚至可以只有一个服务员。
这就是 Reactor 模式的核心思想:减少等待。当遇到需要等待 IO 时,先释放资源,而在 IO 完成时,
再通过事件驱动 (event driven) 的方式,继续接下来的处理。从整体上减少了资源的消耗。
Java中实现Reactor模式
本篇详细讲解一下Java中如何来实现Reactor模式。
Reactor模式又分为三种实现方式
经典模式
精典的Reactor模式示意图如下所示。
在Reactor模式中,包含如下角色
- Reactor 将I/O事件发派给对应的Handler
- Acceptor 处理客户端连接请求
- Handlers 执行非阻塞读/写
最简单的Reactor模式实现代码如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int count = socketChannel.read(buffer); if (count <= 0) { socketChannel.close(); key.cancel(); LOGGER.info("Received invalide data, close the connection"); continue; } LOGGER.info("Received message {}", new String(buffer.array())); } keys.remove(key); } } } }
|
从上示代码中可以看到,多个Channel可以注册到同一个Selector对象上,实现了一个线程同时监控多个请求状态(Channel)。
同时注册时需要指定它所关注的事件,例如上示代码中socketServerChannel对象只注册了OP_ACCEPT事件,
而socketChannel对象只注册了OP_READ事件。
selector.select()是阻塞的,当有至少一个通道可用时该方法返回可用通道个数。同时该方法只捕获Channel注册时指定的所关注的事件。
多工作线程模式
经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,
无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。因此可以引入多线程,并行处理多个读/写操作,如下图所示。
多线程Reactor模式示例代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { if(selector.selectNow() < 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while(iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); readKey.attach(new Processor()); } else if (key.isReadable()) { Processor processor = (Processor) key.attachment(); processor.process(key); } } } } }
|
从上示代码中可以看到,注册完SocketChannel的OP_READ事件后,可以对相应的SelectionKey
绑定一个对象(本例中绑定了一个Processor对象,该对象处理读请求),并且在获取到可读事件后,可以取出该对象。
具体的读请求处理在如下所示的Processor类中。该类中设置了一个静态的线程池处理所有请求。而process方法并不直接处理I/O请求,
而是把该I/O操作提交给上述线程池去处理,这样就充分利用了多线程的优势,同时将对新连接的处理和读/写操作的处理放在了不同的线程中,
读/写操作不再阻塞对新连接请求的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Processor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class); private static final ExecutorService service = Executors.newFixedThreadPool(16); public void process(SelectionKey selectionKey) { service.submit(() -> { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); selectionKey.cancel(); LOGGER.info("{}\t Read ended", socketChannel); return null; } else if(count == 0) { return null; } LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array())); return null; }); } }
|
多Reactor模式
Netty中使用的Reactor模式,引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,
减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。并且每个子Reactor分别属于一个独立的线程,
每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,
避免了不必要的上下文切换,同时也方便了监控请求响应状态。
多Reactor模式示意图如下所示。
多Reactor示例代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); int coreNum = Runtime.getRuntime().availableProcessors(); Processor[] processors = new Processor[coreNum]; for (int i = 0; i < processors.length; i++) { processors[i] = new Processor(); } int index = 0; while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { keys.remove(key); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); Processor processor = processors[(int) ((index++) % coreNum)]; processor.addChannel(socketChannel); processor.wakeup(); } } } } }
|
如上代码所示,本文设置的子Reactor个数是当前机器可用核数的两倍(与Netty默认的子Reactor个数一致)。
对于每个成功连接的SocketChannel,通过round robin的方式交给不同的子Reactor。
子Reactor对SocketChannel的处理如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class Processor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class); private static final ExecutorService service = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); private Selector selector; public Processor() throws IOException { this.selector = SelectorProvider.provider().openSelector(); start(); } public void addChannel(SocketChannel socketChannel) throws ClosedChannelException { socketChannel.register(this.selector, SelectionKey.OP_READ); } public void wakeup() { this.selector.wakeup(); } public void start() { service.submit(() -> { while (true) { if (selector.select(500) <= 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); key.cancel(); LOGGER.info("{}\t Read ended", socketChannel); continue; } else if (count == 0) { LOGGER.info("{}\t Message size is 0", socketChannel); continue; } else { LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array())); } } } } }); } }
|
在Processor中,同样创建了一个静态的线程池,且线程池的大小为机器核数的两倍。每个Processor实例均包含一个Selector实例。
同时每次获取Processor实例时均提交一个任务到该线程池,并且该任务正常情况下一直循环处理,不会停止。
而提交给该Processor的SocketChannel通过在其Selector注册事件,加入到相应的任务中。
由此实现了每个子Reactor包含一个Selector对象,并由一个独立的线程处理。