Redis 的订阅与发布JedisPubSub

订阅与发布的场景在现代分布式系统中非常的常见,而且使用场景也非常的多。比如,我现在有一个配置中心,当我更新配置后,我希望相关的系统都能够自动的把缓存给替换掉。

再比如,最常见的场景,群聊。只要群里已有人发消息,在这个群里的所有人都能收到。我这里只列举了两个例子,实际工作中的使用场景非常的广泛,我在前面的文章中讲 Spring 的发布订阅的时候,也讲过我们电商系统中的一些使用场景。今天,我重点讲一下 Redis 中的发布订阅功能。

JedisPubSub

与 Redis 发布订阅相关的命令有 6 个,分别如下:

  • PSUBSCRIBE pattern [pattern …]:订阅一个或者多个符合pattern格式的频道
  • PUBLISH channel message:发布消息到chanel中
  • PUBSUB subcommand [argument [argument …]]:查看订阅与发布系统状态
  • PUNSUBSCRIBE [pattern [pattern …]]:退订所有符合格式的频道
  • SUBSCRIBE channel [channel …]:订阅一个或者多个频道
  • UNSUBSCRIBE [channel [channel …]]:取消订阅频道

而在 Jedis 中,也提供了一个类 JedisPubSub,用来对订阅的 channel 进行监听。

  • onPMessage:监听到订阅模式接受到消息时的回调
  • onMessage:监听到订阅频道接受到消息时的回调
  • onSubscribe:订阅频道时的回调
  • onUnsubscribe:取消订阅频道时的回调
  • onPSubscribe:订阅频道模式时的回调
  • onPUnsubscribe:取消订阅模式时的回调

实现 JedisPubSub 类,我们可以做到将消息持久化等操作。

public class Subscriber extends JedisPubSub {
    public Subscriber() {}
    public void onMessage(String channel, String message) {}
    public void onSubscribe(String channel, int subscribedChannels) {}
    public void onUnsubscribe(String channel, int subscribedChannels) {}
    // 省略其他方法
}

需要注意的是,实现 JedisPubSub 的这种方式是阻塞的。效率不高,因此最好是你自己在通过多线程的方式来维护!看下面的简单例子。

public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();
    private final String channel = "xttblogChannel";
    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }
    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, "
            + "thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(subscriber, channel);
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

我们从 JedisPool 获取一个 Jedis 实例,并使用这个 Jedis 实例进行 subscribe 的操作。整个实现可以说是非常的简单,我们只需关注业务逻辑就好!

需要注意的是一个 Redis client 发布消息,其他多个 redis client 订阅消息,发布的消息“即发即失”,redis 不会持久保存发布的消息;消息订阅者也将只能得到订阅之后的消息,通道中此前的消息将无从获得。这就类似于 JMS 中“非持久”类型的消息。

消息发布者,即 publish 客户端,无需独占链接,你可以在 publish 消息的同时,使用同一个 redis-client 链接进行其他操作(例如:INCR 等)。

消息订阅者,即 subscribe 客户端,需要独占链接,即进行 subscribe 期间,redis-client 无法穿插其他操作,此时 client 以阻塞的方式等待“publish 端”的消息;这一点很好理解,因此 subscribe 端需要使用单独的链接,甚至需要在额外的线程中使用。

一旦 subscribe 端断开链接,将会失去部分消息,即链接失效期间的消息将会丢失。

Redis 的订阅与发布JedisPubSub

: » Redis 的订阅与发布JedisPubSub

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/252117.html

(0)
上一篇 2022年5月4日
下一篇 2022年5月4日

相关推荐

发表回复

登录后才能评论