779513719 5 роки тому
батько
коміт
be9655cad1
20 змінених файлів з 597 додано та 64 видалено
  1. 5 1
      jim-common/src/main/java/org/jim/common/ImConst.java
  2. 14 3
      jim-common/src/main/java/org/jim/common/ImStatus.java
  3. 20 4
      jim-common/src/main/java/org/jim/common/message/MessageHelper.java
  4. 130 0
      jim-common/src/main/java/org/jim/common/packets/CancelMessageReqBody.java
  5. 2 2
      jim-common/src/main/java/org/jim/common/packets/ChatBody.java
  6. 4 8
      jim-common/src/main/java/org/jim/common/utils/ChatKit.java
  7. 3 1
      jim-server/src/main/java/org/jim/server/command/command.properties
  8. 1 1
      jim-server/src/main/java/org/jim/server/command/handler/ChatReqHandler.java
  9. 76 0
      jim-server/src/main/java/org/jim/server/command/handler/IMCancelMessageHandler.java
  10. 4 4
      jim-server/src/main/java/org/jim/server/command/handler/LoginReqHandler.java
  11. 13 11
      jim-server/src/main/java/org/jim/server/command/handler/processor/chat/AutoMessageProcessor.java
  12. 7 8
      jim-server/src/main/java/org/jim/server/command/handler/processor/chat/AutoMsgQueueRunnable.java
  13. 91 0
      jim-server/src/main/java/org/jim/server/command/handler/processor/chat/IMCancelMessageProcessor.java
  14. 28 0
      jim-server/src/main/java/org/jim/server/enums/CancelMessageTypeEnum.java
  15. 11 4
      jim-server/src/main/java/org/jim/server/helper/db/MysqlMessageHelper.java
  16. 69 4
      jim-server/src/main/java/org/jim/server/helper/redis/RedisMessageHelper.java
  17. 116 0
      jim-server/src/main/java/org/jim/server/util/CancelMessageKit.java
  18. 0 11
      server-chat/src/main/java/com/cn/listener/IMChatGroupListener.java
  19. 2 1
      server-chat/src/main/java/com/cn/processor/IMChatAsyncChatMessageProcessor.java
  20. 1 1
      server-chat/src/main/resources/jim.properties

+ 5 - 1
jim-common/src/main/java/org/jim/common/ImConst.java

@@ -57,10 +57,14 @@ public interface ImConst
 
 	public static final String IM_CHAT_LOGIN_SERVICE_PROCESSOR = "im_chat_login_service_processor";
 
-	public static final String  AUTO_MESSAGE_PROCESSOR = "auto_message_processor";
+	public static final String  IM_AUTO_MESSAGE_PROCESSOR = "im_auto_message_processor";
 
 	public static final String SEPARATOR = ":";
 
 	public static final String ADMIN = "ADMIN";
+
+	public static final String IM_CANCEL_MESSAGE_PROCESSOR = "im_cancel_message_processor";
+
+
 	
 }

+ 14 - 3
jim-common/src/main/java/org/jim/common/ImStatus.java

@@ -12,11 +12,10 @@ public enum ImStatus implements Status{
 	
 	C10000(10000,"ok","发送成功"),
 	C10001(10001,"offline","用户不在线"),
-	C10019(10019,"online","用户在线"),
 	C10002(
 			10002,"send failed"
 			,"消息发送失败,数据格式不正确,请参考:" +
-			"{'from':来源ID,'to':目标ID,'cmd':'消息命令码','createTime':消息创建时间(Long型)" +
+			"{'from':String来源ID,'to':目标ID,'cmd':'消息命令码','createTime':Long消息创建时间" +
 			",'msgType':Integer消息类型,'chatType':Integer聊天类型{0:未知,1:群聊,2:私聊}" +
 			",'group_id':String群组id仅在chatType为1时需要,'content':内容}"
 	),
@@ -36,9 +35,21 @@ public enum ImStatus implements Status{
 	C10016(10016,"get user message ok!","获取离线消息成功!"),
 	C10017(10017,"cmd failed!","未知的cmd命令!"),
 	C10018(10018,"get user message ok!","获取历史消息成功!"),
+	C10019(10019,"online","用户在线"),
 	C10020(10020,"Invalid verification!","不合法校验"),
 	C10021(10021,"close ok!","关闭成功"),
-	C10022(10022,"online service account is zero!","当前平台部门下没有在线客服,请稍后!");
+	C10022(10022,"online service account is zero!","当前平台部门下没有在线客服,请稍后!"),
+	C10023(
+			10023,"cancel message failed!"
+			,"撤销消息失败,数据格式不正确,请参考:{ 'id':String消息id,'createTime':Long消息创建时间,'from':String来源ID" +
+			",'msgState':Integer消息状态{0:正常消息,1:撤销消息},'group_id':String群组id仅在chatType为1时需要" +
+			",'chatType':Integer聊天类型{0:未知,1:群聊,2:私聊} }!"
+	),
+	C10024(10024,"ok","消息撤销成功"),
+	C10025(10025,"cancel message failed!","消息撤销失败,当前撤销消息不是您发的消息,不可以被撤销!")
+
+	;
+
 
 	private int status;
 	

+ 20 - 4
jim-common/src/main/java/org/jim/common/message/MessageHelper.java

@@ -1,10 +1,7 @@
 package org.jim.common.message;
 
 import org.jim.common.listener.ImBindListener;
-import org.jim.common.packets.ChatBody;
-import org.jim.common.packets.Group;
-import org.jim.common.packets.User;
-import org.jim.common.packets.UserMessageData;
+import org.jim.common.packets.*;
 
 import java.util.List;
 /**
@@ -84,6 +81,25 @@ public interface MessageHelper {
 	 * @param chatBody
 	 */
 	public void writeMessage(String timelineTable , String timelineId , ChatBody chatBody);
+
+	/**
+	 * 移除redis中 需要撤销的消息
+	 * @param cancelMessageReqBody
+	 * @return {@link }
+	 * @author Darren
+	 * @date 2020/2/13 11:53
+	 */
+	public void removeCancelMessage(CancelMessageReqBody cancelMessageReqBody);
+
+	/**
+	 * 移除redis中 推送给 离线用户 的消息
+	 * @param cancelMessageReqBody
+	 * @return {@link }
+	 * @author Darren
+	 * @date 2020/2/13 11:53
+	 */
+	public void removeCancelMessageByOffOnlineUserId(String userId, CancelMessageReqBody cancelMessageReqBody);
+
 	/**
 	 * 移除群组用户
 	 * @param userId

+ 130 - 0
jim-common/src/main/java/org/jim/common/packets/CancelMessageReqBody.java

@@ -0,0 +1,130 @@
+package org.jim.common.packets;
+
+/**
+ * 撤销消息请求体
+ * @author Darren
+ * @date 2020/2/12 16:58
+ */
+public class CancelMessageReqBody extends Message  {
+
+    /**
+     * 发送用户id;
+     */
+    private String from;
+    /**
+     * 消息状态 0:正常消息 1:撤销消息
+     */
+    private Integer msgState;
+    /**
+     * 消息发到哪个群组;
+     */
+    private String group_id;
+    /**
+     * 聊天类型;(如公聊、私聊)
+     */
+    private Integer chatType;
+
+    private CancelMessageReqBody() {
+    }
+
+    private CancelMessageReqBody(String from, Integer msgState, String group_id, Integer chatType) {
+        this.from = from;
+        this.msgState = msgState;
+        this.group_id = group_id;
+        this.chatType = chatType;
+    }
+
+    public static CancelMessageReqBody.Builder newBuilder(){
+        return new CancelMessageReqBody.Builder();
+    }
+
+    public String getGroup_id() {
+        return group_id;
+    }
+
+    public CancelMessageReqBody setGroup_id(String group_id) {
+        this.group_id = group_id;
+        return this;
+    }
+
+    public Integer getChatType() {
+        return chatType;
+    }
+
+    public CancelMessageReqBody setChatType(Integer chatType) {
+        this.chatType = chatType;
+        return this;
+    }
+
+    public String getFrom() {
+        return from;
+    }
+
+    public CancelMessageReqBody setFrom(String from) {
+        this.from = from;
+        return this;
+    }
+
+    public Integer getMsgState() {
+        return msgState;
+    }
+
+    public CancelMessageReqBody setMsgState(Integer msgState) {
+        this.msgState = msgState;
+        return this;
+    }
+
+    public static class Builder extends Message.Builder<CancelMessageReqBody, CancelMessageReqBody.Builder> {
+
+        /**
+         * 发送用户id;
+         */
+        private String from;
+        /**
+         * 消息状态 0:正常消息 1:撤销消息
+         */
+        private Integer msgState;
+        /**
+         * 消息发到哪个群组;
+         */
+        private String group_id;
+        /**
+         * 聊天类型;(如公聊、私聊)
+         */
+        private Integer chatType;
+
+        public Builder(){};
+
+        public Builder setFrom(String from) {
+            this.from = from;
+            return this;
+        }
+
+        public Builder setMsgState(Integer msgState) {
+            this.msgState = msgState;
+            return this;
+        }
+
+        public Builder setGroup_id(String group_id) {
+            this.group_id = group_id;
+            return this;
+        }
+
+        public Builder setChatType(Integer chatType) {
+            this.chatType = chatType;
+            return this;
+        }
+
+        @Override
+        protected Builder getThis() {
+            return this;
+        }
+
+        @Override
+        public CancelMessageReqBody build() {
+            return new CancelMessageReqBody(this.from, this.msgState, this.group_id, this.chatType);
+        }
+    }
+
+
+}

+ 2 - 2
jim-common/src/main/java/org/jim/common/packets/ChatBody.java

@@ -126,9 +126,9 @@ public class ChatBody extends Message {
 		 * 消息发到哪个群组;
 		 */
 		private String group_id;
-		
+
 		public Builder(){};
-		
+
 		public Builder setFrom(String from) {
 			this.from = from;
 			return this;

+ 4 - 8
jim-common/src/main/java/org/jim/common/utils/ChatKit.java

@@ -32,21 +32,17 @@ public class ChatKit {
 	/**
 	 * 转换为聊天消息结构;
 	 * @param body
-	 * @param channelContext
 	 * @return
 	 */
 	public static ChatBody toChatBody(byte[] body,ChannelContext channelContext){
+		// 设置chatBodyid 和 创建时间
 		ChatBody chatReqBody = parseChatBody(body);
 		if(chatReqBody != null){
+			//如果fromId为空 则返回null
 			if(StringUtils.isEmpty(chatReqBody.getFrom())){
-				ImSessionContext imSessionContext = (ImSessionContext)channelContext.getAttribute();
-				User user = imSessionContext.getClient().getUser();
-				if(user != null){
-					chatReqBody.setFrom(user.getNick());
-				}else{
-					chatReqBody.setFrom(channelContext.getId());
-				}
+				return null;
 			}
+			return chatReqBody;
 		}
 		return chatReqBody;
 	}

+ 3 - 1
jim-server/src/main/java/org/jim/server/command/command.properties

@@ -5,7 +5,7 @@
 #\u9274\u6743\u8BF7\u6C42\u5904\u7406\u5668
 3 = org.jim.server.command.handler.AuthReqHandler
 #\u804A\u5929\u8BF7\u6C42\u5904\u7406\u5668
-11 = org.jim.server.command.handler.ChatReqHandler,org.jim.server.command.handler.processor.chat.DefaultAsyncChatMessageProcessor,com.cn.processor.IMChatAsyncChatMessageProcessor,org.jim.server.command.handler.processor.chat.AutoMessageProcessor
+11 = org.jim.server.command.handler.ChatReqHandler,org.jim.server.command.handler.processor.chat.DefaultAsyncChatMessageProcessor,com.cn.processor.IMChatAsyncChatMessageProcessor,org.jim.server.command.handler.processor.chat.IMAutoMessageProcessor
 #\u5173\u95ED\u3001\u9000\u51FA\u8BF7\u6C42\u5904\u7406\u5668
 14 = org.jim.server.command.handler.CloseReqHandler
 #\u63E1\u624B\u8BF7\u6C42\u5904\u7406\u5668(TCP\u534F\u8BAE\u63E1\u624B\u5904\u7406\u5668,WS\u534F\u8BAE\u63E1\u624B\u5904\u7406\u5668)
@@ -20,3 +20,5 @@
 17 = org.jim.server.command.handler.UserReqHandler
 #\u83B7\u53D6\u7528\u6237\u6D88\u606F\u5904\u7406\u5668
 19 = org.jim.server.command.handler.MessageReqHandler
+#\u64A4\u56DE\u6D88\u606F\u5904\u7406\u5668
+15 = org.jim.server.command.handler.IMCancelMessageHandler,org.jim.server.command.handler.processor.chat.IMCancelMessageProcessor

+ 1 - 1
jim-server/src/main/java/org/jim/server/command/handler/ChatReqHandler.java

@@ -36,7 +36,7 @@ public class ChatReqHandler extends AbstractCmdHandler {
 		if (packet.getBody() == null) {
 			throw new Exception("body is null");
 		}
-		ChatBody chatBody = ChatKit.toChatBody(packet.getBody(), channelContext);
+		ChatBody chatBody = ChatKit.toChatBody(packet.getBody(),channelContext);
 		packet.setBody(chatBody.toByte());
 		//聊天数据格式不正确
 		if(chatBody == null || chatBody.getChatType() == null){

+ 76 - 0
jim-server/src/main/java/org/jim/server/command/handler/IMCancelMessageHandler.java

@@ -0,0 +1,76 @@
+package org.jim.server.command.handler;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.jim.common.ImAio;
+import org.jim.common.ImConst;
+import org.jim.common.ImPacket;
+import org.jim.common.packets.*;
+import org.jim.server.command.AbstractCmdHandler;
+import org.jim.server.command.handler.processor.chat.IMCancelMessageProcessor;
+import org.jim.server.enums.CancelMessageTypeEnum;
+import org.jim.server.util.CancelMessageKit;
+import org.tio.core.Aio;
+import org.tio.core.ChannelContext;
+
+import java.util.List;
+
+/**
+ * 消息撤销处理器
+ * @author Darren
+ * @date 2020/2/12 10:00
+ */
+public class IMCancelMessageHandler extends AbstractCmdHandler {
+
+    @Override
+    public Command command() {
+        return Command.COMMAND_CANCEL_MSG_REQ;
+    }
+
+    /**
+     * 消息撤销处理方法
+     * {
+     *   'id':String消息id,
+     *   'createTime':Long消息创建时间,
+     *   'from':String来源ID,
+     *   'msgState':Integer消息状态{0:正常消息,1:撤销消息},
+     *   'group_id':String群组id仅在chatType为1时需要,
+     *   'chatType':Integer聊天类型{0:未知,1:群聊,2:私聊
+     * }
+     * @param packet
+     * @param channelContext
+     * @return
+     * @throws Exception
+     */
+    @Override
+    public ImPacket handler(ImPacket packet, ChannelContext channelContext) throws Exception {
+        if( null == packet.getBody() ){
+            Aio.remove(channelContext, "body is null");
+            return null;
+        }
+        //调用 CancelMessageKit.toCancelMessageBody() 方法 转换消息结构 并验证 参数
+        CancelMessageReqBody cancelMessageReqBody = CancelMessageKit.toCancelMessageBody(packet.getBody());
+        if( cancelMessageReqBody == null ){
+            ImPacket respChatPacket = CancelMessageKit.dataInCorrectRespPacket(channelContext);
+            return respChatPacket;
+        }
+        List<IMCancelMessageProcessor> cancelMessageProcessor = this.getProcessor(ImConst.IM_CANCEL_MESSAGE_PROCESSOR, IMCancelMessageProcessor.class);
+        if(CollectionUtils.isEmpty(cancelMessageProcessor)){
+            Aio.remove(channelContext, "no cancel message serviceHandler processor!");
+            return null;
+        }
+        if( CancelMessageTypeEnum.CANCEL.getKey() == cancelMessageReqBody.getMsgState() ){
+            ImPacket respPacket = cancelMessageProcessor.get(0).handler(cancelMessageReqBody,channelContext);
+
+            ImPacket cancelMessagePacket = new ImPacket(Command.COMMAND_CANCEL_MSG_REQ,new RespBody(Command.COMMAND_CANCEL_MSG_REQ, cancelMessageReqBody).toByte());
+            cancelMessagePacket.setSynSeq(packet.getSynSeq());
+            if(ChatType.CHAT_TYPE_PUBLIC.getNumber() == cancelMessageReqBody.getChatType()){
+                String group_id = cancelMessageReqBody.getGroup_id();
+                //告知 群组 此消息 撤销
+                ImAio.sendToGroup(group_id, cancelMessagePacket);
+                //发送成功响应包
+                return CancelMessageKit.sendSuccessRespPacket(channelContext);
+            }
+        }
+        return null;
+    }
+}

+ 4 - 4
jim-server/src/main/java/org/jim/server/command/handler/LoginReqHandler.java

@@ -19,7 +19,7 @@ import org.jim.common.utils.JsonKit;
 import org.jim.server.ImServerGroupContext;
 import org.jim.server.command.AbstractCmdHandler;
 import org.jim.server.command.CommandManager;
-import org.jim.server.command.handler.processor.chat.AutoMsgQueueRunnable;
+import org.jim.server.command.handler.processor.chat.IMAutoMsgQueueRunnable;
 import org.jim.server.command.handler.processor.login.LoginCmdProcessor;
 import org.jim.server.enums.VisitTypeEnum;
 import org.slf4j.Logger;
@@ -123,11 +123,11 @@ public class LoginReqHandler extends AbstractCmdHandler {
 	private void sendServiceAccountAutoMsg(User user) {
 		log.info("执行自动发送客服设置的消息...");
 		ImServerGroupContext groupContext = (ImServerGroupContext)imConfig.getGroupContext();
-		AutoMsgQueueRunnable autoMsgQueueRunnable = new AutoMsgQueueRunnable(groupContext.getTimExecutor());
+		IMAutoMsgQueueRunnable IMAutoMsgQueueRunnable = new IMAutoMsgQueueRunnable(groupContext.getTimExecutor());
 		List<Group> groups = user.getGroups();
 		for (Group group : groups) {
-			autoMsgQueueRunnable.addMsg(group);
-			autoMsgQueueRunnable.getExecutor().execute(autoMsgQueueRunnable);
+			IMAutoMsgQueueRunnable.addMsg(group);
+			IMAutoMsgQueueRunnable.getExecutor().execute(IMAutoMsgQueueRunnable);
 		}
 
 	}

+ 13 - 11
jim-server/src/main/java/org/jim/server/command/handler/processor/chat/AutoMessageProcessor.java

@@ -24,9 +24,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * @author Darren
  * @date 2020/2/11 10:07
  */
-public class AutoMessageProcessor implements CmdProcessor, ImConst {
+public class IMAutoMessageProcessor implements CmdProcessor, ImConst {
 
-    private static Logger log = LoggerFactory.getLogger(AutoMessageProcessor.class);
+    private static Logger log = LoggerFactory.getLogger(IMAutoMessageProcessor.class);
 
     /**
      * 自动消息处理方法
@@ -48,6 +48,7 @@ public class AutoMessageProcessor implements CmdProcessor, ImConst {
                 //已经睡眠的时间
                 int alreadySleepTime = 0;
                 for(AutoReplyMessage message : list) {
+
                     //需要睡眠几秒 单位 s
                     Integer needSleepTime = message.getInt("sendtime");
                     needSleepTime -= alreadySleepTime;
@@ -87,6 +88,7 @@ public class AutoMessageProcessor implements CmdProcessor, ImConst {
                             readLock.unlock();
                         }
                     }
+
                 }
             }
         }
@@ -103,17 +105,17 @@ public class AutoMessageProcessor implements CmdProcessor, ImConst {
      * @date 2020/2/11 15:01
      */
     private ChatBody createChatBody(AutoReplyMessage message, String accountRoleDepartmentId, String group_id){
-        ChatBody chatBody = ChatBody.newBuilder().build();
-        chatBody.setChatType(ChatType.CHAT_TYPE_PUBLIC.getNumber());
-        chatBody.setContent(message.getStr("content"));
-        chatBody.setFrom(accountRoleDepartmentId);
-        chatBody.setCreateTime( System.currentTimeMillis() );
-        chatBody.setGroup_id(group_id);
-        chatBody.setMsgType(MsgType.MSG_TYPE_TEXT.getNumber());
-        chatBody.setId(UUID.randomUUID().toString().replace("-",""));
         Map<String, Object> map = new HashMap<>();
         //发送的消息类型 0:客户/游客发送的消息 1:客服发送的消息 2:客服自动发送的消息
         map.put("sendType",2);
+        ChatBody chatBody = ChatBody.newBuilder().build();
+        chatBody.setChatType(ChatType.CHAT_TYPE_PUBLIC.getNumber())
+                .setContent(message.getStr("content"))
+                .setFrom(accountRoleDepartmentId)
+                .setGroup_id(group_id)
+                .setMsgType(MsgType.MSG_TYPE_TEXT.getNumber())
+                .setId(UUID.randomUUID().toString().replace("-",""));
+        chatBody.setCreateTime(System.currentTimeMillis());
         chatBody.setExtras(new JSONObject(map));
         return chatBody;
     }
@@ -143,6 +145,6 @@ public class AutoMessageProcessor implements CmdProcessor, ImConst {
 
     @Override
     public String name() {
-        return AUTO_MESSAGE_PROCESSOR;
+        return IM_AUTO_MESSAGE_PROCESSOR;
     }
 }

+ 7 - 8
jim-server/src/main/java/org/jim/server/command/handler/processor/chat/AutoMsgQueueRunnable.java

@@ -7,7 +7,6 @@ import org.jim.server.command.CommandManager;
 import org.jim.server.command.handler.ChatReqHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.tio.core.ChannelContext;
 import org.tio.utils.thread.pool.AbstractQueueRunnable;
 
 import java.util.concurrent.Executor;
@@ -16,16 +15,16 @@ import java.util.concurrent.Executor;
  * @author Darren
  * @date 2020/2/11 9:31
  */
-public class AutoMsgQueueRunnable extends AbstractQueueRunnable<Group> {
+public class IMAutoMsgQueueRunnable extends AbstractQueueRunnable<Group> {
 
-    private static Logger log = LoggerFactory.getLogger(AutoMsgQueueRunnable.class);
+    private static Logger log = LoggerFactory.getLogger(IMAutoMsgQueueRunnable.class);
 
-    private AutoMessageProcessor autoMessageProcessor;
+    private IMAutoMessageProcessor iMAutoMessageProcessor;
 
-    public AutoMsgQueueRunnable(Executor executor) {
+    public IMAutoMsgQueueRunnable(Executor executor) {
         super(executor);
         ChatReqHandler chatReqHandler = CommandManager.getCommand(Command.COMMAND_CHAT_REQ,ChatReqHandler.class);
-        autoMessageProcessor = chatReqHandler.getProcessor(ImConst.AUTO_MESSAGE_PROCESSOR,AutoMessageProcessor.class).get(0);
+        iMAutoMessageProcessor = chatReqHandler.getProcessor(ImConst.IM_AUTO_MESSAGE_PROCESSOR, IMAutoMessageProcessor.class).get(0);
     }
 
     @Override
@@ -47,8 +46,8 @@ public class AutoMsgQueueRunnable extends AbstractQueueRunnable<Group> {
         //ConcurrentLinkedQueue 中 poll() 方法:
         //检索并删除此队列的头部,如果此队列为空,则返回 null
         while( (group = msgQueue.poll()) != null ){
-            if(autoMessageProcessor != null){
-                autoMessageProcessor.handler(group);
+            if(iMAutoMessageProcessor != null){
+                iMAutoMessageProcessor.handler(group);
             }
         }
     }

+ 91 - 0
jim-server/src/main/java/org/jim/server/command/handler/processor/chat/IMCancelMessageProcessor.java

@@ -0,0 +1,91 @@
+package org.jim.server.command.handler.processor.chat;
+
+import org.jim.common.ImConfig;
+import org.jim.common.ImConst;
+import org.jim.common.ImPacket;
+import org.jim.common.message.MessageHelper;
+import org.jim.common.packets.CancelMessageReqBody;
+import org.jim.common.packets.ChatType;
+import org.jim.common.utils.ChatKit;
+import org.jim.server.command.CommandManager;
+import org.jim.server.command.handler.processor.CmdProcessor;
+import org.jim.server.enums.CancelMessageTypeEnum;
+import org.jim.server.model.ConversationRecord;
+import org.jim.server.util.CancelMessageKit;
+import org.tio.core.ChannelContext;
+
+import java.util.List;
+
+/**
+ * @author Darren
+ * @date 2020/2/12 10:35
+ */
+public class IMCancelMessageProcessor implements CmdProcessor {
+
+    protected ImConfig imConfig;
+
+    /**
+     * 修改redis中 和 mysql 中的 撤回消息状态
+     * @param cancelMessageReqBody
+     * @param channelContext
+     * @return
+     */
+    public ImPacket handler(CancelMessageReqBody cancelMessageReqBody, ChannelContext channelContext){
+        if(imConfig == null) {
+            imConfig = CommandManager.getImConfig();
+        }
+        //判断 fromId 与 数据库中的fromId 是否一致
+        String chatBodyId = cancelMessageReqBody.getId();
+        ConversationRecord conversationRecord = ConversationRecord.dao
+                .findFirst("select * from sw_conversation_record where chat_body_id = ? ", chatBodyId);
+        String oldFromId = conversationRecord.getStr("from_id");
+        if( oldFromId.equals(cancelMessageReqBody.getFrom()) ){
+            //表示当前撤销的信息 是 当前人 发的消息 可以撤销
+            //1.修改 sw_conversation_record 数据中相关对应字段 is_recall 消息是否撤回 1:是 0:否
+            conversationRecord.set("is_recall", CancelMessageTypeEnum.CANCEL.getKey()).update();
+            //2. 对redis 对应数据 进行更改
+            if(ImConst.ON.equals(imConfig.getIsStore())){
+                //更改reids中 已经 持久化的 消息
+                if(ChatType.CHAT_TYPE_PUBLIC.getNumber() == cancelMessageReqBody.getChatType()){
+                    removeGroupMessage(cancelMessageReqBody);
+                }
+            }
+            return CancelMessageKit.sendSuccessRespPacket(channelContext);
+        }
+        return CancelMessageKit.fromIdInCorrectRespPacket(channelContext);
+    }
+
+    /**
+     * 撤回消息 需要删除已经在redis中持久化的消息(撤回的消息)
+     * @param cancelMessageReqBody
+     */
+    private void removeGroupMessage(CancelMessageReqBody cancelMessageReqBody) {
+        MessageHelper messageHelper = imConfig.getMessageHelper();
+        String group_id = cancelMessageReqBody.getGroup_id();
+        messageHelper.removeCancelMessage(cancelMessageReqBody);
+        List<String> userIds = messageHelper.getGroupUsers(group_id);
+        //将群需要消息同步到所有的群成员
+        for(String userId : userIds){
+            boolean isOnline = false;
+            if(ImConst.ON.equals(imConfig.getIsStore()) && ImConst.ON.equals(imConfig.getIsCluster())){
+                isOnline = messageHelper.isOnline(userId);
+            }else{
+                isOnline = ChatKit.isOnline(userId,imConfig);
+            }
+            if(!isOnline){
+                messageHelper.removeCancelMessageByOffOnlineUserId(userId, cancelMessageReqBody);
+            }
+        }
+    }
+
+    @Override
+    public boolean isProtocol(ChannelContext channelContext) {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        return ImConst.IM_CANCEL_MESSAGE_PROCESSOR;
+    }
+
+}

+ 28 - 0
jim-server/src/main/java/org/jim/server/enums/CancelMessageTypeEnum.java

@@ -0,0 +1,28 @@
+package org.jim.server.enums;
+
+/**
+ * 消息撤回状态 1:撤回 0:未撤回
+ * @author Darren
+ * @date 2020/2/13 10:33
+ */
+public enum CancelMessageTypeEnum {
+
+    CANCEL(1,"撤回状态"),
+    NOT_CANCEL(0,"正常状态");
+
+    private int key;
+    private String desc;
+
+    private CancelMessageTypeEnum(int key, String desc){
+        this.key = key;
+        this.desc = desc;
+    }
+
+    public int getKey() {
+        return key;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+}

+ 11 - 4
jim-server/src/main/java/org/jim/server/helper/db/MysqlMessageHelper.java

@@ -4,10 +4,7 @@ import java.util.List;
 
 import org.jim.common.listener.ImBindListener;
 import org.jim.common.message.MessageHelper;
-import org.jim.common.packets.ChatBody;
-import org.jim.common.packets.Group;
-import org.jim.common.packets.User;
-import org.jim.common.packets.UserMessageData;
+import org.jim.common.packets.*;
 
 /**
  * Mysql获取持久化+同步消息助手;
@@ -41,6 +38,16 @@ public class MysqlMessageHelper implements MessageHelper {
 	}
 
 	@Override
+	public void removeCancelMessage(CancelMessageReqBody cancelMessageReqBody) {
+		// TODO Auto-generated method stub
+	}
+
+	@Override
+	public void removeCancelMessageByOffOnlineUserId(String userId, CancelMessageReqBody cancelMessageReqBody) {
+		// TODO Auto-generated method stub
+	}
+
+	@Override
 	public void removeGroupUser(String userid, String group_id) {
 		// TODO Auto-generated method stub
 		

+ 69 - 4
jim-server/src/main/java/org/jim/server/helper/redis/RedisMessageHelper.java

@@ -9,10 +9,7 @@ import org.jim.common.cache.redis.RedisCache;
 import org.jim.common.cache.redis.RedisCacheManager;
 import org.jim.common.listener.ImBindListener;
 import org.jim.common.message.AbstractMessageHelper;
-import org.jim.common.packets.ChatBody;
-import org.jim.common.packets.Group;
-import org.jim.common.packets.User;
-import org.jim.common.packets.UserMessageData;
+import org.jim.common.packets.*;
 import org.jim.common.utils.ChatKit;
 import org.jim.common.utils.JsonKit;
 import org.slf4j.Logger;
@@ -22,6 +19,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Redis获取持久化+同步消息助手;
@@ -98,6 +96,60 @@ public class RedisMessageHelper extends AbstractMessageHelper{
 		RedisCacheManager.getCache(timelineTable).sortSetPush(timelineId, score, chatBody);
 	}
 
+	/**
+	 * 移除redis中 需要撤销的消息
+ 	 * @param cancelMessageReqBody
+	 * @return {@link }
+	 * @author Darren
+	 * @date 2020/2/13 11:53
+	 */
+	@Override
+	public void removeCancelMessage(CancelMessageReqBody cancelMessageReqBody){
+		String key = GROUP + SUBFIX + cancelMessageReqBody.getGroup_id();
+		List<String> messages = storeCache.sortSetGetAll(key);
+		if(CollectionUtils.isNotEmpty(messages)){
+			List<ChatBody> chatBodies = JsonKit.toArray(messages, ChatBody.class);
+			List<String> chatBodyIds = chatBodies.stream().map(ChatBody::getId).collect(Collectors.toList());
+			if(chatBodyIds.contains(cancelMessageReqBody.getId())){
+				storeCache.remove(key);
+				Iterator<ChatBody> it = chatBodies.iterator();
+				while(it.hasNext()){
+					ChatBody chatBody = it.next();
+					if( !chatBody.getId().equals(cancelMessageReqBody.getId()) ){
+						storeCache.sortSetPush(key,chatBody.getCreateTime(),chatBody);
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * 移除redis中 推送给 离线用户的消息 中需要撤销的消息
+	 * @param cancelMessageReqBody
+	 * @return {@link }
+	 * @author Darren
+	 * @date 2020/2/13 11:53
+	 */
+	@Override
+	public void removeCancelMessageByOffOnlineUserId(String userId, CancelMessageReqBody cancelMessageReqBody){
+		String key = GROUP + SUBFIX + cancelMessageReqBody.getGroup_id() + SUBFIX + userId;
+		List<String> messages = pushCache.sortSetGetAll(key);
+		if(CollectionUtils.isNotEmpty(messages)){
+			List<ChatBody> chatBodies = JsonKit.toArray(messages, ChatBody.class);
+			List<String> chatBodyIds = chatBodies.stream().map(ChatBody::getId).collect(Collectors.toList());
+			if(chatBodyIds.contains(cancelMessageReqBody.getId())){
+				pushCache.remove(key);
+				Iterator<ChatBody> it = chatBodies.iterator();
+				while(it.hasNext()){
+					ChatBody chatBody = it.next();
+					if( !chatBody.getId().equals(cancelMessageReqBody.getId()) ){
+						pushCache.sortSetPush(key,chatBody.getCreateTime(),chatBody);
+					}
+				}
+			}
+		}
+	}
+
 
 	@Override
 	public void addGroupUser(String userid, String group_id) {
@@ -156,6 +208,9 @@ public class RedisMessageHelper extends AbstractMessageHelper{
 		return null;
 	}
 
+	/**
+	 * 获取用户指定群组离线消息;
+	 */
 	@Override
 	public UserMessageData getGroupOfflineMessage(String userid, String groupid) {
 		String key = GROUP+SUBFIX+groupid+SUBFIX+userid;
@@ -197,6 +252,16 @@ public class RedisMessageHelper extends AbstractMessageHelper{
 		return messageData;
 	}
 
+	/**
+	 * 获取群组历史消息
+	 * @param userid
+	 * @param groupid
+	 * @param beginTime 消息区间开始时间
+	 * @param endTime 消息区间结束时间
+	 * @param offset 分页偏移量
+	 * @param count 数量
+	 * @return
+	 */
 	@Override
 	public UserMessageData getGroupHistoryMessage(String userid, String groupid,Double beginTime,Double endTime,Integer offset,Integer count) {
 		String key = GROUP+SUBFIX+groupid;

+ 116 - 0
jim-server/src/main/java/org/jim/server/util/CancelMessageKit.java

@@ -0,0 +1,116 @@
+package org.jim.server.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.jim.common.ImPacket;
+import org.jim.common.ImStatus;
+import org.jim.common.http.HttpConst;
+import org.jim.common.packets.CancelMessageReqBody;
+import org.jim.common.packets.Command;
+import org.jim.common.packets.RespBody;
+import org.jim.common.utils.ImKit;
+import org.jim.common.utils.JsonKit;
+import org.tio.core.ChannelContext;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * IM撤销消息 命令工具类
+ * @author Darren
+ * @date 2020/2/12 17:26
+ */
+public class CancelMessageKit {
+
+    private static Logger log = Logger.getLogger(CancelMessageKit.class);
+
+    /**
+     * 转换为消息结构;
+     * @param body
+     * @return
+     */
+    private static CancelMessageReqBody parseCancelMessageBody(byte[] body){
+        if(body == null) {
+            return null;
+        }
+        CancelMessageReqBody cancelMessageReqBody = null;
+        try {
+            String text = new String(body, HttpConst.CHARSET_NAME);
+            cancelMessageReqBody = JsonKit.toBean(text, CancelMessageReqBody.class);
+            if( null != cancelMessageReqBody){
+                return cancelMessageReqBody;
+            }
+        } catch (UnsupportedEncodingException e) {
+            log.error(e.toString());
+        }
+        return cancelMessageReqBody;
+    }
+
+    /**
+     * 验证消息体中的值
+     * @param cancelMessageReqBody
+     * @return
+     */
+    private static boolean validateCancelMessageBody(CancelMessageReqBody cancelMessageReqBody){
+        boolean b1 = StringUtils.isNotEmpty(cancelMessageReqBody.getId());
+        boolean b2 = StringUtils.isNotEmpty(cancelMessageReqBody.getGroup_id());
+        boolean b3 = StringUtils.isNotEmpty(cancelMessageReqBody.getFrom());
+        boolean b4 = null == cancelMessageReqBody.getMsgState();
+        boolean b5 = null == cancelMessageReqBody.getChatType();
+        boolean b6 = null == cancelMessageReqBody.getCreateTime();
+        return (b1 && b2 && b3 && b4 && b5 && b6);
+    }
+
+    /**
+     * 转换为消息结构;
+     * @param body
+     * @return
+     */
+    public static CancelMessageReqBody toCancelMessageBody(byte[] body){
+        CancelMessageReqBody cancelMessageReqBody = parseCancelMessageBody(body);
+        if( null != cancelMessageReqBody ){
+            if( validateCancelMessageBody(cancelMessageReqBody) ){
+                return cancelMessageReqBody;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * 消息数据格式不正确响应包
+     * @param channelContext
+     * @return imPacket
+     * @throws Exception
+     */
+    public static ImPacket dataInCorrectRespPacket(ChannelContext channelContext) {
+        RespBody chatDataInCorrectRespPacket = new RespBody(Command.COMMAND_CANCEL_MSG_RESP, ImStatus.C10023);
+        ImPacket respPacket = ImKit.ConvertRespPacket(chatDataInCorrectRespPacket, channelContext);
+        respPacket.setStatus(ImStatus.C10023);
+        return respPacket;
+    }
+
+    /**
+     * 消息数据格式不正确响应包
+     * @param channelContext
+     * @return imPacket
+     * @throws Exception
+     */
+    public static ImPacket fromIdInCorrectRespPacket(ChannelContext channelContext) {
+        RespBody chatDataInCorrectRespPacket = new RespBody(Command.COMMAND_CANCEL_MSG_RESP, ImStatus.C10025);
+        ImPacket respPacket = ImKit.ConvertRespPacket(chatDataInCorrectRespPacket, channelContext);
+        respPacket.setStatus(ImStatus.C10025);
+        return respPacket;
+    }
+
+    /**
+     * 消息撤销成功响应包
+     * @param channelContext
+     * @return
+     * @throws Exception
+     */
+    public static ImPacket sendSuccessRespPacket(ChannelContext channelContext) {
+        RespBody chatDataInCorrectRespPacket = new RespBody(Command.COMMAND_CANCEL_MSG_RESP,ImStatus.C10024);
+        ImPacket respPacket = ImKit.ConvertRespPacket(chatDataInCorrectRespPacket, channelContext);
+        respPacket.setStatus(ImStatus.C10024);
+        return respPacket;
+    }
+}

+ 0 - 11
server-chat/src/main/java/com/cn/listener/IMChatGroupListener.java

@@ -27,17 +27,6 @@ public class IMChatGroupListener extends ImGroupListener {
 
     private static Logger logger = LoggerFactory.getLogger(IMChatGroupListener.class);
 
-    private static JedisTemplate jedisTemplate = null;
-
-    static {
-        try {
-            jedisTemplate = JedisTemplate.me();
-        } catch (Exception e) {
-            logger.info("JedisTemplate初始化失败!");
-            e.printStackTrace();
-        }
-    }
-
     @Override
     public void onAfterUnbind(ChannelContext channelContext, String group) throws Exception {
 

+ 2 - 1
server-chat/src/main/java/com/cn/processor/IMChatAsyncChatMessageProcessor.java

@@ -46,7 +46,7 @@ public class IMChatAsyncChatMessageProcessor extends DefaultAsyncChatMessageProc
             ChatGroup group = ChatGroup.dao.findById(chatBody.getGroup_id());
             if( null != group){
                 GroupConversationMiddle groupConversationMiddle
-                        = getGroupConversationMiddleByGroupId(group.getLong("group_id"));
+                        = this.getGroupConversationMiddleByGroupId(group.getLong("group_id"));
                 if( null != groupConversationMiddle ){
                     Conversation conversation = Conversation.dao.findById(groupConversationMiddle.getLong("conversation_id"));
                     if( null != conversation){
@@ -74,6 +74,7 @@ public class IMChatAsyncChatMessageProcessor extends DefaultAsyncChatMessageProc
         conversationRecordAttrs.put("chat_body_id",chatBody.getId());
         conversationRecordAttrs.put("content",chatBody.getContent());
         conversationRecordAttrs.put("is_recall",0);
+        conversationRecordAttrs.put("create_time",sdf.format(chatBody.getCreateTime()));
         String fromId = chatBody.getFrom();
         // customerDepartmentId  visitorDepartmentId  serviceAccountRoleDepartmentId
         User user = ImAio.getUser(fromId);

+ 1 - 1
server-chat/src/main/resources/jim.properties

@@ -45,4 +45,4 @@ jim.redis.database = 1
 #redis\u7AEF\u53E3\u53F7
 jim.redis.port = 6379
 #redis\u5BC6\u7801
-jim.redis.auth =dsafsafd43243
+jim.redis.auth = dsafsafd43243