瀏覽代碼

redis中消息保存 24小时 超过24小时的消息数据 删除

779513719 5 年之前
父節點
當前提交
d1a664c959

+ 4 - 4
jim-common/src/main/java/org/jim/common/cache/redis/JedisTemplate.java

@@ -328,7 +328,7 @@ public  class JedisTemplate implements  Serializable{
                return jedis.setex(key, expire, value);  
            }  
        }.getResult();  
-   }  
+   }
  
    /** 
     * 将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 setStringIfNotExists 不做任何动作。 
@@ -1073,7 +1073,7 @@ public  class JedisTemplate implements  Serializable{
     * 将信息 message 发送到指定的频道 channel。 
     * 时间复杂度:O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量。 
     * @param channel 频道 
-    * @param List<message> 要发布的信息 
+    * @param List<message> 要发布的信息
     */  
    public void publishAll(final String channel, final List<String> messages) {
 	   if(messages == null || messages.size() == 0) {
@@ -1098,8 +1098,8 @@ public  class JedisTemplate implements  Serializable{
     */  
    public void subscribe(final JedisPubSub jedisPubSub, final String channel) {  
        new Executor<Object>(jedisPool) {  
- 
-           @Override  
+
+           @Override
            Object execute() {  
                jedis.subscribe(jedisPubSub, channel);  
                return null;  

+ 2 - 2
jim-server/src/main/java/org/jim/server/command/handler/processor/chat/BaseAsyncChatMessageProcessor.java

@@ -44,7 +44,7 @@ public abstract class BaseAsyncChatMessageProcessor implements AsyncChatMessageP
 			//存储群聊消息;
 			if(ChatType.CHAT_TYPE_PUBLIC.getNumber() == chatBody.getChatType()){
 				pushGroupMessages(PUSH,STORE, chatBody);
-			}else{
+			}/*else{
 				String from = chatBody.getFrom();
 				String to = chatBody.getTo();
 				String sessionId = ChatKit.sessionId(from,to);
@@ -53,7 +53,7 @@ public abstract class BaseAsyncChatMessageProcessor implements AsyncChatMessageP
 				if(!isOnline){
 					writeMessage(PUSH,USER+":"+to+":"+from,chatBody);
 				}
-			}
+			}*/
 		}
 		doHandler(chatBody, channelContext);
 	}

+ 31 - 0
jim-server/src/main/java/org/jim/server/helper/redis/RedisMessageHelper.java

@@ -92,11 +92,42 @@ public class RedisMessageHelper extends AbstractMessageHelper{
 	
 	@Override
 	public void writeMessage(String timelineTable, String timelineId, ChatBody chatBody) {
+		this.onlySaveTwentyFourHoursData(timelineTable,timelineId);
 		double score = chatBody.getCreateTime();
 		RedisCacheManager.getCache(timelineTable).sortSetPush(timelineId, score, chatBody);
 	}
 
 	/**
+	 * 只保存24小时内的数据,其余数据删除
+	 * @param timelineTable
+ 	 * @param timelineId
+	 * @return {@link }
+	 * @author Darren
+	 * @date 2020/2/15 15:37
+	 */
+	private void onlySaveTwentyFourHoursData(String timelineTable, String timelineId){
+		//获取当前时间毫秒数
+		long currentTimeMillis = System.currentTimeMillis();
+		double endTime = Double.longBitsToDouble(currentTimeMillis);
+		//当前时间 24小时前的时间毫秒数
+		long twentyFourHoursBeforeTimeMillis = currentTimeMillis - 24L * 60L * 60L * 1000L;
+		double beginTime = Double.longBitsToDouble(twentyFourHoursBeforeTimeMillis);
+		//获取24小时内的数据
+		List<String> messages = RedisCacheManager.getCache(timelineTable).sortSetGetAll(timelineId, beginTime, endTime);
+		//删除全部数据
+		RedisCacheManager.getCache(timelineTable).remove(timelineId);
+		//保存24小时内的数据
+		if(CollectionUtils.isNotEmpty(messages)){
+			List<ChatBody> chatBodies = JsonKit.toArray(messages, ChatBody.class);
+			Iterator<ChatBody> it = chatBodies.iterator();
+			while(it.hasNext()){
+				ChatBody data = it.next();
+				storeCache.sortSetPush(timelineId,data.getCreateTime(),data);
+			}
+		}
+	}
+
+	/**
 	 * 移除redis中 需要撤销的消息
  	 * @param cancelMessageReqBody
 	 * @return {@link }