Node.js 这几年火的不要不要的,借助 js 天生的事件驱动机制和 V8 高性能引擎,让编写高并发的 web 应用门槛降低了许多,当然这背后还要得益于 Douglas C. Schmidt 在 1995 年提出的基于事件驱动的 Reactor 模式,让本身只支持单线程执行的 js 能够胜任如今高并发环境下的服务端应用。
不过作为一名服务端开发人员,我对 js 的使用程度并不高,所以也一直没有机会去切身体会 Node.js 的魅力,好在 Reactor 只是一个设计模式,与具体语言和平台无关的。前段时间将负责的项目中的一个比较新的服务引入了 Vert.x 组件进行改造,也算是与 Reactor 模式有了一次亲密接触。Vert.x 是一个被称为运行在 JVM 上的 Node.js,用于在任何层次上编写非阻塞、响应式的模块或服务,关于 Vert.x 的发展历程还多少有些坎坷,具体可以移步官网。
线程硬抗 or 事件驱动
服务端在响应请求设计方面主要可以分为 线程驱动 和 事件驱动 两条主线,前者是大部分 java 服务端开发人员熟知和常用的模式(不要说你不知道 servlet),而后者则是 Reactor 模式的设计基础。
我们先来看一下 线程驱动 的模式设计,这一模式针对每一个请求都创建一个独立的线程。以 web 应用为例,web 服务器会为每一个客户端连接创建一个独立的线程,该线程用于接受请求参数、响应业务逻辑,并最后将结果进行渲染返回给客户端,如下图是对该模式的描绘。
针对内建多线程支持的语言来说,我们通常认为这样的设计是理所当然的,事实也确实如此。基于该模式衍生出了众多的框架和组件,且有数不清的服务正在基于这样的模式运行着,其中也不乏大型项目。然而我们也不能否认这一模式在高并发场景下的乏力,“thread-per-connection” 势必导致相当一部分线程处于阻塞状态,而每一个线程的存活都需要占用一定的操作系统资源,这部分阻塞线程所持有的资源对于操作系统来说是一笔不小的开销。此外,CPU 也不得不在频繁的线程上下文切换上浪费不少的时间,如果遇上一些 I/O 密集型业务,情况会更加糟糕。下面是针对该模式的简单示例实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class Server implements Runnable { private int port; public Server (int port) { this .port = port; } @Override public void run () { try { ServerSocket ss = new ServerSocket(port); System.out.println("Server listening on port: " + this .port); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); } catch (IOException ex) { ex.printStackTrace(); } } private class Handler implements Runnable { private final Socket socket; public Handler (Socket socket) { this .socket = socket; } @Override public void run () { try { System.out.println("[thread-" + Thread.currentThread().getId() + "] is processing data from client." ); byte [] input = new byte [1024 ]; socket.getInputStream().read(input); byte [] output = this .process(input); socket.getOutputStream().write(output); } catch (IOException ex) { ex.printStackTrace(); } } private byte [] process(byte [] cmd) { } } }
针对 事件驱动 模式来说,则不会为每一个连接都创建一个相应的处理线程,这里的线程数量是既定的,用于执行当前事件类型绑定的业务逻辑。这一模式有些类似于“观察者模式”的工作机制,事件就是被观察的消息。我们可以设置一个 “event-loop”,以单线程的方式不断的循环检查当前发生的具体事件,一旦有新的事件发生,则基于事件类型回调绑定的业务逻辑,而对于业务逻辑的处理则交由另外的线程(池)执行。
因为事件循环检测这一过程是非常轻量化的(计算量非常小),所以单线程即可以满足高并发的需求,但是这也不是绝对的,我们也可以基于实际情况设置多个“event-loop”,从而发挥 CPU 的最大性能。这里执行业务处理的线程数量可以是单线程也可以是线程池,但是不管怎样其目的都是为了在有限的计算资源前提下尽量提高并发量,不过相对于线程驱动的模式来说,事件驱动的模式可以保证线程数量是可控的。
Reactor 设计模式与示例实现
Reactor 是针对事件驱动这一思想的具体设计模式,该模式自被提出以来在多种语言上都有内建或第三方的实现。该模式主要定义了如下几种角色:
Handle :可以理解为操作系统中的句柄,是对资源在操作系统层面上的抽象,例如打开的文件、网络连接(Socket)等。
Synchronous Event Demultiplexer :用于阻塞监听 Handle 中的事件,一般采用操作系统的 select 实现,在 java NIO 中用 Selector 进行封装。
Initiation Dispatcher :用于管理 Event Handler,包括注册、注销等。此外它还是事件的分发器,根据 Synchronous Event Demultiplexer 监听到的事件类型,将其分发给对应的 Event Handler 进行处理。
Event Handler :事件处理器,与具体的事件类型绑定,一般被定义成抽象类或接口,其中声明了钩子方法以让实现类定义具体的处理逻辑。
Concrete Event Handler :Event Handler 实现类。
以上角色交互图如下,所有的 Event Handler 都会注册到 Initiation Dispatcher 上,Synchronous Event Demultiplexer 在应用启动后一直监听操作系统事件,当有新的事件发生时会回调 Initiation Dispatcher 的 handle_events()
方法,该方法会判断当前的事件类型,并调用事件绑定的 Event Handler 处理事件。
上述过程是 Douglas C. Schmidt 在其论文中的描述,参考 Doug Lea 的文章来看还可以描述的更加简单一点。实际上该模式主要包含两个角色:reactor 和 handler。其中 reactor 的主要责任就是用来监听事件(event-loop),并回调事件绑定的已注册的 handler,而 handler 则用来执行事件对应的具体业务逻辑。如下图所示,其中 event-loop 和 dispatcher 都是 ractor 的角色,而 handler 和 acceptor 都注册在 dispatcher 上,其中 acceptor 是特殊的 Handler,用于创建和绑定处理事件的 handler。
再生动一点,reactor 可以类比春风十里里面的老鸨,而 handler 就是菇凉们,自己脑补一下吧(邪恶…)。
说完了理论,下面我们编写一个示例程序来演示 reactor 的工作机制。Java NIO 对 reactor 提供了内建的支持,这里我们以 Socket 连接作为 Handle,即 java NIO 中的 Channel。Channel 注册到 Synchronous Event Demultiplexer 中以监听 Handle 事件(对 ServerSocketChannnel 来说可以是 CONNECT 事件,对 SocketChannel 可以是 READ、WRITE、CLOSE 等事件)。Synchronous Event Demultiplexer 监听事件的过程,对应到 java NIO 则采用 Selector 进行封装,当 Selector.select()
返回时,可以调用 Selector 的 selectedKeys()
方法获取 Set<SelectionKey>
集合,一个 SelectionKey 对象表示一个有事件发生的 Channel 以及对应的事件类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class Reactor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocketChannel; public Reactor (int port) throws IOException { this .selector = Selector.open(); this .serverSocketChannel = ServerSocketChannel.open(); this .serverSocketChannel.socket().bind(new InetSocketAddress(port)); this .serverSocketChannel.configureBlocking(false ); SelectionKey selectionKey = this .serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); selectionKey.attach(new Acceptor()); } @Override public void run () { System.out.println("Server listening on port: " + serverSocketChannel.socket().getLocalPort()); try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator itr = selected.iterator(); while (itr.hasNext()) { this .dispatch((SelectionKey) (itr.next())); } selected.clear(); } } catch (Exception e) { e.printStackTrace(); } } private void dispatch (SelectionKey key) throws Exception { Runnable acceptor = (Runnable) (key.attachment()); if (acceptor != null ) acceptor.run(); } private class Acceptor implements Runnable { @Override public void run () { try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null ) new Handler(selector, socketChannel); } catch (IOException e) { e.printStackTrace(); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class Handler implements Runnable { private static final int READ = 0 , PROCESS = 1 , WRITE = 2 ; private final SocketChannel socketChannel; private final SelectionKey selectionKey; private static ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private ByteBuffer input = ByteBuffer.allocate(1024 ); private boolean isClosed; private int state = READ; private String data; public Handler (Selector selector, SocketChannel channel) throws IOException { this .socketChannel = channel; this .socketChannel.configureBlocking(false ); this .isClosed = !socketChannel.isConnected(); this .selectionKey = this .socketChannel.register(selector, 0 ); this .selectionKey.attach(this ); this .selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run () { try { if (isClosed) socketChannel.close(); if (READ == state) { this .read(); } else if (WRITE == state) { this .write(); } } catch (IOException e) { try { socketChannel.close(); } catch (IOException e1) { } } } private void read () throws IOException { System.out.println("[thread-" + Thread.currentThread().getId() + "] read data from client." ); int readCount = socketChannel.read(input); if (readCount > 0 ) { state = PROCESS; pool.execute(() -> this .process(readCount)); } else { this .isClosed = true ; } selectionKey.interestOps(SelectionKey.OP_WRITE); } private void process (int readCount) { System.out.println("[thread-" + Thread.currentThread().getId() + "] is processing data." ); StringBuilder sb = new StringBuilder(); input.flip(); byte [] subStringBytes = new byte [readCount]; byte [] array = input.array(); System.arraycopy(array, 0 , subStringBytes, 0 , readCount); sb.append(new String(subStringBytes)); input.clear(); this .data = sb.toString().trim(); state = WRITE; } private void write () throws IOException { System.out.println("[thread-" + Thread.currentThread().getId() + "] write data to client : " + this .data); ByteBuffer output = ByteBuffer.wrap(("Hello " + this .data + "\n" ).getBytes()); socketChannel.write(output); selectionKey.interestOps(SelectionKey.OP_READ); state = READ; } }
示例程序以 Doug Lea 大师在 “Scalable IO in Java ” 文章中的例子为原型并做了一些更改。该示例采用一个线程不断的监听客户端请求(具体实现时可以依据需要选择实现多个监听器),一旦有新的 socket 连接就会创建一个与之绑定的 Handler,并读取请求数据,至于对数据的处理则交由线程池中的线程进行。这里默认我们设置线程池的大小为当前宿主机核心数,并使用一个单线程不断的监听请求事件,在这样的设计下,不管客户端有多少连接并发量,服务端的线程数始终是 (核心数 + 1),我们甚至可以只用 2 个线程来处理客户端的所有请求(一个负责监听事件,一个用于处理事件)。
对应的客户端测试程序如下,真实环境下客户端的请求是不应该设置上限的,这里我们设置了 1024 个请求线程也只是为了演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class Client { private String host; private int port; public Client (String host, int port) { this .host = host; this .port = port; } private void sayHello () throws Exception { ExecutorService es = Executors.newCachedThreadPool(); List<Callable<Boolean>> tasks = new ArrayList<>(); for (int i = 0 ; i < 1024 ; i++) { tasks.add(() -> { Socket socket = null ; PrintWriter out = null ; BufferedReader in = null ; try { socket = new Socket(host, port); out = new PrintWriter(socket.getOutputStream(), true ); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println("Client[" + Thread.currentThread().getId() + "] connect success, host : " + host + " port: " + port); String hay = RandomStringUtils.randomAlphanumeric(32 ); out.println(hay); String msg = in.readLine().trim(); System.out.println("Client[" + Thread.currentThread().getId() + "] receive data from server : " + msg); if (!("Hello " + hay).equals(msg)) { System.err.println("expect : " + hay + ", but : " + msg); System.exit(-1 ); } return true ; } catch (Exception e) { e.printStackTrace(); } finally { if (null != out) out.close(); if (null != in) in.close(); if (null != socket) socket.close(); } return false ; }); } List<Future<Boolean>> futures = es.invokeAll(tasks); for (final Future<Boolean> future : futures) { future.get(); } TimeUnit.SECONDS.sleep(5 ); es.shutdown(); } }
说了这么多,我们最后再来谈谈 Reactor 模式的不足,毕竟完美的事物是不存在的,Reactor 的不足主要表现在如下几个方面:
相对于传统模型来说,Reactor 在思想上稍显复杂性,因此也增加了实现和使用的门槛,并且不易于调试。
需要底层 Synchronous Event Demultiplexer 支持,比如 java 中的 Selector,操作系统的 select 等,如果要自己实现可能不会那么高效。
在 IO 读写数据时仍然在同一个线程中实现的,即使实现了多个 reactor,那些共享同一个 reactor 的 channel 如果执行长时间的数据读写,也会影响这个 reactor 中其他 channel 的响应时间。比如在大文件传输时,IO 操作就会影响其他 client 的响应时间,因而对这种操作,使用传统的 “thread-per-connection” 或许是更好的选择,或者使用 Proactor 模式。
参考
Reactor: An Object Behavioral Pattern forDemultiplexing and Dispatching Handles for Synchronous Events
Scalable IO in Java
Reactor Pattern Explained
Reactor模式详解