admin管理员组文章数量:1588976
文章目录
- 需求目的
- 需求分析
- 源码
项目源码
需求目的
这个需求的目的是为了实现一个IM系统,其中IM系统需要提供如下这些功能。
- 支持各种类型消息的发送,包括语音、图片、文本消息、链接等消息的发送。
- 需要确保在最高并发达到5w的情况下的性能,考虑集群模式部署server端的时候的客户端负载均衡注册。
- 需要兼容app1和app2两个端消息的互通,也就是需要多用户端数据同步。
- 需要实现消息未读和消息已读功能。
需求分析
首先分析如何实现IM系统。
IM(Instant Messaging)及时通信系统。目前市面上有特别多的IM系统,QQ、微信都是。
而我们的产品也需要实现一个自己的IM通信系统,只不过我们的系统的并发量和用户量并不会比上面两个系统多,大概最多也就只需要承当5kqps。
由于市面上开发IM系统的人很多,而且也已经有非常成熟的源码了,这里我就非常简单的讲解一下实现一个最小的IM系统需要考虑什么和实现什么。
具体的可以参考:IM学习
由于我们的项目是使用Java语言进行开发的,所以很容易就可以想到IM系统可以使用Netty配合websocket的方式来实现在线的双向通信。
按照上面的目的分析,我们可以先简单的分析一下如何实现上面的这些目的。
- 定义一个特定的类类型,这个类类型就是消息类,消息类中存储了要发送的消息信息,包括发送人、接收人、时间、消息类型、消息内容等。这里需要考虑到如果是图片、语音、链接,那么消息要如何处理?实际上这些东西也就是一个又一个的文件而已,直接设定一个字段用来存储文件的URL就好了。
a. 既然考虑到了要存储文件,并且得到URL,那么这里基本就需要用到OSS服务了,这里毋庸置疑,我选Minio,原因是因为我用的很多,比较熟悉。 - 其次,现在的系统一般都是集群部署的,因此,我们的服务端启动的时候,需要有一个管理中心,负责管理,并且进行负载均衡。
a. 我们可以使用Nacos、Zookeeper作为注册和配置中心。这里考虑用Zookeeper了,以为客户端简单。
b. 当客户端进行注册的时候,去Zk获取服务端的信息,并且进行负载均衡,把客户端挂到负载更小的服务端上。 - 要想实现多个设备的消息互通,那么就意味着,用户同一个账号的多个设备的客户端,都需要连接到服务端上,并且服务端需要存储对应用户的所有客户端连接信息,然后把消息转发过去,或者说广播过去。
- 消息已读未读比较easy,发送一条消息过去之后,就设定消息为未读,可以考虑把未读的消息和数量存放到Redis等高速缓存中(同时保存到数据库),然后接受者一旦打开页面,前端发送读取消息的请求,把消息从Redis中快速读取出来,并且删除Redis中的数据,并标记数据为已读状态。
源码
按照上面的要求,我们首先需要先实现一些特定的消息类型。
这个设计比较简单,之前我在设计RPC系统的时候也大概讲解过思路。
这个ChatMsg类型的作用就是我们系统的消息类,所有IM系统上发送的消息信息都可以用这个类来表示。
package blossom.project.imty;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class ChatMsg {
private String senderId; // 发送者的用户id
private String receiverId; // 接受者的用户id
private String msg; // 聊天内容
private Integer msgType; // 消息类型,见枚举 MsgTypeEnum.java
private String msgId;
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime chatTime; // 消息的聊天时间,既是发送者的发送时间、又是接受者的接受时间
private Integer showMsgDateTimeFlag; // 标记存储数据库,用于历史展示。每超过1分钟,则显示聊天时间,前端可以控制时间长短
private String videoPath; // 视频地址
private Integer videoWidth; // 视频宽度
private Integer videoHeight; // 视频高度
private Integer videoTimes; // 视频时间
private String voicePath; // 语音地址
private Integer speakVoiceDuration; // 语音时长
private Boolean isRead; // 语音消息标记是否已读未读,true: 已读,false: 未读
private Integer communicationType; // 聊天类型, 1:单聊,2:群聊
private Boolean isReceiverOnLine; // 用于标记当前接受者是否在线
}
设定完毕消息类型之后,我们可以先考虑一下如何实现多个设备的消息互通?
很容易就可以想到,存储一下客户端的每一个Channel连接,并且给他们标识一下编号,做好用户账号和Channel连接间的映射关系。
这样子,当同一个用户账号收到消息的时候,我们广播到这个用户账号对应的所有目前客户端连接就好了。
如下类就是这样子的一个管理多设备多端用户Channel的类。
public class MultiChannelManager {
// 用于多端同时接受消息,允许同一个账号在多个设备同时在线,比如iPad、iPhone、Mac等设备同时收到消息
// key: userId, value: 多个用户的channel
private static Map<String, List<Channel>> multiChannel = new HashMap<>();
// 用于记录用户id和客户端channel长id的关联关系
private static Map<String, String> userChannelIdRelation = new HashMap<>();
public static void putUserChannelIdRelation(String channelId, String userId) {
userChannelIdRelation.put(channelId, userId);
}
public static String getUserIdByChannelId(String channelId) {
return userChannelIdRelation.get(channelId);
}
public static void putMultiChannels(String userId, Channel channel) {
List<Channel> channels = getMultiChannels(userId);
if (channels == null || channels.size() == 0) {
channels = new ArrayList<>();
}
channels.add(channel);
multiChannel.put(userId, channels);
}
public static List<Channel> getMultiChannels(String userId) {
return multiChannel.get(userId);
}
public static void removeUselessChannels(String userId, String channelId) {
List<Channel> channels = getMultiChannels(userId);
if (channels == null || channels.size() == 0) {
return;
}
for (int i = 0 ; i < channels.size() ; i ++) {
Channel tempChannel = channels.get(i);
if (tempChannel.id().asLongText().equals(channelId)) {
channels.remove(i);
}
}
multiChannel.put(userId, channels);
}
public static List<Channel> getMyOtherChannels(String userId, String channelId) {
List<Channel> channels = getMultiChannels(userId);
if (channels == null || channels.size() == 0) {
return null;
}
List<Channel> myOtherChannels = new ArrayList<>();
for (int i = 0 ; i < channels.size() ; i ++) {
Channel tempChannel = channels.get(i);
if (!tempChannel.id().asLongText().equals(channelId)) {
myOtherChannels.add(tempChannel);
}
}
return myOtherChannels;
}
public static void outputMulti() {
for (Map.Entry<String, List<Channel>> entry : multiChannel.entrySet()) {
System.out.println("UserId: " + entry.getKey());
List<Channel> temp = entry.getValue();
for (Channel c : temp) {
System.out.println("\t\t ChannelId: " + c.id().asLongText());
}
}
}
public static void sendToTarget(List<Channel> receiverChannels, DataContent dataContent) {
ChannelGroup clients = ChatHandler.clients;
if (receiverChannels == null) {
return;
}
for (Channel c : receiverChannels) {
Channel findChannel = clients.find(c.id());
if (findChannel != null) {
findChannel.writeAndFlush(
new TextWebSocketFrame(
JsonUtils.objectToJson(dataContent)));
}
}
}
public static void sendToMyOthers(List<Channel> myOtherChannels, DataContent dataContent) {
ChannelGroup clients = ChatHandler.clients;
if (myOtherChannels == null) {
return;
}
for (Channel c : myOtherChannels) {
Channel findChannel = clients.find(c.id());
if (findChannel != null) {
findChannel.writeAndFlush(
new TextWebSocketFrame(
JsonUtils.objectToJson(dataContent)));
}
}
}
}
保存完毕多端用户通道之后,我们就可以开始设计一下如何往这些通道发送消息了。
构思一下,我们知道如果要往这些通道发消息,那么首先是通过服务器发送的,也就是一个NettyIM服务器上会保存有多个连接到这个服务器的Channel连接,我们通过特定的手段,例如ID的方式获取到这个Channel连接之后,然后通过IM的服务端发送消息到客户端连接上就好。
所以,这里我们可以顺带把服务端管理的实现给他先完成。
这里依靠Zk的能力。再服务端启动的时候,把服务端信息注册到Zk上。
public class CuratorConfig {
private static String host = "127.0.0.1:3191"; // 单机/集群的ip:port地址
private static Integer connectionTimeoutMs = 30 * 1000; // 连接超时时间
private static Integer sessionTimeoutMs = 3 * 1000; // 会话超时时间
private static Integer sleepMsBetweenRetry = 2 * 1000; // 每次重试的间隔时间
private static Integer maxRetries = 3; // 最大重试次数
private static String namespace = "itzixi-im"; // 命名空间(root根节点名称)
private static CuratorFramework client;
static {
RetryPolicy backoffRetry = new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries);
// 声明初始化客户端
client = CuratorFrameworkFactory.builder()
.connectString(host)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.retryPolicy(backoffRetry)
.namespace(namespace)
.build();
client.start(); // 启动curator客户端
}
public static CuratorFramework getClient() {
return client;
}
}
public class ZookeeperRegister {
public static void registerNettyServer(String nodeName,
String ip,
Integer port) throws Exception {
CuratorFramework zkClient = CuratorConfig.getClient();
String path = "/" + nodeName;
Stat stat = zkClient.checkExists().forPath(path);
if (stat == null) {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path);
} else {
System.out.println(stat.toString());
}
// 创建对应的临时节点,值可以放在线人数,默认为初始化的0
NettyServerNode serverNode = new NettyServerNode();
serverNode.setIp(ip);
serverNode.setPort(port);
String nodeJson = JsonUtils.objectToJson(serverNode);
zkClient.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(path + "/im-", nodeJson.getBytes());
}
public static String getLocalIp() throws Exception {
InetAddress addr = InetAddress.getLocalHost();
String ip=addr.getHostAddress();
System.out.println("本机IP地址:" + ip);
return ip;
}
/**
* 增加在线人数
* @param serverNode
*/
public static void incrementOnlineCounts(NettyServerNode serverNode) throws Exception {
dealOnlineCounts(serverNode, 1);
}
/**
* 减少在线人数
* @param serverNode
*/
public static void decrementOnlineCounts(NettyServerNode serverNode) throws Exception {
dealOnlineCounts(serverNode, -1);
}
/**
* 处理在线人数的增减
* @param serverNode
* @param counts
*/
public static void dealOnlineCounts(NettyServerNode serverNode,
Integer counts) throws Exception {
CuratorFramework zkClient = CuratorConfig.getClient();
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(zkClient,
"/rw-locks");
readWriteLock.writeLock().acquire();
try {
String path = "/server-list";
List<String> list = zkClient.getChildren().forPath(path);
for (String node:list) {
String pendingNodePath = path + "/" + node;
String nodeValue = new String(zkClient.getData().forPath(pendingNodePath));
NettyServerNode pendingNode = JsonUtils.jsonToPojo(nodeValue,
NettyServerNode.class);
// 如果ip和端口匹配,则当前路径的节点则需要累加或者累减
if (pendingNode.getIp().equals(serverNode.getIp()) &&
(pendingNode.getPort().intValue() == serverNode.getPort().intValue())) {
pendingNode.setOnlineCounts(pendingNode.getOnlineCounts() + counts);
String nodeJson = JsonUtils.objectToJson(pendingNode);
zkClient.setData().forPath(pendingNodePath, nodeJson.getBytes());
}
}
} finally {
readWriteLock.writeLock().release();
}
}
/**
* 获取负载最轻的服务器
* @return
* @throws Exception
*/
public static NettyServerNode getLeastLoadedServer() throws Exception {
CuratorFramework zkClient = CuratorConfig.getClient();
String path = "/server-list";
List<String> list = zkClient.getChildren().forPath(path);
NettyServerNode leastLoadedNode = null;
int minOnlineCounts = Integer.MAX_VALUE;
for (String node : list) {
String pendingNodePath = path + "/" + node;
String nodeValue = new String(zkClient.getData().forPath(pendingNodePath));
NettyServerNode pendingNode = JsonUtils.jsonToPojo(nodeValue, NettyServerNode.class);
if (pendingNode.getOnlineCounts() < minOnlineCounts) {
minOnlineCounts = pendingNode.getOnlineCounts();
leastLoadedNode = pendingNode;
}
}
return leastLoadedNode;
}
}
public class JedisPoolUtils {
private static final JedisPool jedisPool;
static {
//配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
//最大连接数
poolConfig.setMaxTotal(10);
//最大空闲连接
poolConfig.setMaxIdle(10);
//最小空闲连接
poolConfig.setMinIdle(5);
//最长等待时间,ms
poolConfig.setMaxWaitMillis(1500);
//创建连接池对象
jedisPool = new JedisPool(poolConfig,
"127.0.0.1",
5379,
1000,
"BlossomIM");
}
public static Jedis getJedis(){
return jedisPool.getResource();
}
}
然后在我们的服务端启动的时候,执行如下代码即可。
// 注册当前netty服务到zookeeper中
ZookeeperRegister.registerNettyServer("server-list",
ZookeeperRegister.getLocalIp(),
nettyPort);
这里我们知道,Netty作为服务器启动后,会绑定机器上的某一个端口,所以我们的服务器节点保存ip和端口信息即可。
完成了当前nettyim服务器的注册之后,我们就可以开始设计如何广播消息了。
这里我们利用MQ的能力来实现异步。
package blossom.project.imty.mq;
import blossom.project.imty.DataContent;
import blossom.project.imty.websocket.MultiChannelManager;
import blossom.project.im.utils.JsonUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class RabbitMQConnectUtils {
private final List<Connection> connections = new ArrayList<>();
private final int maxConnection = 20;
// 开发环境 dev
private final String host = "127.0.0.1";
private final int port = 5682;
private final String username = "BlossomIM";
private final String password = "BlossomIM";
private final String virtualHost = "BlossomIM";
// 生产环境 prod
//private final String host = "";
//private final int port = 5672;
//private final String username = "123";
//private final String password = "123";
//private final String virtualHost = "123";
public ConnectionFactory factory;
public ConnectionFactory getRabbitMqConnection() {
return getFactory();
}
public ConnectionFactory getFactory() {
initFactory();
return factory;
}
private void initFactory() {
try {
if (factory == null) {
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendMsg(String message, String queue) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.basicPublish("",
queue,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("utf-8"));
channel.close();
setConnection(connection);
}
public void sendMsg(String message, String exchange, String routingKey) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
channel.basicPublish(exchange,
routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("utf-8"));
channel.close();
setConnection(connection);
}
public GetResponse basicGet(String queue, boolean autoAck) throws Exception {
GetResponse getResponse = null;
Connection connection = getConnection();
Channel channel = connection.createChannel();
getResponse = channel.basicGet(queue, autoAck);
channel.close();
setConnection(connection);
return getResponse;
}
public Connection getConnection() throws Exception {
return getAndSetConnection(true, null);
}
public void setConnection(Connection connection) throws Exception {
getAndSetConnection(false, connection);
}
public void listen(String fanout_exchange, String queueName) throws Exception {
Connection connection = getConnection();
Channel channel = connection.createChannel();
// FANOUT 发布订阅模式(广播模式)
channel.exchangeDeclare(fanout_exchange,
BuiltinExchangeType.FANOUT,
true, false, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, fanout_exchange, "");
Consumer consumer = new DefaultConsumer(channel){
/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("body = " + msg);
String exchange = envelope.getExchange();
System.out.println("exchange = " + exchange);
if (exchange.equalsIgnoreCase("fanout_exchange")) {
DataContent dataContent = JsonUtils.jsonToPojo(msg, DataContent.class);
String senderId = dataContent.getChatMsg().getSenderId();
String receiverId = dataContent.getChatMsg().getReceiverId();
// 广播至集群的其他节点并且发送给用户聊天信息
List<ioty.channel.Channel> receiverChannels =
MultiChannelManager.getMultiChannels(receiverId);
MultiChannelManager.sendToTarget(receiverChannels, dataContent);
// 广播至集群的其他节点并且同步给自己其他设备聊天信息
String currentChannelId = dataContent.getExtend();
List<ioty.channel.Channel> senderChannels =
MultiChannelManager.getMyOtherChannels(senderId, currentChannelId);
MultiChannelManager.sendToTarget(senderChannels, dataContent);
}
}
};
/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume(queueName, true, consumer);
}
private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {
getRabbitMqConnection();
if (isGet) {
if (connections.isEmpty()) {
return factory.newConnection();
}
Connection newConnection = connections.get(0);
connections.remove(0);
if (newConnection.isOpen()) {
return newConnection;
} else {
return factory.newConnection();
}
} else {
if (connections.size() < maxConnection) {
connections.add(connection);
}
return null;
}
}
}
用下面的类来保存消息和广播消息。
public class MessagePublisher {
// 定义交换机的名字
public static final String TEST_EXCHANGE = "test_exchange";
// 定义队列的名字
public static final String TEST_QUEUE = "test_queue";
// 发送信息到消息队列接受并且保存到数据库的路由地址
public static final String ROUTING_KEY_WECHAT_MSG_SEND = "BlossomIM.wechat.wechat.msg.send";
public static void sendMsgToSave(ChatMsg msg) throws Exception {
RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();
connectUtils.sendMsg(JsonUtils.objectToJson(msg),
TEST_EXCHANGE,
ROUTING_KEY_WECHAT_MSG_SEND);
}
public static void sendMsgToOtherNettyServer(String msg) throws Exception {
RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();
String fanout_exchange = "fanout_exchange";
connectUtils.sendMsg(msg, fanout_exchange, "");
}
}
并且在服务器启动的时候,监听特定的rabbitmq-queue。
// 启动消费者进行监听,队列可以根据动态生成的端口号进行拼接
String queueName = "netty_queue_" + nettyPort;
RabbitMQConnectUtils mqConnectUtils = new RabbitMQConnectUtils();
mqConnectUtils.listen("fanout_exchange", queueName);
这样子,当我们在消息处理函数中,广播消息的时候,就可以通过上面的mq接口进行广播了。
这个服务器就会收到,然后取出对应的服务器,把消息发送给特定的客户端。
在设计完毕我们的消息类型之后,我们可以开始基于Websocket和Netty实现我们的消息处理类了,也就是假设我们收到了上面的ChatMsg类型的消息,我们会如何进行处理?
这里我们创建如下类型,用来处理入站消息。和RPC的实现方式一样,再这个类中,我们需要考虑到所有的我们希望的对消息的处理操作。
// SimpleChannelInboundHandler: 对于请求来说,相当于入站(入境)
// TextWebSocketFrame: 用于为websocket专门处理的文本数据对象,Frame是数据(消息)的载体
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>
具体的消息处理代码如下:
package blossom.project.imty.websocket;
import blossom.project.im.enums.MsgTypeEnum;
import blossom.project.imty.ChatMsg;
import blossom.project.imty.DataContent;
import blossom.project.imty.NettyServerNode;
import blossom.project.imty.utils.JedisPoolUtils;
import blossom.project.imty.utils.ZookeeperRegister;
import blossom.project.im.utils.JsonUtils;
import blossom.project.im.utils.LocalDateUtils;
import blossom.project.im.utils.OkHttpUtil;
import com.a3testponent.idworker.IdWorkerConfigBean;
import com.a3testponent.idworker.Snowflake;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import ioty.channel.Channel;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.SimpleChannelInboundHandler;
import ioty.channel.group.ChannelGroup;
import ioty.channel.group.DefaultChannelGroup;
import ioty.handler.codec.http.websocketx.TextWebSocketFrame;
import ioty.util.concurrent.GlobalEventExecutor;
import blossom.project.im.grace.result.GraceJSONResult;
import blossom.project.imty.mq.MessagePublisher;
import redis.clients.jedis.Jedis;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* 创建自定义助手类
*
* @Auther ZhangBlossom
*/
// SimpleChannelInboundHandler: 对于请求来说,相当于入站(入境)
// TextWebSocketFrame: 用于为websocket专门处理的文本数据对象,Frame是数据(消息)的载体
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于记录和管理所有客户端的channel组
public static ChannelGroup clients =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception {
// 获得客户端传输过来的消息
String content = msg.text();
System.out.println("接受到的数据:" + content);
// 1. 获取客户端发来的消息并且解析
DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
ChatMsg chatMsg = dataContent.getChatMsg();
String msgText = chatMsg.getMsg();
String receiverId = chatMsg.getReceiverId();
String senderId = chatMsg.getSenderId();
// 判断是否黑名单 start
// 如果双方只要有一方是黑名单,则终止发送
GraceJSONResult result = OkHttpUtil.get("http://127.0.0.1:1000/friendship/isBlack?friendId1st=" + receiverId
+ "&friendId2nd=" + senderId);
boolean isBlack = (Boolean) result.getData();
System.out.println("当前的黑名单关系为: " + isBlack);
if (isBlack) {
return;
}
// 判断是否黑名单 end
// 时间校准,以服务器的时间为准
chatMsg.setChatTime(LocalDateTime.now());
Integer msgType = chatMsg.getMsgType();
// 获取channel
Channel currentChannel = ctx.channel();
String currentChannelId = currentChannel.id().asLongText();
String currentChannelIdShort = currentChannel.id().asShortText();
// System.out.println("客户端currentChannelId:" + currentChannelId);
// System.out.println("客户端currentChannelIdShort:" + currentChannelIdShort);
// 2. 判断消息类型,根据不同的类型来处理不同的业务
if (Objects.equals(msgType, MsgTypeEnum.CONNECT_INIT.type)) {
// 当websocket初次open的时候,初始化channel,把channel和用户userid关联起来
MultiChannelManager.putMultiChannels(senderId, currentChannel);
MultiChannelManager.putUserChannelIdRelation(currentChannelId, senderId);
NettyServerNode minNode = dataContent.getServerNode();
// System.out.println(minNode);
// 初次连接后,该节点下的在线人数累加
ZookeeperRegister.incrementOnlineCounts(minNode);
// 获得ip+端口,在redis中设置关系,以便在前端设备断线后减少在线人数
Jedis jedis = JedisPoolUtils.getJedis();
jedis.set(senderId, JsonUtils.objectToJson(minNode));
} else if (Objects.equals(msgType, MsgTypeEnum.WORDS.type)
|| Objects.equals(msgType, MsgTypeEnum.IMAGE.type)
|| Objects.equals(msgType, MsgTypeEnum.VIDEO.type)
|| Objects.equals(msgType, MsgTypeEnum.VOICE.type)
) {
// 此处为mq异步解耦,保存信息到数据库,数据库无法获得信息的主键id,
// 所以此处可以用snowflake直接生成唯一的主键id
Snowflake snowflake = new Snowflake(new IdWorkerConfigBean());
String sid = snowflake.nextId();
System.out.println("sid = " + sid);
String iid = IdWorker.getIdStr();
System.out.println("iid = " + iid);
chatMsg.setMsgId(sid);
// 此处receiverId所对应的channel为空
// 发送消息
// List<Channel> receiverChannels = MultiChannelManager.getMultiChannels(receiverId);
// if (receiverChannels == null || receiverChannels.size() == 0 || receiverChannels.isEmpty()) {
// receiverChannels为空,表示用户离线/断线状态,消息不需要发送,后续可以存储到数据库
// chatMsg.setIsReceiverOnLine(false);
// } else {
// chatMsg.setIsReceiverOnLine(true);
if (Objects.equals(msgType, MsgTypeEnum.VOICE.type)) {
chatMsg.setIsRead(false);
}
dataContent.setChatMsg(chatMsg);
String chatTimeFormat = LocalDateUtils
.format(chatMsg.getChatTime(),
LocalDateUtils.DATETIME_PATTERN_2);
dataContent.setChatTime(chatTimeFormat);
// MultiChannelManager.sendToTarget(receiverChannels, dataContent);
MessagePublisher.sendMsgToOtherNettyServer(JsonUtils.objectToJson(dataContent));
// 当receiverChannels为空不为空的时候,同账户多端设备接受消息
// for (Channel c : receiverChannels) {
// Channel findChannel = clients.find(c.id());
// if (findChannel != null) {
//
// // if (msgType == MsgTypeEnum.VOICE.type) {
// // chatMsg.setIsRead(false);
// // }
// // dataContent.setChatMsg(chatMsg);
// // String chatTimeFormat = LocalDateUtils
// // .format(chatMsg.getChatTime(),
// // LocalDateUtils.DATETIME_PATTERN_2);
// // dataContent.setChatTime(chatTimeFormat);
// // 发送消息给在线的用户
// findChannel.writeAndFlush(
// new TextWebSocketFrame(
// JsonUtils.objectToJson(dataContent)));
// }
//
// }
// }
// 把聊天信息作为mq的消息发送给消费者进行消费处理(保存到数据库)
MessagePublisher.sendMsgToSave(chatMsg);
}
// 此处也不需要了,都在mq的监听中完成
// dataContent.setChatMsg(chatMsg);
// String chatTimeFormat = LocalDateUtils
// .format(chatMsg.getChatTime(),
// LocalDateUtils.DATETIME_PATTERN_2);
// dataContent.setChatTime(chatTimeFormat);
// dataContent.setExtend(currentChannelId);
//
// List<Channel> myOtherChannels = MultiChannelManager
// .getMyOtherChannels(senderId, currentChannelId);
// MultiChannelManager.sendToMyOthers(myOtherChannels, dataContent);
// for (Channel c : myOtherChannels) {
// Channel findChannel = clients.find(c.id());
// if (findChannel != null) {
// // dataContent.setChatMsg(chatMsg);
// // String chatTimeFormat = LocalDateUtils
// // .format(chatMsg.getChatTime(),
// // LocalDateUtils.DATETIME_PATTERN_2);
// // dataContent.setChatTime(chatTimeFormat);
// // 同步消息给在线的其他设备端
// findChannel.writeAndFlush(
// new TextWebSocketFrame(
// JsonUtils.objectToJson(dataContent)));
// }
// }
// currentChannel.writeAndFlush(new TextWebSocketFrame(currentChannelId));
// clients.writeAndFlush(new TextWebSocketFrame(currentChannelId));
MultiChannelManager.outputMulti();
}
/**
* 客户端连接到服务端之后(打开链接)
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel currentChannel = ctx.channel();
String currentChannelId = currentChannel.id().asLongText();
System.out.println("客户端建立连接,channel对应的长id为:" + currentChannelId);
// 获得客户端的channel,并且存入到ChannelGroup中进行管理(作为一个客户端群组)
clients.add(currentChannel);
}
/**
* 关闭连接,移除channel
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel currentChannel = ctx.channel();
String currentChannelId = currentChannel.id().asLongText();
System.out.println("客户端关闭连接,channel对应的长id为:" + currentChannelId);
// 移除多余的会话
String userId = MultiChannelManager.getUserIdByChannelId(currentChannelId);
MultiChannelManager.removeUselessChannels(userId, currentChannelId);
clients.remove(currentChannel);
// zk中在线人数累减
Jedis jedis = JedisPoolUtils.getJedis();
NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),
NettyServerNode.class);
ZookeeperRegister.decrementOnlineCounts(minNode);
}
/**
* 发生异常并且捕获,移除channel
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel currentChannel = ctx.channel();
String currentChannelId = currentChannel.id().asLongText();
System.out.println("发生异常捕获,channel对应的长id为:" + currentChannelId);
// 发生异常之后关闭连接(关闭channel)
ctx.channel().close();
// 随后从ChannelGroup中移除对应的channel
clients.remove(currentChannel);
// 移除多余的会话
String userId = MultiChannelManager.getUserIdByChannelId(currentChannelId);
MultiChannelManager.removeUselessChannels(userId, currentChannelId);
// zk中在线人数累减
Jedis jedis = JedisPoolUtils.getJedis();
NettyServerNode minNode = JsonUtils.jsonToPojo(jedis.get(userId),
NettyServerNode.class);
ZookeeperRegister.decrementOnlineCounts(minNode);
}
}
到此为止,其实我们的核心代码基本就完成了,这个时候,只要编写一个客户端代码,连接到对应的服务端,然后发送消息就好了。就能实现数据的交互。
接下来是main函数。
package blossom.project.imty;
import blossom.project.imty.websocket.WSServerInitializer;
import ioty.bootstrap.ServerBootstrap;
import ioty.channel.ChannelFuture;
import ioty.channel.EventLoopGroup;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioServerSocketChannel;
import org.apachemons.lang3.StringUtils;
import blossom.project.imty.mq.RabbitMQConnectUtils;
import blossom.project.imty.utils.JedisPoolUtils;
import blossom.project.imty.utils.ZookeeperRegister;
import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* ChatServer: Netty 服务的启动类(服务器)
* @Auther ZhangBlossom
*/
public class ChatServer {
public static final Integer nettyDefaultPort = 875;
public static final String initOnlineCounts = "0";
/*
* FIXME: 优化方案,改成zookeeper方案,
* 如此可以不需要在中断连接后,监听并且清理在线人数和端口,
* 因为netty与zk建立的临时节点,中断连接后,会自动删除该临时节点
*/
public static Integer selectPort(Integer port) {
String portKey = "netty_port";
Jedis jedis = JedisPoolUtils.getJedis();
jedis.set("jedis-test", "hello world");
Map<String, String> portMap = jedis.hgetAll(portKey);
System.out.println(portMap);
// 由于map中的key都应该是整数类型的port,所以先转换成整数后,再比对,否则string类型的比对会有问题
List<Integer> portList = portMap.entrySet().stream()
.map(entry -> Integer.valueOf(entry.getKey()))
.collect(Collectors.toList());
// step1: 编码到此处先运行测试看一下结果
System.out.println(portList);
Integer nettyPort = null;
if (portList == null || portList.isEmpty()) {
// step2: 编码到此处先运行测试看一下结果
jedis.hset(portKey, port+"", initOnlineCounts);
nettyPort = port;
} else {
// 循环portList,获得最大值,并且累加10
Optional<Integer> maxInteger = portList.stream().max(Integer::compareTo);
Integer maxPort = maxInteger.get().intValue();
Integer currentPort = maxPort + 10;
jedis.hset(portKey, currentPort+"", initOnlineCounts);
nettyPort = currentPort;
}
// step3: 编码到此处先运行测试看一下最终结果
return nettyPort;
/**
* TODO: ChatServer停止的时候,需要删除在redis中对应的端口。
* 一旦断开,则会触发zk的节点删除事件,在那边删除即可。写到springboot服务中即可
*/
// TODO: 客户端负载均衡,最小连接数,查询redis中最少在线人数的,并且让前端建立ws连接
// TODO: 用户创建连接,会话管理那边,则对应netty服务的在线人数累+1
// TODO: 用户断开连接,会话管理那边,则对应netty服务的在线人数累-1
}
public static void removePort(Integer port) {
String portKey = "netty_port";
Jedis jedis = JedisPoolUtils.getJedis();
jedis.hdel(portKey, port+"");
}
public static void main(String[] args) throws Exception {
// 定义主从线程组
// 定义主线程池,用于接受客户端的连接,但是不做任何处理,比如老板会谈业务,拉到业务就会交给下面的员工去做了
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 定义从线程池,处理主线程池交过来的任务,公司业务员开展业务,完成老板交代的任务
EventLoopGroup workerGroup = new NioEventLoopGroup();
// Netty服务启动的时候,从redis中查找有没有端口,如果没有则用875,如果有则把端口累加1(或10)再启动
Integer nettyPort = selectPort(nettyDefaultPort);
// 注册当前netty服务到zookeeper中
ZookeeperRegister.registerNettyServer("server-list",
ZookeeperRegister.getLocalIp(),
nettyPort);
// 启动消费者进行监听,队列可以根据动态生成的端口号进行拼接
String queueName = "netty_queue_" + nettyPort;
RabbitMQConnectUtils mqConnectUtils = new RabbitMQConnectUtils();
mqConnectUtils.listen("fanout_exchange", queueName);
try {
// 构建Netty服务器
ServerBootstrap server = new ServerBootstrap(); // 服务的启动类
server.group(bossGroup, workerGroup) // 把主从线程池组放入到启动类中
.channel(NioServerSocketChannel.class) // 设置Nio的双向通道
.childHandler(new WSServerInitializer()); // 设置处理器,用于处理workerGroup
// 启动server,并且绑定端口号875,同时启动方式为"同步"
ChannelFuture channelFuture = server.bind(nettyPort).sync();
// 请求:http://127.0.0.1:875
// 监听关闭的channel
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅的关闭线程池组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
// 移除现有的redis与netty的端口关系
removePort(nettyPort);
}
}
}
其实到此为止,一个简单的IM系统就开发完毕了。
而Client的代码,可以参考如下进行实现:
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.URI;
import java.URISyntaxException;
import java.util.Scanner;
public class SimpleIMClient extends WebSocketClient {
public SimpleIMClient(URI serverUri) {
super(serverUri);
}
@Override
public void onOpen(ServerHandshake handshakedata) {
System.out.println("Connected to server");
}
@Override
public void onMessage(String message) {
System.out.println("Received message: " + message);
}
@Override
public void onClose(int code, String reason, boolean remote) {
System.out.println("Disconnected from server with exit code " + code + " additional info: " + reason);
}
@Override
public void onError(Exception ex) {
ex.printStackTrace();
}
public static void main(String[] args) throws URISyntaxException, InterruptedException {
// 获取负载最小的服务器节点
NettyServerNode leastLoadedServer = ZookeeperRegister.getLeastLoadedServer();
String serverUri = "ws://" + leastLoadedServer.getIp() + ":" + leastLoadedServer.getPort() + "/ws";
// 连接到服务器
SimpleIMClient client = new SimpleIMClient(new URI(serverUri));
client.connectBlocking();
// 创建一个新的线程来读取用户输入并发送消息
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String message = scanner.nextLine();
client.send(message);
}
}).start();
}
}
版权声明:本文标题:【BlossomIM】一个简单的IM系统的实现 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1728048932a1143456.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论