订阅与发布的场景在现代分布式系统中非常的常见,而且使用场景也非常的多。比如,我现在有一个配置中心,当我更新配置后,我希望相关的系统都能够自动的把缓存给替换掉。
再比如,最常见的场景,群聊。只要群里已有人发消息,在这个群里的所有人都能收到。我这里只列举了两个例子,实际工作中的使用场景非常的广泛,我在前面的文章中讲 Spring 的发布订阅的时候,也讲过我们电商系统中的一些使用场景。今天,我重点讲一下 Redis 中的发布订阅功能。
与 Redis 发布订阅相关的命令有 6 个,分别如下:
- PSUBSCRIBE pattern [pattern …]:订阅一个或者多个符合pattern格式的频道
- PUBLISH channel message:发布消息到chanel中
- PUBSUB subcommand [argument [argument …]]:查看订阅与发布系统状态
- PUNSUBSCRIBE [pattern [pattern …]]:退订所有符合格式的频道
- SUBSCRIBE channel [channel …]:订阅一个或者多个频道
- UNSUBSCRIBE [channel [channel …]]:取消订阅频道
而在 Jedis 中,也提供了一个类 JedisPubSub,用来对订阅的 channel 进行监听。
- onPMessage:监听到订阅模式接受到消息时的回调
- onMessage:监听到订阅频道接受到消息时的回调
- onSubscribe:订阅频道时的回调
- onUnsubscribe:取消订阅频道时的回调
- onPSubscribe:订阅频道模式时的回调
- onPUnsubscribe:取消订阅模式时的回调
实现 JedisPubSub 类,我们可以做到将消息持久化等操作。
public class Subscriber extends JedisPubSub {
public Subscriber() {}
public void onMessage(String channel, String message) {}
public void onSubscribe(String channel, int subscribedChannels) {}
public void onUnsubscribe(String channel, int subscribedChannels) {}
// 省略其他方法
}
需要注意的是,实现 JedisPubSub 的这种方式是阻塞的。效率不高,因此最好是你自己在通过多线程的方式来维护!看下面的简单例子。
public class SubThread extends Thread {
private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber();
private final String channel = "xttblogChannel";
public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
}
@Override
public void run() {
System.out.println(String.format("subscribe redis, channel %s, "
+ "thread will be blocked", channel));
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.subscribe(subscriber, channel);
} catch (Exception e) {
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
我们从 JedisPool 获取一个 Jedis 实例,并使用这个 Jedis 实例进行 subscribe 的操作。整个实现可以说是非常的简单,我们只需关注业务逻辑就好!
需要注意的是一个 Redis client 发布消息,其他多个 redis client 订阅消息,发布的消息“即发即失”,redis 不会持久保存发布的消息;消息订阅者也将只能得到订阅之后的消息,通道中此前的消息将无从获得。这就类似于 JMS 中“非持久”类型的消息。
消息发布者,即 publish 客户端,无需独占链接,你可以在 publish 消息的同时,使用同一个 redis-client 链接进行其他操作(例如:INCR 等)。
消息订阅者,即 subscribe 客户端,需要独占链接,即进行 subscribe 期间,redis-client 无法穿插其他操作,此时 client 以阻塞的方式等待“publish 端”的消息;这一点很好理解,因此 subscribe 端需要使用单独的链接,甚至需要在额外的线程中使用。
一旦 subscribe 端断开链接,将会失去部分消息,即链接失效期间的消息将会丢失。
: » Redis 的订阅与发布JedisPubSub
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/252117.html