Redis深度历险阅读笔记(3)

前言

和RMQ、Kafka的区别大概就是Redis的消息队列并不是专业的,尽管它的延迟比RMQ、Kafka要低,但是不保证可靠。除此之外,Redis缺少非常多的高级特性。如果对消息队列没有太详细的定制化需求,那么可以考虑使用Redis-list来实现。

消息队列

Redis的消息队列可以通过list,用 rpush+lpop / lpush+rpop 来实现。

通过push塞入消息,通过pop获取消息。获取完消息后,对消息进行处理,处理完成后再从队列中pop获取消息。这样会造成一个问题:

空队列

当通过pop无法从队列中获取到消息时,会一直执行pop,进入死循环,导致Redis的QPS飙升。

我们通过time.sleep(1s)的方式来解决这个问题。但是很明显这不是一个最优解:会造成处理一个消息的延时莫名其妙多了1s,这个结果不是我们希望看到的。

有更好的解决方案:brpush/blpop(b: blocking)。阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。

空闲连接

如果队列长期为空,线程一直阻塞,那么Redis的客户端连接会成为空闲连接,服务器会主动断开连接,这时再去brpush/blpop就会抛出异常。

所以要谨慎使用:注意捕获异常&重连Redis。

延时队列

背景

客户端在处理请求时,如果加锁没成功,有3种策略可以解决:

  1. 直接通知用户稍后重试
  2. sleep一会儿再重试,会导致队列后续消息处理出现延时,有一定风险
  3. 将请求转移到延时队列,过一会儿再重试

实现

通过ZSet来实现。以消息为value,以到期时间为score,用多个线程轮询zset获取到期的任务进行处理(多线程是为了保障可用性,但是也需要确保一个任务不被多个线程同时执行)

基于Go的代码写在了:https://github.com/Carey6918/RedisGo/blob/master/delay_queue.go

优化

为了避免多个线程同时读取到一个任务,但是只有一个能执行的情况。我们可以将zrangebyscore和zrem封装到lua脚本中,以减少浪费。

local message = redis.Call('zrangebyscore', KEYS[1], 0, os.time());
if ( #message > 0 ) then
    redis.Call('zrem', KEYS[1], message[1]);
    return message[1];
else
    return {};
end
Table of Contents