IMChatAsyncChatMessageProcessor.java 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package com.cn.processor;
  2. import com.alibaba.fastjson.JSONObject;
  3. import org.apache.commons.collections4.CollectionUtils;
  4. import org.jim.common.ImSessionContext;
  5. import org.jim.common.cache.redis.RedisCache;
  6. import org.jim.common.cache.redis.RedisCacheManager;
  7. import org.jim.common.packets.*;
  8. import org.jim.server.model.ChatGroup;
  9. import org.jim.server.model.Conversation;
  10. import org.jim.server.model.ConversationRecord;
  11. import org.jim.common.ImAio;
  12. import org.jim.server.command.handler.processor.chat.DefaultAsyncChatMessageProcessor;
  13. import org.jim.server.enums.VisitTypeEnum;
  14. import org.jim.server.model.GroupConversationMiddle;
  15. import org.jim.server.util.SnowflakeIdUtils;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import org.tio.core.ChannelContext;
  19. import java.sql.Timestamp;
  20. import java.text.SimpleDateFormat;
  21. import java.util.Date;
  22. import java.util.HashMap;
  23. import java.util.List;
  24. import java.util.Map;
  25. /**
  26. * 异步聊天持久化
  27. * @author Darren
  28. * @date 2020/2/6 19:42
  29. */
  30. public class IMChatAsyncChatMessageProcessor extends DefaultAsyncChatMessageProcessor {
  31. private static Logger log = LoggerFactory.getLogger(IMChatAsyncChatMessageProcessor.class);
  32. private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  33. //设置key的过期时间 30分钟
  34. private static final int TIME_OUT = 30 * 60;
  35. private final String SUBFIX = ":";
  36. private static RedisCache groupCache;
  37. private static RedisCache userCache;
  38. static{
  39. RedisCacheManager.register(USER, Integer.MAX_VALUE, Integer.MAX_VALUE);
  40. RedisCacheManager.register(GROUP, Integer.MAX_VALUE, Integer.MAX_VALUE);
  41. RedisCacheManager.register(STORE, Integer.MAX_VALUE, Integer.MAX_VALUE);
  42. RedisCacheManager.register(PUSH, Integer.MAX_VALUE, Integer.MAX_VALUE);
  43. groupCache = RedisCacheManager.getCache(GROUP);
  44. userCache = RedisCacheManager.getCache(USER);
  45. }
  46. @Override
  47. public void doHandler(ChatBody chatBody, ChannelContext channelContext) {
  48. log.info("聊天记录持久化... 开始:" + sdf.format(new Date()));
  49. //聊天类型(如 0:未知 1:群聊、2:私聊)
  50. Integer chatType = chatBody.getChatType();
  51. //消息类型;(如:0:text、1:image、2:voice、3:vedio、4:music、5:news)
  52. Integer msgType = chatBody.getMsgType();
  53. if( ChatType.CHAT_TYPE_PUBLIC.getNumber() == chatType ){
  54. //群聊
  55. ChatGroup group = ChatGroup.dao.findById(chatBody.getGroup_id());
  56. if( null != group){
  57. String group_id = group.getStr("group_id");
  58. GroupConversationMiddle groupConversationMiddle
  59. = this.getGroupConversationMiddleByGroupId(group_id);
  60. if( null != groupConversationMiddle ){
  61. Conversation conversation = Conversation.dao.findById(groupConversationMiddle.getLong("conversation_id"));
  62. if( null != conversation){
  63. ConversationRecord.dao
  64. ._setAttrs(this.getConversationRecordAttrs(chatBody,conversation,groupConversationMiddle,group))
  65. .save();
  66. }
  67. }
  68. }
  69. if(ON.equals(imConfig.getIsStore())){
  70. //延长redis数据过期时间
  71. this.extendRedisData(chatBody,channelContext);
  72. }
  73. }
  74. log.info("聊天记录持久化... 结束:" + sdf.format(new Date()));
  75. }
  76. /**
  77. * 延长redis数据过期时间
  78. * @param chatBody
  79. * @param channelContext
  80. * @return {@link }
  81. * @author Darren
  82. * @date 2020/2/18 21:02
  83. */
  84. private void extendRedisData(ChatBody chatBody, ChannelContext channelContext){
  85. ImSessionContext imSessionContext = (ImSessionContext)channelContext.getAttribute();
  86. Client client = imSessionContext.getClient();
  87. if(client != null) {
  88. User onlineUser = client.getUser();
  89. if(onlineUser != null){
  90. userCache.expire(chatBody.getFrom() + SUBFIX + TERMINAL + SUBFIX + onlineUser.getTerminal(),TIME_OUT);
  91. }
  92. }
  93. userCache.expire( chatBody.getFrom() + SUBFIX + GROUP,TIME_OUT);
  94. userCache.expire(chatBody.getFrom() + SUBFIX + INFO,TIME_OUT);
  95. userCache.expire(chatBody.getFrom() + SUBFIX + FRIENDS,TIME_OUT);
  96. groupCache.expire(chatBody.getGroup_id() + SUBFIX + USER,TIME_OUT);
  97. groupCache.expire(chatBody.getGroup_id() + SUBFIX + INFO,TIME_OUT);
  98. }
  99. /**
  100. * 组装ConversationRecord数据
  101. * @param chatBody
  102. * @param conversation
  103. * @return
  104. */
  105. private Map<String, Object> getConversationRecordAttrs(
  106. ChatBody chatBody
  107. , Conversation conversation
  108. , GroupConversationMiddle groupConversationMiddle
  109. , ChatGroup group
  110. ){
  111. Map<String, Object> conversationRecordAttrs = new HashMap<>();
  112. conversationRecordAttrs.put("id", SnowflakeIdUtils.getInstance().nextId());
  113. conversationRecordAttrs.put("chat_body_id",chatBody.getId());
  114. conversationRecordAttrs.put("content",chatBody.getContent());
  115. conversationRecordAttrs.put("chat_body_msg_type",chatBody.getMsgType());
  116. conversationRecordAttrs.put("is_recall",0);
  117. conversationRecordAttrs.put("create_time",sdf.format(chatBody.getCreateTime()));
  118. String fromId = chatBody.getFrom();
  119. conversationRecordAttrs.put("chat_body_from",fromId);
  120. // customerDepartmentId visitorDepartmentId serviceAccountRoleDepartmentId
  121. User user = ImAio.getUser(fromId);
  122. String visitType = user.getVisitType();
  123. if(VisitTypeEnum.CUSTOMER.getKey().equals(visitType)){
  124. // CUSTOMER 客户 发送的消息
  125. String companyId = user.getExtras().getString("customerId");
  126. conversationRecordAttrs.put("from_id",Long.valueOf(companyId));
  127. conversationRecordAttrs.put("to_id",conversation.getLong("service_account_id"));
  128. conversationRecordAttrs.put("type",0);
  129. conversationRecordAttrs.put("chat_body_to", group.getStr("service_account_role_department_middle_id"));
  130. }else if(VisitTypeEnum.VISITOR.getKey().equals(visitType)){
  131. // VISITOR 游客 发送的消息
  132. String visitorId = user.getExtras().getString("visitorId");
  133. conversationRecordAttrs.put("from_id",Long.valueOf(visitorId));
  134. conversationRecordAttrs.put("to_id",conversation.getLong("service_account_id"));
  135. conversationRecordAttrs.put("type",0);
  136. conversationRecordAttrs.put("chat_body_to", group.getStr("service_account_role_department_middle_id"));
  137. }else{
  138. //SERVICEACCOUNT 客服 发送的消息
  139. String serviceAccountId = user.getExtras().getString("serviceAccountId");
  140. conversationRecordAttrs.put("from_id",Long.valueOf(serviceAccountId));
  141. conversationRecordAttrs.put("to_id",conversation.getLong("user_id"));
  142. conversationRecordAttrs.put("chat_body_to", group.getStr("consumer_id"));
  143. JSONObject jsonObject = chatBody.getExtras();
  144. if(
  145. null != jsonObject
  146. && jsonObject.size() > 0
  147. && Integer.valueOf(2).equals(jsonObject.getInteger("sendType"))
  148. ){
  149. //客服自动发送的消息
  150. conversationRecordAttrs.put("type",jsonObject.getInteger("sendType"));
  151. }else{
  152. //客服手动发送的消息
  153. conversationRecordAttrs.put("type",1);
  154. //判断 客服 是否第一次回复 (依据:是之前 first_answer_time 字段 和 first_answer_lenth 字段 值 是否为空 )
  155. Timestamp firstAnswerTime = conversation.getTimestamp("first_answer_time");
  156. Integer firstAnswerLenth = conversation.getInt("first_answer_lenth");
  157. if( null == firstAnswerTime && null == firstAnswerLenth ){
  158. //说明客服是第一次回复
  159. long currentTimeMillis = System.currentTimeMillis();
  160. //当前时间的格林威治时间 毫秒数
  161. Timestamp currentTime = new Timestamp(currentTimeMillis);
  162. Timestamp createTime = conversation.getTimestamp("create_time");
  163. //会话创建时间的格林威治时间 毫秒数
  164. long createTimeMillis = createTime.getTime();
  165. if(currentTimeMillis > createTimeMillis){
  166. conversation.set("first_answer_time",currentTime)
  167. .set("first_answer_lenth",Integer.valueOf( String.valueOf( (currentTimeMillis - createTimeMillis)/1000 )))
  168. .update();
  169. }
  170. }
  171. }
  172. }
  173. conversationRecordAttrs.put(
  174. "conversation_id"
  175. ,groupConversationMiddle.getLong("conversation_id")
  176. );
  177. return conversationRecordAttrs;
  178. }
  179. /**
  180. * 获取最新的 GroupConversationMiddle 对象 (根据groupId)
  181. * @param
  182. * @return {@link {@link GroupConversationMiddle}}
  183. * @author Darren
  184. * @date 2020/2/11 19:03
  185. */
  186. private GroupConversationMiddle getGroupConversationMiddleByGroupId(String groupId){
  187. StringBuilder sql = new StringBuilder();
  188. sql.append(" select * from sw_group_conversation_middle where group_id = " + groupId)
  189. .append(" order by create_time desc limit 1");
  190. return GroupConversationMiddle.dao.findFirst(sql.toString());
  191. }
  192. }