上一篇讲解了基于STOMP协议实现的WebSocket方案,本篇我讲一下Socket.IO的实现方案。
Socket.IO 主要使用WebSocket协议。但是如果需要的话,Socket.io可以回退到几种其它方法,
例如Adobe Flash Sockets、JSONP拉取、或是传统的AJAX拉取,并且提供完全相同的接口。
尽管它可以被用作WebSocket的包装库,它还是提供了许多其它功能,比如广播至多个套接字,存储与不同客户有关的数据,和异步IO操作。
更多请参考 Socket.IO 官网:https://socket.io/
概述
基于 socket.io 来说,采用 node 实现更加合适,本文使用两个后端的Java开源框架实现。
业务需求是将之前通过轮询方式调动RESTFul API改成使用WebSocket长连接方式,实现要服务器实时的推送消息,另外还要实时监控POS机的在线状态等。
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| <dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.13</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-resolver</artifactId> <version>4.1.15.Final</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport</artifactId> <version>4.1.15.Final</version> </dependency> <dependency> <groupId>io.socket</groupId> <artifactId>socket.io-client</artifactId> <version>1.0.0</version> </dependency>
|
服务器代码
先来服务端程序爽一把,话不多说,先上代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| public class NamespaceSocketServer { private static final Logger logger = LoggerFactory.getLogger(NamespaceSocketServer.class);
public static void main(String[] args) {
Configuration config = new Configuration(); config.setHostname("localhost"); config.setPort(9092); config.setUpgradeTimeout(10000); config.setPingTimeout(180000); config.setPingInterval(60000); config.setAuthorizationListener(new AuthorizationListener() { @Override public boolean isAuthorized(HandshakeData data) { return true; } });
final SocketIOServer server = new SocketIOServer(config);
server.addConnectListener(new ConnectListener() { @Override public void onConnect(SocketIOClient client) { if (client != null) { logger.info("连接成功。clientId=" + client.getSessionId().toString()); client.joinRoom(client.getHandshakeData().getSingleUrlParam("appid")); } else { logger.error("并没有人连接上。。。"); } } });
server.addEventListener("login", LoginRequest.class, new DataListener<LoginRequest>() { @Override public void onData(SocketIOClient client, LoginRequest data, AckRequest ackRequest) { logger.info("接收到客户端login消息:code = " + data.getCode() + ",body = " + data.getBody()); if (ackRequest.isAckRequested()) { ackRequest.sendAckData("已成功收到客户端登录请求", "yeah"); } List<String> list = new ArrayList<>(); list.add("登录成功,sessionId=" + client.getSessionId()); String room = client.getHandshakeData().getSingleUrlParam("appid"); server.getRoomOperations(room).sendEvent("login", list.toString()); } }); server.start(); } }
|
Android客户端
老规矩,先上代码爽爽
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class SocketClient { private static Socket socket; private static final Logger logger = LoggerFactory.getLogger(SocketClient.class);
public static void main(String[] args) throws URISyntaxException { IO.Options options = new IO.Options(); options.transports = new String[]{"websocket"}; options.reconnectionAttempts = 2; options.reconnectionDelay = 1000; options.timeout = 20000; options.forceNew = true; options.query = "username=test1&password=test1&appid=com.xncoding.apay2"; socket = IO.socket("http://localhost:9092/", options); socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { @Override public void call(Object... args) { LoginRequest message = new LoginRequest(12, "这是客户端消息体"); socket.emit("login", JsonConverter.objectToJSONObject(message), (Ack) args1 -> { logger.info("回执消息=" + Arrays.stream(args1).map(Object::toString).collect(Collectors.joining(","))); }); } }).on("login", new Emitter.Listener() { @Override public void call(Object... args) { logger.info("接受到服务器房间广播的登录消息:" + Arrays.toString(args)); } }).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("Socket.EVENT_CONNECT_ERROR"); socket.disconnect(); } }).on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("Socket.EVENT_CONNECT_TIMEOUT"); socket.disconnect(); } }).on(Socket.EVENT_PING, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("Socket.EVENT_PING"); } }).on(Socket.EVENT_PONG, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("Socket.EVENT_PONG"); } }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("-----------接受到消息啦--------" + Arrays.toString(args)); } }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("客户端断开连接啦。。。"); socket.disconnect(); } }); socket.connect(); } }
|
关于心跳机制
根据 Socket.IO文档 解释,
客户端会定期发送心跳包,并触发一个ping事件和一个pong事件,如下:
ping
Fired when a ping packet is written out to the server.
pong
Fired when a pong is received from the server.
Parameters:
Number
number of ms elapsed since ping
packet (i.e.: latency)
这里最重要的两个服务器参数如下:
- pingTimeout (Number): how many ms without a pong packet to consider the connection closed (60000)
- pingInterval (Number): how many ms before sending a new ping packet (25000).
也就是说握手协议的时候,客户端从服务器拿到这两个参数,一个是ping消息的发送间隔时间,一个是从服务器返回pong消息的超时时间,
客户端会在超时后断开连接。心跳包发送方向是客户端向服务器端发送,以维持在线状态。
关于断线和超时
关闭浏览器、直接关闭客户端程序、kill进程、主动执行disconnect方法都会导致立刻产生断线事件。
而客户端把网络断开,服务器端在 pingTimeout
ms后产生断线事件、客户端在 pingTimeout
ms后也产生断线事件。
实际上,超时后会产生一个断线事件,叫”disconnect”。客户端和服务器端都可以对这个事件作出应答,释放连接。
客户端代码:
1 2 3 4 5 6 7
| .on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("客户端断开连接啦。。。"); socket.disconnect(); } });
|
连上服务器后,断开网络。超过了心跳超时时间后,产生断线事件。如果客户端不主动断开连接的话,会自动重连,
这时候发现连接不上,又产生连接错误事件,然后重试2次,都失败后自动断开连接了。
下面是客户端日志:
1 2 3 4 5
| SocketClient - 回执消息=服务器已成功收到客户端登录请求,yeah SocketClient - Socket.EVENT_PING SocketClient - Socket.EVENT_PONG SocketClient - 客户端断开连接啦。。。 SocketClient - Socket.EVENT_CONNECT_ERROR
|
服务器端代码:
1 2 3 4 5 6
| server.addDisconnectListener(new DisconnectListener() { @Override public void onDisconnect(SocketIOClient client) { System.out.println("服务器收到断线通知... sessionId=" + client.getSessionId()); } });
|
服务器逻辑是,如果在心跳超时后,就直接断开这个连接,并且产生一个断开连接事件。
服务器通过netty处理心跳包ping/pong的日志如下:
1 2 3
| WebSocket08FrameDecoder - Decoding WebSocket Frame opCode=1 WebSocket08FrameDecoder - Decoding WebSocket Frame length=1 WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=1
|
浏览器客户端演示
对于netty-socketio
有一个demo工程,里面通过一个网页的聊天小程序演示了各种使用方法。
demo地址:netty-socketio-demo
SpringBoot集成
最后重点讲一下如何在SpringBoot中集成。
修改配置
首先maven依赖之前已经讲过了,先修改下application.yml
配置文件来配置下几个参数,比如主机、端口、心跳时间等等。
1 2 3 4
| xncoding: socket-hostname: localhost socket-port: 9096
|
添加Bean配置
然后增加一个SocketServer的Bean配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Configuration public class NettySocketConfig {
@Resource private MyProperties myProperties;
@Resource private ApiService apiService; @Resource private ManagerInfoService managerInfoService;
private static final Logger logger = LoggerFactory.getLogger(NettySocketConfig.class);
@Bean public SocketIOServer socketIOServer() {
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); config.setPort(9096); config.setUpgradeTimeout(10000); config.setPingInterval(60000); config.setPingTimeout(180000); config.setAuthorizationListener(new AuthorizationListener() { @Override public boolean isAuthorized(HandshakeData data) { String username = data.getSingleUrlParam("username"); String password = data.getSingleUrlParam("password"); logger.info("连接参数:username=" + username + ",password=" + password); ManagerInfo managerInfo = managerInfoService.findByUsername(username); String salt = managerInfo.getSalt(); String encodedPassword = ShiroKit.md5(password, username + salt); return encodedPassword.equals(managerInfo.getPassword()); } });
final SocketIOServer server = new SocketIOServer(config);
return server; }
@Bean public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { return new SpringAnnotationScanner(socketServer); } }
|
注意,我在AuthorizationListener
里面通过调用service做了用户名和密码的认证。通过注解方式可以注入service,
执行相应的连接授权动作。
后面还有个SpringAnnotationScanner
的定义不能忘记。
添加消息结构类
预先定义好客户端和服务器端直接传递的消息类型,使用简单的JavaBean即可,比如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class ReportParam {
private String imei;
private String location;
public String getImei() { return imei; }
public void setImei(String imei) { this.imei = imei; }
public String getLocation() { return location; }
public void setLocation(String location) { this.location = location; } }
|
添加消息处理类
这里才是最核心的接口处理类,所有接口处理逻辑都应该写在这里面,我只举了一个例子,就是POS上传位置接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
@Component public class MessageEventHandler {
private final SocketIOServer server; private final ApiService apiService;
private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);
@Autowired public MessageEventHandler(SocketIOServer server, ApiService apiService) { this.server = server; this.apiService = apiService; }
@OnConnect public void onConnect(SocketIOClient client) { if (client != null) { String imei = client.getHandshakeData().getSingleUrlParam("imei"); String applicationId = client.getHandshakeData().getSingleUrlParam("appid"); logger.info("连接成功, applicationId=" + applicationId + ", imei=" + imei + ", sessionId=" + client.getSessionId().toString() ); client.joinRoom(applicationId); ReportParam param = new ReportParam(); param.setImei(imei); apiService.updateJustState(param, client.getSessionId().toString(), 1); } else { logger.error("客户端为空"); } }
@OnDisconnect public void onDisconnect(SocketIOClient client) { String imei = client.getHandshakeData().getSingleUrlParam("imei"); logger.info("客户端断开连接, imei=" + imei + ", sessionId=" + client.getSessionId().toString()); ReportParam param = new ReportParam(); param.setImei(imei); apiService.updateJustState(param, "", 2); client.disconnect(); }
@OnEvent(value = Socket.EVENT_MESSAGE) public void onEvent(SocketIOClient client, AckRequest ackRequest, Object data) { logger.info("接收到客户端消息"); if (ackRequest.isAckRequested()) { ackRequest.sendAckData("服务器回答Socket.EVENT_MESSAGE", "好的"); } }
@OnEvent(value = "broadcast") public void onBroadcast(SocketIOClient client, AckRequest ackRequest, Object data) { logger.info("接收到广播消息"); String room = client.getHandshakeData().getSingleUrlParam("appid"); server.getRoomOperations(room).sendEvent("broadcast", "广播啦啦啦啦"); }
@OnEvent(value = "doReport") public void onDoReport(SocketIOClient client, AckRequest ackRequest, ReportParam param) { logger.info("报告地址接口 start...."); BaseResponse result = postReport(param); ackRequest.sendAckData(result); }
private BaseResponse postReport(ReportParam param) { BaseResponse result = new BaseResponse(); int r = apiService.report(param); if (r > 0) { result.setSuccess(true); result.setMsg("报告地址成功"); } else { result.setSuccess(false); result.setMsg("该POS机还没有入网,报告地址失败。"); } return result; } }
|
添加ServerRunner
还有一个步骤就是添加启动器,在SpringBoot启动之后立马执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
@Component @Order(1) public class ServerRunner implements CommandLineRunner { private final SocketIOServer server; private static final Logger logger = LoggerFactory.getLogger(ServerRunner.class);
@Autowired public ServerRunner(SocketIOServer server) { this.server = server; }
@Override public void run(String... args) throws Exception { logger.info("ServerRunner 开始启动啦..."); server.start(); } }
|
nginx反向代理
要实现通过域名并走标准80或443端口的话,最好使用nginx做反向代理,跟正常的http反向代理基本一致,
不过websocket需要增加一个upgrade的配置。
下面我以一个实际使用例子来说明如何配置nginx的https访问websocket,并且开启301自动http跳转https。
首先要有一个域名,比如test.enzhico.net
,然后申请letsencrypt的免费证书,这个过程我不讲了,另外的博客文章里面有。
配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| map $http_upgrade $connection_upgrade { default upgrade; '' close; }
server { server_name test.enzhico.net; location / { proxy_pass http://localhost:9096; proxy_read_timeout 300s; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; } #root /opt/www/test.enzhico.net; #index index.html index.htm; error_page 404 /404.html; location = /40x.html { }
error_page 500 502 503 504 /50x.html; location = /50x.html { }
listen 443 ssl; # managed by Certbot ssl_certificate /etc/letsencrypt/live/test.enzhico.net/fullchain.pem; # managed by Certbot ssl_certificate_key /etc/letsencrypt/live/test.enzhico.net/privkey.pem; # managed by Certbot include /etc/letsencrypt/options-ssl-nginx.conf; # managed by Certbot ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot
}
server { listen 80; server_name test.enzhico.net; return 301 https://$host$request_uri; # managed by Certbot }
|
注意这其中和普通HTTP代理的关键不同是:
1 2
| proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade;
|
参考文章
GitHub源码
springboot-socketio