How to implement disconnection and reconnection in Netty




Preface

When implementing a long-lived TCP connection, client disconnection and reconnection is a very common problem. When we use Netty to implement reconnection, have we thought about the following questions:

How do we detect that the client and the server have disconnected? How do we re-establish the connection after a disconnect? How many threads should a Netty client be given? These are all questions the author ran into while implementing reconnection, and the thread-count question in particular was prompted by an exception hit along the way. The whole process is described below.

Since this article mainly concerns the client side, but readers should be able to run the whole program, the server code plus the shared dependencies and entity class are given first.

Server and common code

Maven dependencies:

```xml
<dependencies>
    <!-- only the spring-boot logging framework is used -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.56.Final</version>
    </dependency>
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>2.0.10.Final</version>
    </dependency>
</dependencies>
```

Server business handler code

It mainly records and prints the current number of client connections, and returns the string "hello netty" after receiving a client message.

```java
@ChannelHandler.Sharable
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);

    public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        log.info("client connected successfully: client address :{}", ctx.channel().remoteAddress());
        log.info("{} client(s) currently connected", channels.size());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server channelRead:{}", msg);
        ctx.channel().writeAndFlush("hello netty");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive: client close");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof java.io.IOException) {
            log.warn("exceptionCaught: client close");
        } else {
            cause.printStackTrace();
        }
    }
}
```

Server heartbeat handler code

When a heartbeat "ping" message is received, a "pong" message is returned to the client. If the client sends nothing at all within the specified time, its connection is closed.

```java
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server channelRead:{}", msg);
        if (msg.equals("ping")) {
            ctx.channel().writeAndFlush("pong");
        } else {
            // handled by the next handler, SimpleServerHandler in this example
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // this event requires io.netty.handler.timeout.IdleStateHandler in the pipeline
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                // no read event within the specified time: close the connection
                log.info("heartbeat timeout, closing the connection: {}", ctx.channel().remoteAddress());
                //ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
```

Codec factory class

It uses the jboss-marshalling-serial codec; you can look up its pros and cons yourself, it is only used here as an example.

```java
public final class MarshallingCodeFactory {

    /** Build a JBoss Marshalling decoder. */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // the "serial" parameter asks for a Java-serialization marshaller factory,
        // provided by jboss-marshalling-serial
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
        return new MarshallingDecoder(provider, 1024);
    }

    /** Build a JBoss Marshalling encoder. */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
        return new MarshallingEncoder(provider);
    }
}
```
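The server startup class itself is not shown in the original. So that the whole program can actually be run, here is a minimal bootstrap sketch; the class name NettyServerMain, the thread counts and the 60s read-idle threshold are assumptions, while port 8088 and the handler order follow the code above:

```java
// Not part of the original article: a minimal server bootstrap sketch (assumptions noted above).
public class NettyServerMain {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
                            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
                            // fire READER_IDLE if nothing is read from the client for 60s (assumption)
                            pipeline.addLast(new IdleStateHandler(60, 0, 0));
                            pipeline.addLast(new ServerHeartbeatHandler());
                            pipeline.addLast(new SimpleServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8088).sync();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
```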

Common entity class

```java
public class UserInfo implements Serializable {

    private static final long serialVersionUID = 6271330872494117382L;

    private String username;
    private int age;

    public UserInfo() {
    }

    public UserInfo(String username, int age) {
        this.username = username;
        this.age = age;
    }

    // getters/setters/toString omitted
}
```

Now we come to the core of this article: client disconnection and reconnection, and the questions it raises.

Client implementation

On startup the client connects synchronously; if no connection is established within the specified number of attempts, an exception is thrown and the process exits. After startup, a scheduled task is started to simulate the client sending data. The client business handler simply logs the data it receives.

```java
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);

    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client receive:{}", msg);
    }
}
```

NettyClient encapsulates the connect and close methods; getChannel() returns the io.netty.channel.Channel used to send data to the server. boolean connect() is a synchronous connect method: it returns true if the connection succeeds and false if it fails.

```java
public class NettyClient {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);

    private EventLoopGroup workerGroup;
    private Bootstrap bootstrap;
    private volatile Channel clientChannel;

    public NettyClient() {
        this(-1);
    }

    public NettyClient(int threads) {
        workerGroup = threads > 0 ? new NioEventLoopGroup(threads) : new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
                .handler(new ClientHandlerInitializer(this));
    }

    public boolean connect() {
        log.info("trying to connect to server: 127.0.0.1:8088");
        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);
            boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
            clientChannel = channelFuture.channel();
            if (notTimeout) {
                if (clientChannel != null && clientChannel.isActive()) {
                    log.info("netty client started !!! {} connect to server", clientChannel.localAddress());
                    return true;
                }
                Throwable cause = channelFuture.cause();
                if (cause != null) {
                    exceptionHandler(cause);
                }
            } else {
                log.warn("connect remote host[{}] timeout {}s", clientChannel.remoteAddress(), 30);
            }
        } catch (Exception e) {
            exceptionHandler(e);
        }
        if (clientChannel != null) {
            clientChannel.close();
        }
        return false;
    }

    private void exceptionHandler(Throwable cause) {
        if (cause instanceof ConnectException) {
            log.error("connect exception:{}", cause.getMessage());
        } else if (cause instanceof ClosedChannelException) {
            log.error("connect error:{}", "client has destroy");
        } else {
            log.error("connect error:", cause);
        }
    }

    public void close() {
        if (clientChannel != null) {
            clientChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public Channel getChannel() {
        return clientChannel;
    }

    static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {

        private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);

        private NettyClient client;

        public ClientHandlerInitializer(NettyClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
            //pipeline.addLast(new IdleStateHandler(25, 0, 10));
            //pipeline.addLast(new ClientHeartbeatHandler());
            pipeline.addLast(new SimpleClientHandler(client));
        }
    }
}
```

Client startup class

```java
public class NettyClientMain {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClientMain.class);

    private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient();
        boolean connect = false;
        // try to connect 10 times at startup; if no connection can be established, stop trying
        // to keep retrying after startup, run this in a separate thread so the main flow is not blocked
        for (int i = 0; i < 10; i++) {
            connect = nettyClient.connect();
            if (connect) {
                break;
            }
            // connection failed, retry after 5s
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        if (connect) {
            log.info("sending data on a schedule");
            send(nettyClient);
        } else {
            nettyClient.close();
            log.info("process exits");
        }
    }

    /** Send data on a schedule. */
    static void send(NettyClient client) {
        scheduledExecutor.schedule(new SendTask(client, scheduledExecutor), 2, TimeUnit.SECONDS);
    }
}
```
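The SendTask referenced above is not included in the original article. Here is a minimal sketch of what it might look like, assuming it periodically writes a UserInfo to the channel and reschedules itself; the payload and the 2-second interval are assumptions:

```java
// Hypothetical sketch: SendTask is referenced above but not shown in the original.
// It writes a UserInfo through the client channel, then reschedules itself.
public class SendTask implements Runnable {

    private final NettyClient client;
    private final ScheduledExecutorService executor;

    public SendTask(NettyClient client, ScheduledExecutorService executor) {
        this.client = client;
        this.executor = executor;
    }

    @Override
    public void run() {
        Channel channel = client.getChannel();
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(new UserInfo("bruce", 18)); // sample payload (assumption)
        }
        // send again in 2 seconds
        executor.schedule(this, 2, TimeUnit.SECONDS);
    }
}
```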

Client disconnection and reconnection

Reconnection requirements:

- When a network exception occurs between server and client, or a response times out (for example during a very long full GC), the client needs to actively reconnect to another node.
- When the server goes down, or any exception occurs between it and the client, the client needs to actively reconnect to another node.
- When the server actively sends the client a (server) offline notification, the client needs to actively reconnect to another node.

How do we detect that the client and the server have disconnected?

Netty's io.netty.channel.ChannelInboundHandler interface provides many important callback methods. To avoid having to implement all of them, extend io.netty.channel.ChannelInboundHandlerAdapter and override only the methods you need.

1. void channelInactive(ChannelHandlerContext ctx) is invoked when the channel is closed, meaning the client has disconnected. It is triggered in the following cases:

- The client, in the normal active state, actively calls the close method of the channel or ctx.
- The server actively calls the close method of the channel or ctx to shut down the client's connection.
- A java.io.IOException occurs (generally because the two ends have disconnected) or a java.lang.OutOfMemoryError occurs (added in version 4.1.52).

2. void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception is invoked whenever an exception occurs on the inbound path. If the exception is a java.io.IOException or a java.lang.OutOfMemoryError (added in 4.1.52), channelInactive is triggered as well, which is exactly case 3 above.

3. Heartbeat checking is also a necessary means of checking the connection state between client and server: in some situations the two ends are in fact disconnected, but the client cannot sense it, so a heartbeat is needed to judge the state of the connection. Heartbeats can be client-side or server-side.

- Client heartbeat: the client sends a heartbeat "ping" message and the server replies with "pong". If the two sides exchange data within the specified time, the connection is considered healthy.
- Server heartbeat: the server sends "ping" to the client and the client replies with "pong". If no reply arrives within the specified time, the peer is considered offline.

Netty gives us a very simple way to implement heartbeat checks: just add io.netty.handler.timeout.IdleStateHandler to the channel's handler chain.

IdleStateHandler has the following important parameters:

- readerIdleTimeSeconds: read timeout. When no data has been read from the Channel within the specified interval, a READER_IDLE IdleStateEvent is fired.
- writerIdleTimeSeconds: write timeout. When no data has been written to the Channel within the specified interval, a WRITER_IDLE IdleStateEvent is fired.
- allIdleTimeSeconds: read/write timeout. When neither a read nor a write has happened within the specified interval, an ALL_IDLE IdleStateEvent is fired.

To be notified when these events fire, you also need to override ChannelInboundHandler#userEventTriggered(ChannelHandlerContext ctx, Object evt) and inspect the event type via the evt parameter. If nothing has been read or written within the specified time, send a heartbeat ping request; if no read happens within the specified time, consider the connection to the server broken and call close on the channel or ctx, so that the client handler's channelInactive method runs.

So far it looks like we only need to put our reconnection logic in channelInactive and exceptionCaught. But here the author hit the first pitfall: the reconnection method was executed twice.

Let's look at the sample code and the result first. Add the following to com.bruce.netty.rpc.client.SimpleClientHandler:

```java
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);

    // part of the code omitted ......

    /** Executed when the client goes offline normally. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        reconnection(ctx);
    }

    /** Executed when an exception occurs on the inbound path. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught: client [{}] disconnected from remote", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        reconnection(ctx);
    }

    private void reconnection(ChannelHandlerContext ctx) {
        log.info("reconnect in 5s");
        // empty implementation for now
    }
}
```

In ClientHandlerInitializer, add io.netty.handler.timeout.IdleStateHandler for heartbeat checking, and ClientHeartbeatHandler to listen for idle events and receive the heartbeat pong replies.

```java
static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);

    private NettyClient client;

    public ClientHandlerInitializer(NettyClient client) {
        this.client = client;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
        // fire READER_IDLE if there is no read for 25s,
        // and ALL_IDLE if there is neither read nor write for 10s
        pipeline.addLast(new IdleStateHandler(25, 0, 10));
        pipeline.addLast(new ClientHeartbeatHandler());
        pipeline.addLast(new SimpleClientHandler(client));
    }
}
```

com.bruce.netty.rpc.client.ClientHeartbeatHandler:

```java
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(ClientHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg.equals("pong")) {
            log.info("received heartbeat reply");
        } else {
            super.channelRead(ctx, msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // this event requires io.netty.handler.timeout.IdleStateHandler in the pipeline
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
                // send a heartbeat probe to the server
                ctx.writeAndFlush("ping");
                log.info("heartbeat sent");
            } else if (idleStateEvent.state() == IdleState.READER_IDLE) {
                // no read event within the specified time: close the connection
                log.info("heartbeat timeout, closing connection to server: {}", ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
```

Start the server first, then the client; once the connection is up, kill the server process.

The client log shows that exceptionCaught was executed first and then channelInactive; but both methods call reconnection, so two reconnect attempts were executed at the same time.

Why were both exceptionCaught and channelInactive executed?

We can set breakpoints in exceptionCaught and channelInactive and step through the source code.

After NioEventLoop performs a select operation and processes the corresponding SelectionKey, when an exception occurs it calls AbstractNioByteChannel.NioByteUnsafe#handleReadException to handle it; that triggers pipeline.fireExceptionCaught(cause) and eventually reaches the user handler's exceptionCaught method.

```java
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
        RecvByteBufAllocator.Handle allocHandle) {
    if (byteBuf != null) {
        if (byteBuf.isReadable()) {
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
        } else {
            byteBuf.release();
        }
    }
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    pipeline.fireExceptionCaught(cause);

    // If oom will close the read event, release connection.
    // See https://github.com/netty/netty/issues/10434
    if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
        closeOnRead(pipeline);
    }
}
```

At the end, this method checks the exception type to decide whether to close the connection. In the disconnection scenario the exception is a java.io.IOException, so close is executed. Debugging into AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify), you will find that it in turn calls AbstractUnsafe#fireChannelInactiveAndDeregister, and continuing to debug, the user-defined channelInactive is eventually executed.

Here we can note a takeaway: in Netty, when execution reaches a handler's exceptionCaught via fireExceptionCaught, fireChannelInactive may or may not be triggered afterwards.

Besides Netty deciding whether to execute close based on the exception type, developers can also call close themselves through ctx or channel, like this:

```java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (cause instanceof IOException) {
        log.warn("exceptionCaught: client [{}] disconnected from remote", ctx.channel().localAddress());
    } else {
        log.error(cause);
    }
    //ctx.close();
    ctx.channel().close();
}
```

But does explicitly calling close like this always trigger fireChannelInactive?

If it does, then we only need to call close in exceptionCaught and put the reconnection logic in channelInactive!

From the logs the author observed that calling close in exceptionCaught invoked channelInactive every single time. But after reading the source, the author believes this is not guaranteed: AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify) calls io.netty.channel.Channel#isActive to decide, and only if it returns true is fireChannelInactive executed.

```java
// io.netty.channel.socket.nio.NioSocketChannel#isActive
@Override
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}
```

How do we solve the problem of the logic executing twice?

When Netty is initialized we add a series of handlers. These handlers are wrapped into a DefaultChannelPipeline when Netty creates the Channel object (NioSocketChannel). The DefaultChannelPipeline is essentially a doubly linked list whose head node is a HeadContext and whose tail node is a TailContext; the nodes in between are the handlers we added, each wrapped into a DefaultChannelHandlerContext. When a method is executed on the pipeline, the list is traversed and the handlers are invoked in turn. So when exceptionCaught executes, we just need to remove our own handler from the list first; then its fireChannelInactive can no longer be reached.

The final implementation is as follows:

```java
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnection(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught: client [{}] disconnected from remote", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        //ctx.close();
        ctx.channel().close();
        reconnection(ctx);
    }
}
```

The result: when an exception occurs, only exceptionCaught is executed; the previous connection's resources are released by closing the channel, and the current handler's fireChannelInactive is no longer executed.

How do we re-establish the connection after a disconnect?

From the analysis above we know in which methods to put our reconnection logic, but how exactly should it be implemented? Curious, the author surveyed the solutions of fellow coders. Most of them add a scheduled task via ctx.channel().eventLoop().schedule that calls the client's connect method. The author implemented it the same way:

```java
private void reconnection(ChannelHandlerContext ctx) {
    log.info("reconnect in 5s");
    ctx.channel().eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            boolean connect = client.connect();
            if (connect) {
                log.info("reconnected successfully");
            } else {
                reconnection(ctx);
            }
        }
    }, 5, TimeUnit.SECONDS);
}
```

Test: start the server first, then the client, and kill the server process once the connection is up. The client retried on schedule as expected, but after stepping away just long enough to pour a glass of water in the pantry, the author came back to the following exception:

```
......(14 identical retry log lines omitted)
[2021-01-17 18:46:45.032] INFO  [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : reconnect in 5s
[2021-01-17 18:46:48.032] INFO  [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : trying to connect to server: 127.0.0.1:8088
[2021-01-17 18:46:50.038] ERROR [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : connect exception:Connection refused: no further information: /127.0.0.1:8088
[2021-01-17 18:46:50.038] INFO  [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : reconnect in 5s
[2021-01-17 18:46:53.040] INFO  [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : trying to connect to server: 127.0.0.1:8088
[2021-01-17 18:46:53.048] ERROR [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : connect error:
io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@10122121(incomplete)
	at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:462)
	at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:159)
	at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:667)
	at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:305)
	at com.bruce.netty.rpc.client.NettyClient.connect(NettyClient.java:49)
	at com.bruce.netty.rpc.client.SimpleClientHandler$1.run(SimpleClientHandler.java:65)
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
```

From the exception stack we can see that com.bruce.netty.rpc.client.NettyClient#connect called a waiting method:

```java
boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
```

Inside, this method checks whether a synchronous wait is being performed on an I/O thread; if so, it throws a BlockingOperationException:

```java
// io.netty.channel.DefaultChannelPromise#checkDeadLock
@Override
protected void checkDeadLock() {
    if (channel().isRegistered()) {
        super.checkDeadLock();
    }
}

// io.netty.util.concurrent.DefaultPromise#checkDeadLock
protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}
```

The strange thing is: why is the exception not thrown on every reconnect attempt, but instead once every 16 attempts?

This reminded the author that his laptop has an 8-core processor, and Netty's default event loop group has 2 * cores threads, i.e. 16 threads; there seemed to be some connection between the two.

In fact, when ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088) is called, Netty first creates an io.netty.channel.Channel (a NioSocketChannel in this example), and then uses io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser to pick a NioEventLoop in round-robin order and binds the Channel to that NioEventLoop.
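For reference, the round-robin selection happens in io.netty.util.concurrent.DefaultEventExecutorChooserFactory. The following is abridged from the Netty 4.1 source; with 16 threads (a power of two) the power-of-two variant is chosen:

```java
// abridged from io.netty.util.concurrent.DefaultEventExecutorChooserFactory (Netty 4.1)
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // round-robin: each connect picks the next NioEventLoop in the array
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}
```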

io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop

```java
// Return true if the given Thread is executed in the event loop, false otherwise.
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}
```

The reconnection method is invoked on a NioEventLoop (that is, an I/O thread). The 1st reconnect actually picks the 2nd NioEventLoop, the 2nd reconnect picks the 3rd, and so on; once a full round has passed and the choice comes back to the first NioEventLoop, inEventLoop() returns true and the BlockingOperationException is thrown.

Solution 1

Do not perform a synchronous connect on Netty's I/O threads. Use a separate thread pool to schedule the retries; that thread can also run your own post-reconnect business logic without blocking the I/O threads. (If no business logic is needed, destroy the pool afterwards.)

Modify the reconnection method in com.bruce.netty.rpc.client.SimpleClientHandler:

```java
private static ScheduledExecutorService SCHEDULED_EXECUTOR;

private void initScheduledExecutor() {
    if (SCHEDULED_EXECUTOR == null) {
        synchronized (SimpleClientHandler.class) {
            if (SCHEDULED_EXECUTOR == null) {
                SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "Client-Reconnect-1");
                    t.setDaemon(true);
                    return t;
                });
            }
        }
    }
}

private void reconnection(ChannelHandlerContext ctx) {
    log.info("reconnect in 5s");
    initScheduledExecutor();
    SCHEDULED_EXECUTOR.schedule(() -> {
        boolean connect = client.connect();
        if (connect) {
            // connected successfully, shut down the pool
            SCHEDULED_EXECUTOR.shutdown();
            log.info("reconnected successfully");
        } else {
            reconnection(ctx);
        }
    }, 3, TimeUnit.SECONDS);
}
```

Solution 2

Alternatively, reconnect asynchronously on the I/O thread:

Add a connectAsync method to com.bruce.netty.rpc.client.NettyClient. The difference from connect is that connectAsync never calls the future's synchronous wait methods; instead it registers a ChannelFutureListener, which actually runs on the I/O thread.

```java
public void connectAsync() {
    log.info("trying to connect to server: 127.0.0.1:8088");
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);
    channelFuture.addListener((ChannelFutureListener) future -> {
        Throwable cause = future.cause();
        if (cause != null) {
            exceptionHandler(cause);
            log.info("waiting for the next reconnect attempt");
            channelFuture.channel().eventLoop().schedule(this::connectAsync, 5, TimeUnit.SECONDS);
        } else {
            clientChannel = channelFuture.channel();
            if (clientChannel != null && clientChannel.isActive()) {
                log.info("Netty client started !!! {} connect to server", clientChannel.localAddress());
            }
        }
    });
}
```

com.bruce.netty.rpc.client.SimpleClientHandler:

```java
public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);

    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client receive:{}", msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnectionAsync(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught: client [{}] disconnected from remote", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        ctx.close();
        reconnectionAsync(ctx);
    }

    private void reconnectionAsync(ChannelHandlerContext ctx) {
        log.info("reconnect in 5s");
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                client.connectAsync();
            }
        }, 5, TimeUnit.SECONDS);
    }
}
```

How many threads should a Netty client be given?

In Netty a NioEventLoopGroup creates cpu cores * 2 threads by default, and these threads are all used for I/O. Does a client application really need that many I/O threads?

When analyzing the BlockingOperationException above we learned that Netty binds a newly created Channel to a single NioEventLoop chosen from the NioEventLoopGroup; only when more Channels are created does the chooser move on to the next NioEventLoop in order. In other words, one Channel is bound to exactly one NioEventLoop, while one NioEventLoop can serve multiple Channels.

1. For a client that connects to only one server node, one thread is enough (see the usage sketch at the end of this article). Even if disconnection and reconnection occur, the old Channel is removed from the NioEventLoop once the connection breaks, and after reconnecting the new Channel is again registered on the one and only NioEventLoop.

2. If the client connects to multiple server nodes by calling io.netty.bootstrap.Bootstrap#connect(String inetHost, int inetPort) several times, as in the (truncated) snippet below, the thread count can be set somewhat higher, but should not exceed 2 * cores; and once disconnections and reconnections occur, there is likewise no guarantee that every NioEventLoop will have a client Channel bound to it.

```java
public boolean connect() {
    try {
        ChannelFuture channelFuture1 = boot
```
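Back to case 1 above: with the NettyClient constructor defined earlier, a single-threaded client is simply the following usage sketch:

```java
// One I/O thread is enough for a client that talks to a single server node;
// NettyClient(1) builds new NioEventLoopGroup(1) internally.
NettyClient client = new NettyClient(1);
boolean connected = client.connect();
```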
