Redis 使用中遇到的相关问题



1.1 Redis使用过程中的问题

问题1:JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context

redis发布订阅过程中,使用该连接进行其他处理,导致订阅失败。错误栈如下:

redis.clients.jedis.exceptions.JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context
        at redis.clients.jedis.Protocol.processError(Protocol.java:66)
        at redis.clients.jedis.Protocol.process(Protocol.java:73)
        at redis.clients.jedis.Protocol.read(Protocol.java:138)
        at redis.clients.jedis.Connection.getBinaryBulkReply(Connection.java:185)
        at redis.clients.jedis.Connection.getBulkReply(Connection.java:174)
        at redis.clients.jedis.Jedis.get(Jedis.java:77)

我们公司实现Redis发布订阅,实现方式和以下代码类似,使用一个Redis订阅的类继承至JedisPubSub,然后定义一个JedisSubscribeHandler接口,该接口的实现类需要调用RedisSubListener的注册方法registerHandler,这样每次redis发布一条消息就可以通过channle分发到具体的handler做处理。


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import redis.clients.jedis.JedisPubSub;

import com.lkzlee.redis.handler.JedisSubscribeHandler;

/**
 * 继承自JedisPubSub实现redis的发布订阅
 * @author lkzlee
 *
 */
public class RedisSubListener extends JedisPubSub
{
	private final static Map<String, JedisSubscribeHandler> registerHandlerMap = new HashMap<String, JedisSubscribeHandler>();
	private final static Log log = LogFactory.getLog(RedisSubListener.class);

	public static void registerHandler(String channel, JedisSubscribeHandler handler)
	{
		registerHandlerMap.put(channel, handler);
	}

	@Override
	public void onMessage(String channel, String message)
	{
		if (registerHandlerMap.containsKey(channel))
		{
			log.fatal("找不到相关channel订阅中信息,channel=" + channel + "|message=" + message + ",请检查是否有注册");
			return;
		}
		JedisSubscribeHandler handler = registerHandlerMap.get(channel);
		handler.handler(message);
	}
}

我们公司内部对于获取Jedis连接做了一些优化,实现了JedisProxy类,这个类可以防止同步调用方法类获取多个Jedis连接,实现了redis连接的线程安全性,用ThreadLocal来提高redis利用率,并通过一个JedisProxyAop注入的方法不用再执行Jedis方法的同时获取和释放连接,具体实现原理如下: JedisProxy内部有一个成员:

	/**使用threadLocal避免释放的时候传递jedis对象*/
	private static ThreadLocal<Jedis> jedisLocal = new ThreadLocal<Jedis>();
	/**
	 * 获取jedis
	 * @return
	 */
	public Jedis getJedis()
	{
		Jedis jedis = null;
		try
		{
			/**去本线程中的jedis*/
			jedis = jedisLocal.get();
			if (null != jedis)
			{
				try
				{
					// 取出来后执行ping检查下是否依然存活
					if (jedis.isConnected())
					{
						return jedis;
					}
				}
				catch (Exception e)
				{
					releaseBrokenJedis();
				}
			}
			if (null == redisSentinelPool)
			{
				this.initPool();
			}
			jedis = redisSentinelPool.getResource();
			jedisLocal.set(jedis);// 设置本线程中的jedis
		}
		catch (Exception e)
		{
			releaseBrokenJedis();
			if (null != jedisLocal.get())
			{
				jedisLocal.remove();
			}
			log.fatal("Could not get a resource from the pool, pls check the host and port settings", e);
		}
		return jedis;
	}
	/**
	 * 释放
	 */
	public void releaseJedis()
	{
		Jedis jedis = jedisLocal.get();
		if (null != jedis && null != redisSentinelPool)
		{
			if (jedis.isConnected())
			{
				redisSentinelPool.returnResource(jedis);
			}
		}
		jedisLocal.remove();
	}

	/**
	 * 释放损坏资源
	 */
	public void releaseBrokenJedis()
	{
		Jedis jedis = jedisLocal.get();
		if (null != jedis && null != redisSentinelPool)
		{
			redisSentinelPool.returnBrokenResource(jedis);
		}
		jedisLocal.remove();
	}

redisSentinelPool这个类基于JedisSentinelPool实现了Jedis连接池管理,但是各位有没有发现handler.handler(message);调用的这个方法是同步调用的,如果此方法处理的业务逻辑较多,比如又用到了redis,此时获取的redis连接和JedisPubSub【可以看看源码,有内置的client】连接是同一个的话,就会造成Redis上述的报错。

问题2:Jedis返回JedisConnectionException:Unknown reply: /

Jedis返回JedisConnectionException:Unknown reply: / 类似的错误,其本质可以通过源码看到Redis获取连接和使用是通过Pool来进行管理的,当时获取一个redis连接的时候internalPool.borrowObject()从Jedis实现的连接池中获取一个,用完释放returnResourceObject(resource)还给连接池,如果有一次调用Jedis连接获取缓存内容,返回过程中异常,而该连接的数据流中的数据还存在,那下次另一个线程获取到连接请求获取的消息时,上次信息会继续从数据流中读取到并返回,此时返现连接请求数据与返回数据判断格式头不一致就会报错JedisConnectionException:Unknown reply: /

如下代码:

   private static Object process(final RedisInputStream is) {
	try {
	    byte b = is.readByte();
	    if (b == MINUS_BYTE) {
		processError(is);
	    } else if (b == ASTERISK_BYTE) {
		return processMultiBulkReply(is);
	    } else if (b == COLON_BYTE) {
		return processInteger(is);
	    } else if (b == DOLLAR_BYTE) {
		return processBulkReply(is);
	    } else if (b == PLUS_BYTE) {
		return processStatusCodeReply(is);
	    } else {
		throw new JedisConnectionException("Unknown reply: " + (char) b);
	    }
	} catch (IOException e) {
	    throw new JedisConnectionException(e);
	}
	return null;
    }