Browse Source

加入分布式锁

779513719 5 years ago
parent
commit
95003d6848

+ 3 - 1
jim-common/src/main/java/org/jim/common/ImStatus.java

@@ -53,7 +53,9 @@ public enum ImStatus implements Status{
 					",'group_id':String群组id仅在chatType为1时需要" +
 					",'chatType':Integer聊天类型{0:未知,1:群聊,2:私聊} }!"
 	),
-	C10027(10027,"ok","对方正在输入...")
+	C10027(10027,"ok","对方正在输入..."),
+	C10028(10028,"login failed !","登录失败!系统繁忙,请稍后再试!"),
+	C10029(10028,"system error!","系统错误,请稍后再试!")
 	;
 
 

+ 41 - 3
jim-server/src/main/java/org/jim/server/command/handler/LoginReqHandler.java

@@ -113,11 +113,13 @@ public class LoginReqHandler extends AbstractCmdHandler {
 		//判断当前用户 是否具有 查看所有客服权限 如果有 绑定群组
 		hasViewAllServiceAccountAuthority(channelContext,user);
 		//初始化 由客服主动发起聊天的群组
-		initIntoChatGroups(channelContext,user);
+		initIntoChatGroups(user);
 		loginServiceHandler.onSuccess(channelContext);
 		if( !VisitTypeEnum.SERVICEACCOUNT.getKey().equals(user.getVisitType()) ){
 			//自动发送客服设置的消息 (在存在群组时 => 相当于客户或游客登录后)
 			sendServiceAccountAutoMsg(user);
+			//绑定群组中的客服
+			bindGroupForServiceAccount(user);
 		}
 		//loginRespBody.clear();
 		ImPacket loginRespPacket = new ImPacket(Command.COMMAND_LOGIN_RESP, loginRespBody.toByte());
@@ -125,14 +127,50 @@ public class LoginReqHandler extends AbstractCmdHandler {
 	}
 
 	/**
+	 * 为群组绑定群组中的客服 (当前登录人是客户或者游客)
+	 * @param user 客服
+	 * @return {@link }
+	 * @author Darren
+	 * @date 2020/2/17 11:05
+	 */
+	private void bindGroupForServiceAccount(User user) {
+		List<Group> groups = user.getGroups();
+		if (CollectionUtils.isNotEmpty(groups)){
+			for(Group group : groups){
+				String serviceAccountId = group.getServiceAccountRoleDepartmentId();
+				//如果客服 在线 >> 把客服通道 绑定到群组
+				SetWithLock<ChannelContext> channelContexts = ImAio.getChannelContextsByUserId(serviceAccountId);
+				if( null != channelContexts && channelContexts.size() > 0 ){
+					User serviceAccount = ImAio.getUser(serviceAccountId);
+					if( null != serviceAccount ){
+						ReadLock readLock = channelContexts.getLock().readLock();
+						readLock.lock();
+						try {
+							Set<ChannelContext> channels = channelContexts.getObj();
+							for (ChannelContext channel : channels) {
+								//为客服绑定群组
+								this.bindUnbindGroup(channel, serviceAccount, groups);
+							}
+						} catch (Exception e) {
+							log.error(e.toString(),e);
+						} finally {
+							//解锁
+							readLock.unlock();
+						}
+					}
+				}
+			}
+		}
+	}
+
+	/**
 	 * 初始化 由客服主动发起聊天的群组
-	 * @param channelContext 客服通道
  	 * @param user 客服
 	 * @return {@link }
 	 * @author Darren
 	 * @date 2020/2/17 11:05
 	 */
-	private void initIntoChatGroups(ChannelContext channelContext, User user) throws Exception {
+	private void initIntoChatGroups(User user) throws Exception {
 		if(VisitTypeEnum.SERVICEACCOUNT.getKey().equals(user.getVisitType())){
 			if( user.getExtras().getBoolean("intoChat") ){
 				List<Group> intoChatGroups = user.getIntoChatGroups();

+ 58 - 29
server-chat/src/main/java/com/cn/processor/IMChatAsyncChatMessageProcessor.java

@@ -3,8 +3,10 @@ package com.cn.processor;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.collections4.CollectionUtils;
 import org.jim.common.ImSessionContext;
+import org.jim.common.cache.redis.JedisTemplate;
 import org.jim.common.cache.redis.RedisCache;
 import org.jim.common.cache.redis.RedisCacheManager;
+import org.jim.common.cache.redis.RedissonTemplate;
 import org.jim.common.packets.*;
 import org.jim.server.model.ChatGroup;
 import org.jim.server.model.Conversation;
@@ -14,6 +16,8 @@ import org.jim.server.command.handler.processor.chat.DefaultAsyncChatMessageProc
 import org.jim.server.enums.VisitTypeEnum;
 import org.jim.server.model.GroupConversationMiddle;
 import org.jim.server.util.SnowflakeIdUtils;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.tio.core.ChannelContext;
@@ -24,6 +28,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 异步聊天持久化
@@ -36,50 +41,74 @@ public class IMChatAsyncChatMessageProcessor extends DefaultAsyncChatMessageProc
 
     private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
+    private static RedissonClient redisson = null;
+
+    private static RedisCache groupCache;
+
+    private static RedisCache userCache;
+
     //设置key的过期时间 30分钟
     private static final int TIME_OUT = 30 * 60;
 
     private final String SUBFIX = ":";
 
-    private static RedisCache groupCache;
-
-    private static RedisCache userCache;
+    private final String SYNC_CHAT = "SYNC_CHAT";
 
     static{
-        RedisCacheManager.register(USER, Integer.MAX_VALUE, Integer.MAX_VALUE);
-        RedisCacheManager.register(GROUP, Integer.MAX_VALUE, Integer.MAX_VALUE);
-        RedisCacheManager.register(STORE, Integer.MAX_VALUE, Integer.MAX_VALUE);
-        RedisCacheManager.register(PUSH, Integer.MAX_VALUE, Integer.MAX_VALUE);
-        groupCache = RedisCacheManager.getCache(GROUP);
-        userCache = RedisCacheManager.getCache(USER);
+        try {
+            RedisCacheManager.register(USER, Integer.MAX_VALUE, Integer.MAX_VALUE);
+            RedisCacheManager.register(GROUP, Integer.MAX_VALUE, Integer.MAX_VALUE);
+            RedisCacheManager.register(STORE, Integer.MAX_VALUE, Integer.MAX_VALUE);
+            RedisCacheManager.register(PUSH, Integer.MAX_VALUE, Integer.MAX_VALUE);
+            groupCache = RedisCacheManager.getCache(GROUP);
+            userCache = RedisCacheManager.getCache(USER);
+            redisson = RedissonTemplate.me().getRedissonClient();
+        } catch (Exception e) {
+            log.info(" IMChatAsyncChatMessageProcessor 初始化失败!");
+            e.printStackTrace();
+        }
     }
 
     @Override
     public void doHandler(ChatBody chatBody, ChannelContext channelContext) {
         log.info("聊天记录持久化... 开始:" + sdf.format(new Date()));
-        //聊天类型(如 0:未知 1:群聊、2:私聊)
-        Integer chatType = chatBody.getChatType();
-        //消息类型;(如:0:text、1:image、2:voice、3:vedio、4:music、5:news)
-        Integer msgType = chatBody.getMsgType();
-        if( ChatType.CHAT_TYPE_PUBLIC.getNumber() == chatType ){
-            //群聊
-            ChatGroup group = ChatGroup.dao.findById(chatBody.getGroup_id());
-            if( null != group){
-                String group_id = group.getStr("group_id");
-                GroupConversationMiddle groupConversationMiddle
-                        = this.getGroupConversationMiddleByGroupId(group_id);
-                if( null != groupConversationMiddle ){
-                    Conversation conversation = Conversation.dao.findById(groupConversationMiddle.getLong("conversation_id"));
-                    if( null != conversation){
-                        ConversationRecord.dao
-                                ._setAttrs(this.getConversationRecordAttrs(chatBody,conversation,groupConversationMiddle,group))
-                                .save();
+        RLock rLock = redisson.getLock(SYNC_CHAT);
+        boolean res = false;
+        try {
+            res = rLock.tryLock(500, 5000, TimeUnit.MILLISECONDS);
+            if(res){
+                //聊天类型(如 0:未知 1:群聊、2:私聊)
+                Integer chatType = chatBody.getChatType();
+                //消息类型;(如:0:text、1:image、2:voice、3:vedio、4:music、5:news)
+                Integer msgType = chatBody.getMsgType();
+                if( ChatType.CHAT_TYPE_PUBLIC.getNumber() == chatType ){
+                    //群聊
+                    ChatGroup group = ChatGroup.dao.findById(chatBody.getGroup_id());
+                    if( null != group){
+                        String group_id = group.getStr("group_id");
+                        GroupConversationMiddle groupConversationMiddle
+                                = this.getGroupConversationMiddleByGroupId(group_id);
+                        if( null != groupConversationMiddle ){
+                            Conversation conversation = Conversation.dao.findById(groupConversationMiddle.getLong("conversation_id"));
+                            if( null != conversation){
+                                ConversationRecord.dao
+                                        ._setAttrs(this.getConversationRecordAttrs(chatBody,conversation,groupConversationMiddle,group))
+                                        .save();
+                            }
+                        }
+                    }
+                    if(ON.equals(imConfig.getIsStore())){
+                        //延长redis数据过期时间
+                        this.extendRedisData(chatBody,channelContext);
                     }
                 }
             }
-            if(ON.equals(imConfig.getIsStore())){
-                //延长redis数据过期时间
-                this.extendRedisData(chatBody,channelContext);
+        } catch (InterruptedException e) {
+            log.info("获取锁失败!");
+            e.printStackTrace();
+        } finally {
+            if(res){
+                rLock.unlock();
             }
         }
         log.info("聊天记录持久化... 结束:" + sdf.format(new Date()));

+ 54 - 29
server-chat/src/main/java/com/cn/service/IMChatLoginServiceProcessor.java

@@ -3,6 +3,7 @@ package com.cn.service;
 import com.alibaba.fastjson.JSONObject;
 import org.jim.common.ImAio;
 import org.jim.common.ImConfig;
+import org.jim.common.cache.redis.RedissonTemplate;
 import org.jim.common.message.MessageHelper;
 import org.jim.common.utils.JsonKit;
 import org.jim.server.command.CommandManager;
@@ -16,12 +17,15 @@ import org.jim.common.cache.redis.JedisTemplate;
 import org.jim.common.packets.*;
 import org.jim.server.command.handler.processor.login.LoginCmdProcessor;
 import org.jim.server.util.SnowflakeIdUtils;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.tio.core.ChannelContext;
 import org.tio.utils.lock.SetWithLock;
 
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -34,18 +38,23 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
 
     private static JedisTemplate jedisTemplate = null;
 
-    protected static ImConfig imConfig;
+    protected static ImConfig imConfig = null;
+
+    private static RedissonClient redisson = null;
 
     private static final String GROUP_PROPERTY = " * ";
 
     private static final String CURRENT_SYSTEM_ALL_USER_ID = "CURRENT_SYSTEM_ALL_USER_ID";
 
+    private final String LOGIN = "LOGIN";
+
     static {
         try {
             imConfig = CommandManager.getImConfig();
             jedisTemplate = JedisTemplate.me();
+            redisson = RedissonTemplate.me().getRedissonClient();
         } catch (Exception e) {
-            logger.info("JedisTemplate 和 imConfig 初始化失败!");
+            logger.info("IMChatLoginServiceProcessor 初始化失败!");
             e.printStackTrace();
         }
     }
@@ -60,37 +69,53 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
     @Override
     public LoginRespBody doLogin(LoginReqBody loginReqBody, ChannelContext channelContext) {
         logger.info("执行登录处理器doLogin()方法...");
-        //获取token
-        String token = loginReqBody.getToken();
+        RLock rLock = redisson.getLock(LOGIN);
         LoginRespBody loginRespBody;
-        User user = null;
-        if(StringUtils.isNotBlank(token)){
-            user = this.getUser(token);
-        }
-        if(user == null){
-            loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10008);
-        }else{
-            Boolean noOnlineService = user.getExtras().getBoolean("noOnlineService");
-            if(noOnlineService != null && noOnlineService){
-                loginRespBody =  new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10022,user);
-            }else{
-                loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10007,user);
-                String userJson = JsonKit.toJSONString(user);
-                Map<String,String> users = jedisTemplate.hashGetAll(CURRENT_SYSTEM_ALL_USER_ID);
-                if(!CollectionUtils.isEmpty(users)){
-                    boolean flag = true;
-                    for(Map.Entry<String,String> entry : users.entrySet()){
-                        if(entry.getKey() == user.getId() ){
-                            flag = false;
+        boolean res = false;
+        try {
+            res = rLock.tryLock(500, 5000, TimeUnit.MILLISECONDS);
+            if (res) {
+                //获取token
+                String token = loginReqBody.getToken();
+                User user = null;
+                if(StringUtils.isNotBlank(token)){
+                    user = this.getUser(token);
+                }
+                if(user == null){
+                    loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10008);
+                }else{
+                    Boolean noOnlineService = user.getExtras().getBoolean("noOnlineService");
+                    if(noOnlineService != null && noOnlineService){
+                        loginRespBody =  new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10022,user);
+                    }else{
+                        loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10007,user);
+                        String userJson = JsonKit.toJSONString(user);
+                        Map<String,String> users = jedisTemplate.hashGetAll(CURRENT_SYSTEM_ALL_USER_ID);
+                        if(!CollectionUtils.isEmpty(users)){
+                            boolean flag = true;
+                            for(Map.Entry<String,String> entry : users.entrySet()){
+                                if(entry.getKey() == user.getId() ){
+                                    flag = false;
+                                }
+                            }
+                            if(flag){
+                                //存储所有登录的用户 的 userId
+                                jedisTemplate.hashSet(CURRENT_SYSTEM_ALL_USER_ID, user.getId(), userJson);
+                            }
+                        }else{
+                            jedisTemplate.hashSet(CURRENT_SYSTEM_ALL_USER_ID, user.getId(), userJson);
                         }
                     }
-                    if(flag){
-                        //存储所有登录的用户 的 userId
-                        jedisTemplate.hashSet(CURRENT_SYSTEM_ALL_USER_ID, user.getId(), userJson);
-                    }
-                }else{
-                    jedisTemplate.hashSet(CURRENT_SYSTEM_ALL_USER_ID, user.getId(), userJson);
                 }
+            }else{
+                loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10028);
+            }
+        } catch (InterruptedException e) {
+            loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10029);
+            e.printStackTrace();
+        } finally {
+            if (res) {
+                rLock.unlock();
             }
         }
         return loginRespBody;