Vert.x线程与Future

Wed, Feb 13, 2019 阅读时间 1 分钟

Vert.x有两个重要的线程池: Event loop线程池Worker线程池

Event loop线程池

在一个标准的反应器实现中,有一个独立的 Event Loop会循环执行,处理所有到达的事件并传递给处理器处理。Vert.x的工作方式有所不同,每个Vertx实例维护多个Event Loop 线程。Event Loop数量默认为核数的2倍,也可以通过如下方式进行设置:

Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(5));

这种模式称为Multi-Reactor模式(多反应器模式)。但是需要注意的是,即使一个Vertx实例维护了多个Event Loop,任何一个特定的处理器永远不会被并发执行。大部分情况下(除了Worker Verticle以外)它们总是在同一个Event Loop 线程中被调用。

Handler是依次分发给每一个Event Loop的,且特定的Handler永远不会被并发执行。如果某一个Handler耗时过多,则Handler所在的Event Loop线程会被该Handler所拖慢。

Worker线程池

不要在处理器中阻塞Event loop线程。如果需要执行阻塞代码,需要在worker线程池中之行。Vert.x的Worker Pool数量默认为20,可以通过如下方式设置:

Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(5));

有两种方式可以使代码使用woker线程执行:

  1. **通过调用executeBlocking方法来指定阻塞式代码的执行以及阻塞式代码执行后处理结果的异步回调。**默认情况下,如果executeBlocking在同一个上下文环境中(如:同一个Verticle实例)被调用了多次,那么这些不同的executeBlocking代码块会顺序执行(一个接一个)。若不需要关心您调用executeBlocking的顺序,可以将ordered参数的值设为false。这样任何executeBlocking都会在Worker Pool中并行执行。
  2. 使用Worker Verticle,一个Worker Verticle始终会使用Worker Pool中的某个线程来执行。
vertx.executeBlocking(future -> {
  // 调用一些需要耗费显著执行时间返回结果的阻塞式API
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println("The result is: " + res.result());
});

可以为不同的用途创建不同的池:

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
  // 调用一些需要耗费显著执行时间返回结果的阻塞式API
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println("The result is: " + res.result());
});

Worker Executor在不需要的时候必须被关闭。当使用同一个名字创建了许多worker时,它们将共享同一个pool。当所有的worker executor调用了close方法被关闭过后,对应的worker pool会被销毁。如果Worker Executor在Verticle中创建,那么Verticle实例销毁的同时Vert.x将会自动关闭这个Worker Executor。

Worker Executor可以在创建的时候配置:

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime); //maxExecuteTime单位为毫秒

Future

Vert.x中的Future可以用来协调多个异步操作的结果。它支持并发组合(并行执行多个异步调用)和顺序组合(依次执行异步调用)。Future接管目标异步处理函数的handle事件,触发自身的completefail事件,最后再触发Future注册的Handler的handle事件。

也可以把Future看作一个事件对象,通过setHandler方法注册Handler监听对象,用来监听Future事件;异步调用函数通过Future的completer方法在函数完成时触发Future事件。

  • setHandler: 设置事件处理Handler。complete方法中会回调Future注册的Handler的handle方法。即:Future发生complete事件后,向Future注册的Handler传递handle事件。

  • complete: Future的complete方法一般由异步处理函数注册的Handler调用,表示请求已结束。即:complete方法表示Future的complete事件。

  • completer:返回包含handle接口实现的Handler。FutureImpl中的实现是返回对象自己。异步函数注册handler时可以注册该方法的返回值。FutureImpl中handle处理过程是:如果结果成功完成,则调用complete方法;否则调用failed方法。即:异步处理函数注册该方法的返回值作为Handler,该Handler在异步处理函数完成,completer的返回值Handler 发生 handle 事件后,向Future 发送complete事件。

Future<HttpServer> httpServerFuture = Future.future();
httpServerFuture.setHandler(...);
httpServer.listen(httpServerFuture.completer());

并发合并

CompositeFuture.all方法接受多个Future对象作为参数(最多6个,或者传入List)。当所有的Future都成功完成,该方法将返回一个成功的Future;当任一个Future执行失败,则返回一个失败的Future:

Future<HttpServer> httpServerFuture = Future.future();
httpServer.listen(httpServerFuture.completer());

Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture.completer());

CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
  if (ar.succeeded()) {
    // 所有服务器启动完成
  } else {
    // 有一个服务器启动失败
  }
});

CompositeFuture.all 方法注册了自己的Handler,并重新设置了所有Future对象的Handler。