做网站签订合同兔子bt樱桃搜索磁力天堂
文章目录
- Pre
- Netty主从Reactor线程模型
- 服务端channel注册流程
- 源码解读
- 入口 `serverBootstrap.bind(port)`
- 源码流程图

Pre
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
Netty主从Reactor线程模型
Netty 使用主从 Reactor 线程模型来处理并发连接和网络事件。
在 Netty 中,通常有两种类型的线程池:
-
Boss 线程池:用于接受客户端连接请求,并将接受到的连接注册到 Worker 线程池的 EventLoop 中。Boss 线程池中的线程负责监听 ServerSocketChannel,并将接受到的连接分配给 Worker 线程池中的某个 EventLoop 处理。
-
Worker 线程池:每个 Worker 线程池包含多个 EventLoop,每个 EventLoop 负责处理一组连接的读写和事件处理。当一个连接被注册到某个 Worker 线程池的 EventLoop 中时,该 EventLoop 将负责处理这个连接的所有事件,包括读取数据、写入数据、处理网络事件等。
主从 Reactor 线程模型的工作流程如下:
-
主线程池(Boss 线程池)负责监听 ServerSocketChannel 上的连接请求,并将接受到的连接请求分配给 Worker 线程池中的某个 EventLoop。
-
Worker 线程池中的每个 EventLoop 都独立负责一组连接的读写和事件处理。当一个连接被注册到某个 EventLoop 上时,该 EventLoop 将会不断地轮询连接上是否有可读事件或可写事件,并在事件发生时进行相应的处理。
-
当有读写事件发生时,EventLoop 将调用对应的 ChannelHandler 进行处理。这些 ChannelHandler 可以进行数据解析、业务逻辑处理等操作。
-
处理完事件后,EventLoop 可能会将结果写回到连接中,或者关闭连接等。
通过主从 Reactor 线程模型,Netty 可以高效地处理大量的并发连接和网络事件,提高了网络应用程序的性能和可扩展性。
服务端channel注册流程
在Netty中,服务端Channel注册流程涉及以下几个关键步骤:
-
创建ServerBootstrap实例: 首先,需要创建一个ServerBootstrap实例,它是Netty提供的用于启动服务端的引导类。
-
配置ServerBootstrap: 使用ServerBootstrap实例,设置一系列参数,包括线程模型、Channel类型、处理器等。
-
绑定端口并启动服务: 调用ServerBootstrap的bind方法,指定端口并启动服务端。在bind方法内部,会进行以下操作:
-
创建NioServerSocketChannel实例:用于表示服务端的Channel,内部封装了Java NIO中的ServerSocketChannel。
-
初始化ChannelPipeline:为NioServerSocketChannel实例创建一个ChannelPipeline对象,用于管理ChannelHandler链。
-
创建ChannelInitializer并添加到ChannelPipeline:ChannelInitializer是一个特殊的ChannelHandler,它用于在Channel注册到EventLoop之后初始化ChannelPipeline。在ChannelInitializer的initChannel方法中,可以向ChannelPipeline中添加自定义的ChannelHandler。
-
获取EventLoopGroup并注册Channel:从ServerBootstrap中获取Boss EventLoopGroup,然后调用其register方法注册NioServerSocketChannel到EventLoop上。
-
-
注册Channel到EventLoop: 在调用register方法时,会将NioServerSocketChannel注册到Boss EventLoop上。在注册过程中,会执行以下操作:
-
获取EventLoop:根据配置,从Boss EventLoopGroup中选择一个EventLoop。
-
调用EventLoop的register方法:将NioServerSocketChannel注册到选定的EventLoop上。注册过程中,会创建一个NioServerSocketChannelUnsafe实例来处理注册过程,其中会调用底层的Java NIO方法将ServerSocketChannel注册到Selector上,并监听ACCEPT事件。
-
-
事件处理: 一旦NioServerSocketChannel注册到了EventLoop上,就会开始监听ACCEPT事件。当有新的连接接入时,会触发ACCEPT事件,EventLoop会调用相关的ChannelHandler进行处理,如调用ChannelInitializer的initChannel方法,添加用户自定义的ChannelHandler到新的连接的ChannelPipeline中。接着,新的连接就可以接受和处理客户端的请求了。
通过以上流程,服务端Channel在Netty中的注册过程就完成了,它可以接受客户端的连接,并将连接注册到EventLoop上进行事件处理。
源码解读
当我们梳理完
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
接下来让我们从下面这一行代码开始
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
channelFuture.channel().closeFuture().sync();
这段代码用于启动服务端并阻塞当前线程直到服务端关闭。
-
serverBootstrap.bind(9000)
:调用serverBootstrap
的bind()
方法绑定端口9000,并返回一个ChannelFuture
对象,表示绑定操作的异步结果。 -
.sync()
:调用sync()
方法阻塞当前线程,直到绑定操作完成。这样做是为了确保服务端在端口绑定完成后再继续执行后续代码。 -
channelFuture.channel().closeFuture().sync()
:获取channelFuture
中的channel()
,然后调用其closeFuture()
方法获取一个表示关闭操作的ChannelFuture
对象。接着,再次调用sync()
方法阻塞当前线程,直到关闭操作完成。这样做是为了让当前线程一直等待直到服务端关闭。
入口 serverBootstrap.bind(port)
这段代码是bind(int inetPort)
方法的实现。让我们逐步解释其含义:
/*** Create a new {@link Channel} and bind it.*/
public ChannelFuture bind(int inetPort) {// 调用bind方法,传入一个InetSocketAddress对象,该对象使用指定的端口号创建return bind(new InetSocketAddress(inetPort));
}
创建一个新的Channel并绑定到指定的端口。
doBind(final SocketAddress localAddress)
这段代码是doBind(final SocketAddress localAddress)
方法的实现。
private ChannelFuture doBind(final SocketAddress localAddress) {// 初始化并注册Channel,并返回一个ChannelFuturefinal ChannelFuture regFuture = initAndRegister();// 获取注册完成的Channelfinal Channel channel = regFuture.channel();// 如果注册过程中发生异常,则直接返回注册的ChannelFutureif (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// 如果注册已经完成,则创建一个新的ChannelPromise,并执行绑定操作ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// 如果注册尚未完成,则创建一个PendingRegistrationPromise,并添加一个监听器等待注册完成后再执行绑定操作final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// 如果注册过程中发生异常,则直接设置失败状态promise.setFailure(cause);} else {// 注册成功后执行绑定操作promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}
这段代码的作用是执行绑定操作,并返回一个与绑定操作相关的ChannelFuture。
initAndRegister()
final ChannelFuture initAndRegister() {Channel channel = null;try {// 使用channelFactory创建一个新的Channel实例channel = channelFactory.newChannel();// 对新创建的Channel进行初始化init(channel);} catch (Throwable t) {if (channel != null) {// 如果初始化过程中发生异常,关闭Channelchannel.unsafe().closeForcibly();// 创建一个新的ChannelPromise,并设置失败状态return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// 如果channel为null,则创建一个FailedChannel实例,并设置失败状态return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 使用ChannelConfig的EventLoopGroup进行注册ChannelFuture regFuture = config().group().register(channel);// 如果注册过程中发生异常,则关闭Channelif (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// 返回注册的ChannelFuturereturn regFuture;
}
创建一个新的Channel实例并对其进行初始化,然后使用EventLoopGroup将其注册到事件循环中。最后返回一个与注册操作相关的ChannelFuture。
channelFactory.newChannel()
channelFactory.newChannel() 中的实现,请移步 Netty Review - NioServerSocketChannel源码分析
init(channel)
@Override
void init(Channel channel) throws Exception {// 设置Channel的选项final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 设置Channel的属性final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}// 获取Channel的PipelineChannelPipeline p = channel.pipeline();// 复制当前的子组、子处理器、子选项和子属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// 向Channel的Pipeline中添加一个ChannelInitializerp.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {// 添加用户配置的处理器到Pipeline中pipeline.addLast(handler);}// 在Channel的事件循环中执行ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 添加一个ServerBootstrapAcceptor到Pipeline中,用于接收新连接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}
init()
方法的作用是初始化Channel
,设置Channel
的选项和属性,然后向Channel
的Pipeline
中添加一个ChannelInitializer
,该Initializer
在Channel
的事件循环中执行,并向Pipeline
中添加一个ServerBootstrapAcceptor
,用于接收新连接。
config().group().register(channel)
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
next()
io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {// 使用原子整数来维护索引,以确保在多线程环境中安全地获取下一个 EventExecutor 实例。private final AtomicInteger idx = new AtomicInteger();// 存储所有可用的 EventExecutor 实例的数组。private final EventExecutor[] executors;// 构造方法,接收一个 EventExecutor 实例数组作为参数,并将其存储在 executors 成员变量中。PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}// 选择下一个要使用的 EventExecutor 实例。// 通过对索引进行按位与操作(idx.getAndIncrement() & executors.length - 1),// 来确保索引始终在 executors 数组的有效范围内。// 由于 executors.length 必须是 2 的幂次方,因此使用按位与运算(&)可以有效地实现取模操作,// 从而将索引限制在数组长度范围内。@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}
}
源码流程图
图都给你画好了,戳这里