--- Subscribe to `channel`, wait for exactly one reply, then release the
-- connection.
--
-- Assumes `redis_c` is a lua-resty-redis compatible client (objects exposing
-- `subscribe`, `read_reply`, `unsubscribe` and `close`) -- TODO confirm.
--
-- @param self    object providing `connect_mod` and `set_keepalive_mod`
-- @param channel channel name handed to `redis:subscribe`
-- @return the value returned by `redis:read_reply()` on success
-- @return nil, err when creating, connecting, subscribing or reading fails
function _M.subscribe( self, channel )
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok
    ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local res
    res, err = redis:subscribe(channel)
    if not res then
        -- The connection is in an unknown state; never leave it dangling.
        redis:close()
        return nil, err
    end

    res, err = redis:read_reply()
    if not res then
        -- Still subscribed (or broken): closing is the only safe way out,
        -- a subscribed connection must not be reused by other callers.
        redis:close()
        return nil, err
    end

    -- Messages pushed before the server processed UNSUBSCRIBE may still be
    -- queued on the socket. Read until the "unsubscribe" confirmation shows
    -- up so the next user of the pooled connection does not see stale data.
    local reply = redis:unsubscribe(channel)
    while reply and reply[1] ~= "unsubscribe" do
        reply = redis:read_reply()
    end

    if reply then
        -- Confirmation received: the connection is safe to reuse.
        self.set_keepalive_mod(redis)
    else
        -- Draining failed; discard the connection instead of pooling it.
        redis:close()
    end

    return res, err
end
--- Subscribe to `channel` and return a reader function bound to the
-- subscribed connection.
--
-- The returned function behaves as follows:
--   * `reader()` or `reader(true)` reads the next reply and returns it, or
--     returns `nil, err` when the read fails;
--   * `reader(<anything else>)` (conventionally `false`) unsubscribes,
--     releases the connection and returns nothing.
--
-- Assumes `redis_c` is a lua-resty-redis compatible client (objects exposing
-- `subscribe`, `read_reply`, `unsubscribe` and `close`) -- TODO confirm.
--
-- @param self    object providing `connect_mod` and `set_keepalive_mod`
-- @param channel channel name handed to `redis:subscribe`
-- @return the reader function on success
-- @return nil, err when creating, connecting or subscribing fails
function _M.subscribe( self, channel )
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok
    ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local res
    res, err = redis:subscribe(channel)
    if not res then
        -- The connection is in an unknown state; never leave it dangling.
        redis:close()
        return nil, err
    end

    -- Set once the connection has been handed back or closed, so that a
    -- second release is a no-op and later reads cannot touch a connection
    -- that may already belong to someone else.
    local released = false

    -- Reader/releaser closure handed back to the caller.
    local function do_read_func ( do_read )
        if do_read == nil or do_read == true then
            if released then
                return nil, "released"
            end
            local reply, read_err = redis:read_reply()
            if not reply then
                return nil, read_err
            end
            return reply
        end

        if released then
            return
        end
        released = true

        -- Messages pushed before the server processed UNSUBSCRIBE may still
        -- be queued on the socket. Read until the "unsubscribe" confirmation
        -- arrives so the pooled connection carries no stale data.
        local reply = redis:unsubscribe(channel)
        while reply and reply[1] ~= "unsubscribe" do
            reply = redis:read_reply()
        end

        if reply then
            self.set_keepalive_mod(redis)
        else
            -- Draining failed; discard the connection instead of pooling it.
            redis:close()
        end
        return
    end

    return do_read_func -- hand the closure defined above to the caller
end
-- 调用示例代码 (usage example):
-- Usage sketch: obtain a reader function from subscribe() and poll it in a
-- loop. The "... ..." line below is an elision left by the author and is not
-- valid Lua as written.
local red = redis:new({timeout=1000})
-- Presumably this is the closure-returning subscribe defined above, which
-- yields a function on success and nil on failure -- verify.
local func = red:subscribe( "channel" )
if not func then
return nil
end
while true do
-- Called without arguments: read the next reply.
local res, err = func()
if err then
-- Called with false: the closure-returning subscribe unsubscribes and hands
-- the connection to set_keepalive_mod.
-- NOTE(review): nothing visible here leaves the loop afterwards, so func()
-- would be invoked again on the released connection -- confirm the elided
-- code breaks out.
func(false)
end
... ...
end
-- NOTE(review): cbfunc is not defined anywhere in the visible snippet -- confirm.
return cbfunc
-- Unsubscribe from "ch" and drain the connection before handing it back for
-- reuse.
-- NOTE(review): `red`, `self` and `ngx` are expected to be in scope; this
-- fragment looks intended for use inside a method -- confirm.
local res, err = red:unsubscribe("ch")
if not res then
    ngx.log(ngx.ERR, err)
    return
else
    -- A reply pushed by Redis is either
    --   {"message", ...} or
    --   {"unsubscribe", $channel_name, $remain_channel_num}.
    -- Receiving the former means we are still reading data that Redis pushed
    -- before the unsubscribe took effect.
    if res[1] ~= "unsubscribe" then
        repeat
            -- Drain the messages that have already been received.
            res, err = red:read_reply()
            if not res then
                ngx.log(ngx.ERR, err)
                return
            end
        until res[1] == "unsubscribe"
    end
    -- Only now is it safe to reuse the connection. Pool the connection that
    -- was actually drained (`red`); the original passed an unrelated name.
    self.set_keepalive_mod(red)
end