Browse Source

加入redis事务,sql事务

779513719 5 years ago
parent
commit
1b0eab2bf0

+ 1 - 0
.gitignore

@@ -4,4 +4,5 @@
 *.iml
 *.classpath
 *.project
+*.iml
 

+ 4 - 4
jim-common/src/main/java/org/jim/common/cache/redis/JedisTemplate.java

@@ -155,7 +155,7 @@ public  class JedisTemplate implements  Serializable{
            }  
            return result;  
        }  
-   }  
+   }
    /**
     * 模糊获取所有的key
     * @return
@@ -1002,7 +1002,7 @@ public  class JedisTemplate implements  Serializable{
    /**
     * 往有序集合sortSet中添加数据;
     * @param key
-    * @param score要排序的值
+    * @param score 要排序的值
     * @param value
     * @return
     */
@@ -1036,7 +1036,7 @@ public  class JedisTemplate implements  Serializable{
     * @param key
     * @param min score区间最小值
     * @param max scroe区间最大值
-    * @param offet 偏移量(类似LIMIT 0,10)
+    * @param offset 偏移量(类似LIMIT 0,10)
     * @param count 数量
     * @return
     */
@@ -1073,7 +1073,7 @@ public  class JedisTemplate implements  Serializable{
     * 将信息 message 发送到指定的频道 channel。 
     * 时间复杂度:O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量。 
     * @param channel 频道 
-    * @param List<message> 要发布的信息
+    * @param messages List<String> 要发布的信息
     */  
    public void publishAll(final String channel, final List<String> messages) {
 	   if(messages == null || messages.size() == 0) {

+ 72 - 0
jim-server/src/main/java/org/jim/server/util/TransactionKit.java

@@ -0,0 +1,72 @@
+package org.jim.server.util;
+
+import com.jfinal.plugin.activerecord.DbKit;
+
+import java.sql.SQLException;
+
+public class TransactionKit {
+
+    /**
+     * 开始事务
+     * @throws SQLException
+     */
+    public static void beginTransation() throws SQLException{
+        DbKit.getConfig().setThreadLocalConnection(DbKit.getConfig().getConnection());
+        DbKit.getConfig().getThreadLocalConnection().setAutoCommit(false);
+    }
+
+    /**
+     * 开始事务
+     * @param configName
+     * @throws SQLException
+     */
+    public static void beginTransation(String configName) throws SQLException {
+        DbKit.getConfig(configName).setThreadLocalConnection(DbKit.getConfig().getConnection());
+        DbKit.getConfig(configName).getThreadLocalConnection().setAutoCommit(false);
+    }
+
+    /**
+     * 事务回滚
+     * @throws SQLException
+     */
+    public static void rollback() throws SQLException {
+        DbKit.getConfig().getThreadLocalConnection().rollback();
+        DbKit.getConfig().getThreadLocalConnection().setAutoCommit(true);
+        DbKit.getConfig().removeThreadLocalConnection();
+    }
+
+    /**
+     * 事务回滚
+     * @param configName
+     * @throws SQLException
+     */
+    public static void rollback(String configName) throws SQLException {
+        DbKit.getConfig(configName).getThreadLocalConnection().rollback();
+        DbKit.getConfig(configName).getThreadLocalConnection().setAutoCommit(true);
+        DbKit.getConfig(configName).removeThreadLocalConnection();
+    }
+
+    /**
+     * 事务提交
+     * @throws SQLException
+     */
+    public static void commit() throws SQLException {
+        DbKit.getConfig().getThreadLocalConnection().commit();
+        DbKit.getConfig().getThreadLocalConnection().setAutoCommit(true);
+        DbKit.getConfig().removeThreadLocalConnection();
+    }
+
+    /**
+     * 事务提交
+     * @param configName
+     * @throws SQLException
+     */
+    public static void commit(String configName) throws SQLException {
+        DbKit.getConfig(configName).getThreadLocalConnection().commit();
+        DbKit.getConfig(configName).getThreadLocalConnection().setAutoCommit(true);
+        DbKit.getConfig(configName).removeThreadLocalConnection();
+    }
+
+
+
+}

+ 1 - 10
server-chat/src/main/java/com/cn/ServerChatStart.java

@@ -6,21 +6,16 @@ import com.cn.listener.IMChatGroupListener;
 import com.cn.processor.IMChatWsHandshakeProcessor;
 import com.cn.service.IMChatLoginServiceProcessor;
 import nl.basjes.shaded.org.springframework.util.CollectionUtils;
-import org.jim.common.ImAio;
 import org.jim.common.ImConfig;
 import org.jim.common.ImConst;
-import org.jim.common.ImPacket;
 import org.jim.common.cache.redis.JedisTemplate;
 import org.jim.common.config.PropertyImConfigBuilder;
-import org.jim.common.packets.CloseReqBody;
 import org.jim.common.packets.Command;
 import org.jim.common.packets.User;
 import org.jim.common.utils.JsonKit;
 import org.jim.server.ImServerStarter;
 import org.jim.server.command.CommandManager;
-import org.jim.server.command.handler.CloseReqHandler;
 import org.jim.server.command.handler.HandshakeReqHandler;
-import org.jim.server.command.handler.JoinGroupReqHandler;
 import org.jim.server.command.handler.LoginReqHandler;
 import org.jim.server.enums.*;
 import org.jim.server.model.ChatGroup;
@@ -31,16 +26,11 @@ import org.quartz.*;
 import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.tio.core.Aio;
-import org.tio.core.ChannelContext;
-import org.tio.utils.lock.SetWithLock;
 
 import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
 /**
  * @author Darren
@@ -146,6 +136,7 @@ public class ServerChatStart {
                                     .set("group_state", GroupStateEnum.OFF_LINE.getKey())
                                     .set("consumer_type", ConsumerTypeEnum.OFF_LINE.getKey())
                                     .update();
+                            //群组留言状态 1:留言 0:非留言
                             if(MessageStateEnum.NO.getKey() == chatGroup.getInt("message_state")){
                                 //群组状态 为 非留言状态
                                 //修改 sw_conversation 表中的  end_time 和 conversation_lenth

+ 2 - 0
server-chat/src/main/java/com/cn/processor/IMChatAsyncChatMessageProcessor.java

@@ -102,6 +102,8 @@ public class IMChatAsyncChatMessageProcessor extends DefaultAsyncChatMessageProc
                         this.extendRedisData(chatBody,channelContext);
                     }
                 }
+            }else{
+                log.info("IMChatAsyncChatMessageProcessor... 系统繁忙,获取锁失败!");
             }
         } catch (InterruptedException e) {
             log.info("获取锁失败!");

+ 69 - 31
server-chat/src/main/java/com/cn/service/IMChatLoginServiceProcessor.java

@@ -17,13 +17,17 @@ import org.jim.common.cache.redis.JedisTemplate;
 import org.jim.common.packets.*;
 import org.jim.server.command.handler.processor.login.LoginCmdProcessor;
 import org.jim.server.util.SnowflakeIdUtils;
+import org.jim.server.util.TransactionKit;
 import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.tio.core.ChannelContext;
 import org.tio.utils.lock.SetWithLock;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Transaction;
 
+import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -72,45 +76,40 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
         RLock rLock = redisson.getLock(LOGIN);
         LoginRespBody loginRespBody;
         boolean res = false;
+        Transaction transaction = null;
         try {
             res = rLock.tryLock(500, 5000, TimeUnit.MILLISECONDS);
             if (res) {
+                Jedis jedis = jedisTemplate.getSingletonJedis();
+                //开启redis事务
+                transaction = jedis.multi();
+                //开启sql事务,关闭自动提交,改为手动提交
+                TransactionKit.beginTransation();
                 //获取token
                 String token = loginReqBody.getToken();
                 User user = null;
                 if(StringUtils.isNotBlank(token)){
                     user = this.getUser(token);
                 }
-                if(user == null){
-                    loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10008);
-                }else{
-                    Boolean noOnlineService = user.getExtras().getBoolean("noOnlineService");
-                    if(noOnlineService != null && noOnlineService){
-                        loginRespBody =  new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10022,user);
-                    }else{
-                        loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10007,user);
-                        String userJson = JsonKit.toJSONString(user);
-                        Map<String,String> users = jedisTemplate.hashGetAll(CURRENT_SYSTEM_ALL_USER_ID);
-                        if(!CollectionUtils.isEmpty(users)){
-                            boolean flag = true;
-                            for(Map.Entry<String,String> entry : users.entrySet()){
-                                if(entry.getKey() == user.getId() ){
-                                    flag = false;
-                                }
-                            }
-                            if(flag){
-                                //存储所有登录的用户 的 userId
-                                jedisTemplate.hashSet(CURRENT_SYSTEM_ALL_USER_ID, user.getId(), userJson);
-                            }
-                        }else{
-                            jedisTemplate.hashSet(CURRENT_SYSTEM_ALL_USER_ID, user.getId(), userJson);
-                        }
-                    }
-                }
+                loginRespBody = this.disposeLoginData(user);
+                int a = 4/0;
+                //提交sql事务,关闭手动提交,开启自动提交
+                TransactionKit.commit();
+                //提交redis事务
+                transaction.exec();
             }else{
                 loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10028);
             }
-        } catch (InterruptedException e) {
+        } catch (Exception e) {
+            if(transaction != null){
+                transaction.discard();
+            }
+            try {
+                //sql事务回滚
+                TransactionKit.rollback();
+            } catch (SQLException ex) {
+                ex.printStackTrace();
+            }
             loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10029);
             e.printStackTrace();
         } finally {
@@ -122,6 +121,42 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
     }
 
     /**
+     * 处理登录数据
+     * @param user
+     * @return
+     */
+    private LoginRespBody disposeLoginData(User user){
+        LoginRespBody loginRespBody;
+        if(user == null){
+            loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10008);
+        }else{
+            Boolean noOnlineService = user.getExtras().getBoolean("noOnlineService");
+            if(noOnlineService != null && noOnlineService){
+                loginRespBody =  new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10022,user);
+            }else{
+                loginRespBody = new LoginRespBody(Command.COMMAND_LOGIN_RESP,ImStatus.C10007,user);
+                String userJson = JsonKit.toJSONString(user);
+                Map<String,String> users = jedisTemplate.hashGetAll(CURRENT_SYSTEM_ALL_USER_ID);
+                if(!CollectionUtils.isEmpty(users)){
+                    boolean flag = true;
+                    for(Map.Entry<String,String> entry : users.entrySet()){
+                        if(entry.getKey() == user.getId() ){
+                            flag = false;
+                        }
+                    }
+                    if(flag){
+                        //存储所有登录的用户 的 userId
+                        jedisTemplate.hashSet(CURRENT_SYSTEM_ALL_USER_ID, user.getId(), userJson);
+                    }
+                }else{
+                    jedisTemplate.hashSet(CURRENT_SYSTEM_ALL_USER_ID, user.getId(), userJson);
+                }
+            }
+        }
+        return loginRespBody;
+    }
+
+    /**
      * 通过token 获取用户信息
      * @param token
      * @return {@link {@link User}}
@@ -147,9 +182,11 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
                 if( VisitTypeEnum.VISITOR.getKey().equals(visitType) ){
                     //当前访问人为游客
                     user = this.createVisitorData(map);
-                    //修改游客在线状态
+                    //判断 是否有在线客服
                     Boolean noOnlineService = user.getExtras().getBoolean("noOnlineService");
                     if(!noOnlineService){
+                        //存在在线客服
+                        //修改游客在线状态
                         VisitorDepartmentMiddle.dao.findById(user.getId()).set("online",0).update();
                     }
                 }
@@ -239,6 +276,7 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
             //游客对应客服存在 且 在线; isOnline=true;
             isOnline = serviceAccountContainer.containsKey(accountRoleDepartmentId);
         }
+        //有没有在线客服
         user.getExtras().put("noOnlineService",false);
         if(!isOnline){
             // 从当前在线的客服中 获取 连接客户数最少的 客服
@@ -638,7 +676,7 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
     }
 
     /**
-     * 初始化 由客服主动发起聊天的群组
+     * 初始化 由 客服 向客户/游客 主动发起聊天的群组
      * @param user
      * @return {@link {@link List< Group>}}
      * @author Darren
@@ -702,7 +740,7 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
                 .append(" order by create_time desc");
         ChatGroup chatGroup = ChatGroup.dao.findFirst(sql.toString());
         //customer 是否在线 如果不在线就是 留言群组
-        boolean isOnline = isOnline(customerDepartmentId);
+        boolean isOnline = this.isOnline(customerDepartmentId);
         if(null != chatGroup){
             //说明 当前存在 群组
             if(
@@ -967,7 +1005,7 @@ public class IMChatLoginServiceProcessor implements LoginCmdProcessor {
             //修改 sw_group 表中的 message_state 状态;  群组留言状态 1:留言 0:非留言
             //修改 sw_group 表中的 service_account_type 状态; 客服离线状态 1: 离线 0: 在线
             for (ChatGroup chatGroup : messageChatGroups) {
-                chatGroup .set("message_state", MessageStateEnum.NO.getKey())
+                chatGroup.set("message_state", MessageStateEnum.NO.getKey())
                         .set("service_account_type",ServiceAccountOfflineTypeEnum.ON_LINE.getKey())
                         .update();
             }