package com.cn.processor; import com.alibaba.fastjson.JSONObject; import org.apache.commons.collections4.CollectionUtils; import org.jim.common.ImSessionContext; import org.jim.common.cache.redis.RedisCache; import org.jim.common.cache.redis.RedisCacheManager; import org.jim.common.packets.*; import org.jim.server.model.ChatGroup; import org.jim.server.model.Conversation; import org.jim.server.model.ConversationRecord; import org.jim.common.ImAio; import org.jim.server.command.handler.processor.chat.DefaultAsyncChatMessageProcessor; import org.jim.server.enums.VisitTypeEnum; import org.jim.server.model.GroupConversationMiddle; import org.jim.server.util.SnowflakeIdUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 异步聊天持久化 * @author Darren * @date 2020/2/6 19:42 */ public class IMChatAsyncChatMessageProcessor extends DefaultAsyncChatMessageProcessor { private static Logger log = LoggerFactory.getLogger(IMChatAsyncChatMessageProcessor.class); private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //设置key的过期时间 30分钟 private static final int TIME_OUT = 30 * 60; private final String SUBFIX = ":"; private static RedisCache groupCache; private static RedisCache userCache; static{ RedisCacheManager.register(USER, Integer.MAX_VALUE, Integer.MAX_VALUE); RedisCacheManager.register(GROUP, Integer.MAX_VALUE, Integer.MAX_VALUE); RedisCacheManager.register(STORE, Integer.MAX_VALUE, Integer.MAX_VALUE); RedisCacheManager.register(PUSH, Integer.MAX_VALUE, Integer.MAX_VALUE); groupCache = RedisCacheManager.getCache(GROUP); userCache = RedisCacheManager.getCache(USER); } @Override public void doHandler(ChatBody chatBody, ChannelContext channelContext) { log.info("聊天记录持久化... 开始:" + sdf.format(new Date())); //聊天类型(如 0:未知 1:群聊、2:私聊) Integer chatType = chatBody.getChatType(); //消息类型;(如:0:text、1:image、2:voice、3:vedio、4:music、5:news) Integer msgType = chatBody.getMsgType(); if( ChatType.CHAT_TYPE_PUBLIC.getNumber() == chatType ){ //群聊 ChatGroup group = ChatGroup.dao.findById(chatBody.getGroup_id()); if( null != group){ String group_id = group.getStr("group_id"); GroupConversationMiddle groupConversationMiddle = this.getGroupConversationMiddleByGroupId(group_id); if( null != groupConversationMiddle ){ Conversation conversation = Conversation.dao.findById(groupConversationMiddle.getLong("conversation_id")); if( null != conversation){ ConversationRecord.dao ._setAttrs(this.getConversationRecordAttrs(chatBody,conversation,groupConversationMiddle,group)) .save(); } } } if(ON.equals(imConfig.getIsStore())){ //延长redis数据过期时间 this.extendRedisData(chatBody,channelContext); } } log.info("聊天记录持久化... 结束:" + sdf.format(new Date())); } /** * 延长redis数据过期时间 * @param chatBody * @param channelContext * @return {@link } * @author Darren * @date 2020/2/18 21:02 */ private void extendRedisData(ChatBody chatBody, ChannelContext channelContext){ ImSessionContext imSessionContext = (ImSessionContext)channelContext.getAttribute(); Client client = imSessionContext.getClient(); if(client != null) { User onlineUser = client.getUser(); if(onlineUser != null){ userCache.expire(chatBody.getFrom() + SUBFIX + TERMINAL + SUBFIX + onlineUser.getTerminal(),TIME_OUT); } } userCache.expire( chatBody.getFrom() + SUBFIX + GROUP,TIME_OUT); userCache.expire(chatBody.getFrom() + SUBFIX + INFO,TIME_OUT); userCache.expire(chatBody.getFrom() + SUBFIX + FRIENDS,TIME_OUT); groupCache.expire(chatBody.getGroup_id() + SUBFIX + USER,TIME_OUT); groupCache.expire(chatBody.getGroup_id() + SUBFIX + INFO,TIME_OUT); } /** * 组装ConversationRecord数据 * @param chatBody * @param conversation * @return */ private Map getConversationRecordAttrs( ChatBody chatBody , Conversation conversation , GroupConversationMiddle groupConversationMiddle , ChatGroup group ){ Map conversationRecordAttrs = new HashMap<>(); conversationRecordAttrs.put("id", SnowflakeIdUtils.getInstance().nextId()); conversationRecordAttrs.put("chat_body_id",chatBody.getId()); conversationRecordAttrs.put("content",chatBody.getContent()); conversationRecordAttrs.put("chat_body_msg_type",chatBody.getMsgType()); conversationRecordAttrs.put("is_recall",0); conversationRecordAttrs.put("create_time",sdf.format(chatBody.getCreateTime())); String fromId = chatBody.getFrom(); conversationRecordAttrs.put("chat_body_from",fromId); // customerDepartmentId visitorDepartmentId serviceAccountRoleDepartmentId User user = ImAio.getUser(fromId); String visitType = user.getVisitType(); if(VisitTypeEnum.CUSTOMER.getKey().equals(visitType)){ // CUSTOMER 客户 发送的消息 String companyId = user.getExtras().getString("customerId"); conversationRecordAttrs.put("from_id",Long.valueOf(companyId)); conversationRecordAttrs.put("to_id",conversation.getLong("service_account_id")); conversationRecordAttrs.put("type",0); conversationRecordAttrs.put("chat_body_to", group.getStr("service_account_role_department_middle_id")); }else if(VisitTypeEnum.VISITOR.getKey().equals(visitType)){ // VISITOR 游客 发送的消息 String visitorId = user.getExtras().getString("visitorId"); conversationRecordAttrs.put("from_id",Long.valueOf(visitorId)); conversationRecordAttrs.put("to_id",conversation.getLong("service_account_id")); conversationRecordAttrs.put("type",0); conversationRecordAttrs.put("chat_body_to", group.getStr("service_account_role_department_middle_id")); }else{ //SERVICEACCOUNT 客服 发送的消息 String serviceAccountId = user.getExtras().getString("serviceAccountId"); conversationRecordAttrs.put("from_id",Long.valueOf(serviceAccountId)); conversationRecordAttrs.put("to_id",conversation.getLong("user_id")); conversationRecordAttrs.put("chat_body_to", group.getStr("consumer_id")); JSONObject jsonObject = chatBody.getExtras(); if( null != jsonObject && jsonObject.size() > 0 && Integer.valueOf(2).equals(jsonObject.getInteger("sendType")) ){ //客服自动发送的消息 conversationRecordAttrs.put("type",jsonObject.getInteger("sendType")); }else{ //客服手动发送的消息 conversationRecordAttrs.put("type",1); //判断 客服 是否第一次回复 (依据:是之前 first_answer_time 字段 和 first_answer_lenth 字段 值 是否为空 ) Timestamp firstAnswerTime = conversation.getTimestamp("first_answer_time"); Integer firstAnswerLenth = conversation.getInt("first_answer_lenth"); if( null == firstAnswerTime && null == firstAnswerLenth ){ //说明客服是第一次回复 long currentTimeMillis = System.currentTimeMillis(); //当前时间的格林威治时间 毫秒数 Timestamp currentTime = new Timestamp(currentTimeMillis); Timestamp createTime = conversation.getTimestamp("create_time"); //会话创建时间的格林威治时间 毫秒数 long createTimeMillis = createTime.getTime(); if(currentTimeMillis > createTimeMillis){ conversation.set("first_answer_time",currentTime) .set("first_answer_lenth",Integer.valueOf( String.valueOf( (currentTimeMillis - createTimeMillis)/1000 ))) .update(); } } } } conversationRecordAttrs.put( "conversation_id" ,groupConversationMiddle.getLong("conversation_id") ); return conversationRecordAttrs; } /** * 获取最新的 GroupConversationMiddle 对象 (根据groupId) * @param * @return {@link {@link GroupConversationMiddle}} * @author Darren * @date 2020/2/11 19:03 */ private GroupConversationMiddle getGroupConversationMiddleByGroupId(String groupId){ StringBuilder sql = new StringBuilder(); sql.append(" select * from sw_group_conversation_middle where group_id = " + groupId) .append(" order by create_time desc limit 1"); return GroupConversationMiddle.dao.findFirst(sql.toString()); } }