本篇内容主要讲解“redis流数据推送多用户的方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“redis流数据推送多用户的方法是什么”吧!
1 当用户几百 几千个时 如何推送?取缔线程池 采用单线程异步同步推送。
2 现在的逻辑:
每次项目重新启动: 初始化channel 、服务端断开连接-重新连接。当有服务连接不上的时候定时器连接。
这样会产生一个问题:异步连接的任务特别多 导致服务奔溃
/** * 关闭连接时 */@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); String serverUrl = host + ":" + port; String prev = redisTemplate.opsForValue().get(SOCKET_CONNECT_PREFIX + serverUrl); Integer cur = Math.toIntExact(System.currentTimeMillis() / 1000); if (null == prev || cur - Integer.parseInt(prev) > 20 * 60) { redisTemplate.opsForValue().set(SOCKET_CONNECT_PREFIX + serverUrl, cur + ""); log.info("服务端断开连接=====" + host + port + "20分钟之后 重新连接"); final EventLoop eventLoop = ctx.channel().eventLoop(); Bootstrap bootstrap = defaultProcessHandler.getBootstrap(eventLoop); eventLoop.schedule( () -> defaultProcessHandler.doConnect(bootstrap, host + ":" + port, new AtomicInteger(0)), 20, TimeUnit.MINUTES); } else { log.warn(serverUrl + "距离上次断开连接不足20分钟==" + (cur - Integer.parseInt(prev)) + "s"); } super.channelInactive(ctx);}
在系统启动初始化 连接10次:
3 如何保证发送数据的完整性 TCP 粘包问题
服务端添加按换行符分隔的解码器;
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
//此方法每次客户端连接都会调用,是为通道初始化的方法
//获得通道channel中的管道链(执行链、handler链)
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(Short.MAX_VALUE * 10));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringHandler());
log.info("success to initHandler!");
}
});
4 关于推送服务断开之后 重新连接上来 接着上次发送的位置继续推送。
ChannelFuture future = channels.get(callbackUrl.getUrl());if (null != future) { try { boolean result = sendMsg(future, pushDataStr); this.redisTemplate.opsForValue().set(callbackUrl.getUrl(), offset.toString()); if (!result) { log.error("this channel push failed {}", callbackUrl.getUrl()); returnVal = false; } } catch (Exception e) { log.error("push exception", e); returnVal = false; }}return returnVal;
当推送成功 会记录次用户 此次的数据游标数据到redis.
当推送服务挂断之后,会进行任务的初始化 此时会从redis中读取每个客户上次读取的位置offest 提交到任务线程池
这个问题同样解决了服务重启之后,依然可以从上次读取结束的位置接着读取。读取任务的开始游标位置 :是上次服务成功处理后的游标。
5 当接收数据的服务端 重新断开之后,如何保证接着上次读取的位置?
BaseReceiver 接口 handler 方法 每次接收到数据 返回当前游标值。当进行业务处理成功之后 返回true .会自动进行后续数据读取。当客户那边的接收数据服务端挂了之后 首先会进行自动重连操作,此时读取datahub数据的线程依然在返回数据 但是不能推送成功,所以游标值 不会后移。
到此,相信大家对“redis流数据推送多用户的方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
原创文章,作者:745907710,如若转载,请注明出处:https://blog.ytso.com/229442.html