preface
College learning time is coming to an end, sigh time in a hurry, three years flash by. My classmates are busy looking for jobs, so I would like to submit my resume here, and welcome employers and hunters to invite you. Let’s get down to business. Broadcast industry is the industry of the current hot, everybody wants to get a piece of the live feed a large number of people, a platform for the host to a rough estimate there are thousands of people, the real time watch online but some are incredibly amazing level of millions, especially the game host, it is conceivable that studio is a magnet type of advertising media, No wonder so many big companies are rushing to do live streaming. I don’t know how advanced the technology is in the live broadcast industry, since we haven’t done it ourselves, but we can create a live broadcast room for hundreds of people to watch at the same time.
The final results
- Demo address (computer and mobile terminal effect is different oh)
- Server project address
- Client project address
Mobile effect
Computer end effect broadcast Hong Kong SATELLITE TV
Project overview
The project is divided into three parts:
-
Client live room video pull stream, play and chat room, cool bullet screen and live room information
-
The server side deals with the data business of the direct broadcast room and the user, and the chat room message processing
-
Server The video server and Web server are deployed
Technology stack
Mobile client
- VUE buckets
- The UI layer vonic
- axios
- Video player: vue-video-player + videojs-contrib-hls
- Websocket client: VUe-stomp
- Barrage plugin: Vue-Barrage
- Packaging tool: Webpack
PC side client side
- Project architecture: Jquery + BootStrap
- Video player: video.js
- Websocket client: stomp.js + sockjs.js
- Danmu plugin: jquery.danmu.js
- Template engine: Thymeleaf
The service side
- IDE: IntelliJ IDEA
- Project architecture: SpringBoot1.5.4 +Maven3.0
- Primary database: Mysql5.7
- Secondary database: Redis3.2
- Database access layer: spring-boot-starter-data-jPA + spring-boot-starter-data-redis
- websocket: spring-boot-starter-websocket
- Messaging middleware: RabbitMQ/3.6.10
Server Deployment
- Video broadcast module: nginx-rtmp-module
- Web application server: Tomcat8.0
- Server: Tencent Cloud Centos6.5
Technical point explanation
The direct broadcast room mainly involves two main functions: the first is the video broadcast, the second is the chat room. Both of these are very real-time.
- Live video
When it comes to live streaming, let’s first learn about several commonly used live streaming protocols. I have read a lot of articles and blogs about streaming media protocols, but they are very rough. Here is a more detailed introduction of streaming media protocols. Here we only use RTMP and HLS. After practice, we found that RTMP can only be played on the computer, and HLS can only be played on the phone. And RTMP is pretty fast although not as fast as RTSP, the delay is only a few seconds, I tested about 2-5 seconds, but HLS is about 10 seconds. So if you’ve played a demo, you’ll see that there’s a lot of latency on the phone.
The process of live streaming: live streaming is divided into two processes: push flow and pull flow. Then where does the flow push and pull flow from? Of course, you need a video server. Don’t think that a live video server is very complicated. In fact, everything becomes simple in nginx server. I’ll explain how to deploy the Nginx server and configure the video module (nginx-rtmp-module) later.
First of all, hosts use streaming software, such as OBS Studio streaming software, which is relatively professional and recommended by many live streaming platforms to push video streams. Here, I also recommend Yasea, an open source streaming tool for Android, which has a small but powerful download address. After the live broadcast content is pushed to the server, the video coding tool can be used for transcoding on the server side, which can be converted into various HIGH-DEFINITION, STANDARD DEFINITION and ultra definition videos. That is why we can choose video resolution on various video websites. Here we do not transcode, just pull the video through the front-end video player (video.js). So the whole video push and pull process is complete.
- The chat room
The chat room in the direct broadcast room is similar to our group chat, but it has become the Web side, there are many instant communication solutions on the Web side, here we choose webSocket protocol to communicate with the server, WebSocket is a transmission protocol based on HTTP, the client sends HTTP requests to the server, And carry the Upgrade: webSocket Upgrade header to convert the WebSocket protocol, through the successful handshake with the server can establish TCP channel, so as to transfer messages, the biggest difference between it and HTTP is that the server can actively send messages to the client.
Now that we have a message channel, we need to send messages down the channel, but we need something to control who the messages go to, otherwise all hell will break free, so we chose the messaging middleware RabbitMQ. Use it to take care of where messages are routed.
Theoretical knowledge is finished, practical operation time!
Mobile client operations
The source address
Engineering structure
| | - build building services and webpack configuration - congfig project different environment configuration | - dist build directory to generate production | | - static static resources -- package. | - SRC json project configuration file Development source code directory by axios export apis directory | | - API - components pages and components | - | public public components - vuex global state | -- main. Js application launch configurationCopy the code
Function module
- Pull the live video stream (HLS) of the server and play the live picture
- Create websocket connection with server, send and receive chat room messages
-
Get the message via websocket and send it to the barrage
-
Update online users in real time through Websocket
- Get access history records in conjunction with the server
- Problem feedback module
rendering
Project description
Please refer to source code
Server operations
The source address
Since I prefer to contact new things, I chose SpringBoot for the back-end and vue.js for the front-end. Young people have to keep up with the trend. SpringBoot practice has found that it is really too easy to ignore the various configuration files, all automatic assembly. Post pom.xml here
Hushangjie rtmp-demo 0.0.1-SNAPSHOT jar rtmp-demo demo project for Spring Boot org.springFramework.boot Spring-boot-starter -parent 1.5.4.RELEASE UTF-8 UTF-8 1.8 org.springframework. Boot spring-boot-devtools true org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-actuator-docs org.springframework.boot spring-boot-starter-data-jpa org.springframework.boot spring-boot-starter-data-redis Org.springframework. boot spring-boot-starter-thymeleaf net.sourceforger. nekohtml nekohtml 1.9.22 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-websocket org.springframework.boot Spring-boot-starter-test test org.webjars vue 2.1.3 mysql mysql-connector-java joda-time joda-time 2.9.2 Projectreactor reactor-core 2.0.8.RELEASE IO. Projectreactor reactor-net 2.0.8.RELEASE IO.Net ty netty-all 4.1.6 org.springframework.boot spring-boot-maven-plugin trueCopy the code
Application. The properties file
spring.datasource.url=jdbc:mysql://host:3306/database? characterEncoding=utf8&useSSL=false spring.datasource.username=username spring.datasource.password=password spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.thymeleaf.mode=LEGACYHTML5 server.port=8085 # REDIS (RedisProperties) # Redis database index (default: 0) Spring.redis. database=0 # Redis server address Spring.redis. host=127.0.0.1 # Redis server connection port Port =6379 # Redis server connection password (default empty) Spring.redis. Password = # Maximum number of connections in the connection pool (negative value indicates no limit) spring.redis. Max-active =8 Spring.redis.pool. max-wait=-1 # Maximum idle connection in connection pool Spring.redis.pool. max-idle=8 # Minimum idle connection in connection pool Spring.redis.pool. min-idle=0 # Connection timeout (ms) spring.redis.timeout=0Copy the code
Websocket configuration
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {/ / interceptor into the service failure solution @ Bean public MyChannelInterceptor myChannelInterceptor(){ return new MyChannelInterceptor(); } @override public void registerStompEndpoints(StompEndpointRegistry Registry) {// Add domain access restrictions to prevent cross-domain socket connections //setAllowedOrigins("http://localhost:8085") registry.addEndpoint("/live").setAllowedOrigins("*").addInterceptors(new HandShkeInceptor()).withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { /*.enableSimpleBroker("/topic","/queue"); */ // If you need a third-party message broker, such as rabitMQ,activeMq, Configure registry here. SetApplicationDestinationPrefixes ("/demo "). EnableStompBrokerRelay ("/topic ", "/ queue") SetRelayHost (" 127.0.0.1). SetRelayPort (61613). SetClientLogin (" guest "). SetClientPasscode (" guest ") .setSystemLogin("guest") .setSystemPasscode("guest") .setSystemHeartbeatSendInterval(5000) .setSystemHeartbeatReceiveInterval(4000); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { ChannelRegistration channelRegistration = registration.setInterceptors(myChannelInterceptor()); super.configureClientInboundChannel(registration); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { super.configureClientOutboundChannel(registration); }}Copy the code
The configuration class inherits the message broker configuration class, which means we will use the message broker RabbitMQ. Use the registerStompEndpoints method to register a WebSocket terminal connection. There are two things we need to know here. The first one is STOMp and SockJS. What is SockJS? Sockjs automatically demotes to a polling strategy and simulates Websockets to ensure that the client and server can communicate. What does Stomp have to look at here
Stomp is a simple (streaming) text-directed message protocol that provides an interoperable connection format that allows STOMP clients to interact with any STOMP message Broker, namely our RabbbitMQ, which is a message Broker. The message broker can be configured using configureMessageBroker, but it is important to note that the server we will be deploying should also have RabbitMQ as it is middleware and is very easy to install, which is not explained here. We have configured the /topic /queue proxy strategy, which means that clients subscribed to channels prefixed with /topic /queue will be forwarded by the message proxy (RabbitMQ). It has nothing to do with Spring. It’s completely decoupled.
How does WebSocke ensure security
At first contact stomp when there has been a problem bothering me, only the client with the server through the websocket connection is established, and then he can subscribe to any content, means that you can accept any messages, so wouldn’t it be messed up, so I read a lot of blog posts, many are all examples of official did not resolve the actual problem. After pondering, in fact websocket is to consider security. Specifically in the following aspects
- Cross-domain Websocket connection
- Handshake interceptor before protocol upgrade
- Message channel interceptor
For cross-domain problems, we can use the setAllowedOrigins method to set the domain name that can be connected to prevent cross-site connections.
We can configure as follows whether the in-site users are allowed to connect
public class HandShkeInceptor extends HttpSessionHandshakeInterceptor { private static final Set ONLINE_USERS = new HashSet<>(); @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map Attributes) throws Exception {system.out.println (" before handshake "+ request.geturi ()); // Before converting websoket to HTTP, If (Request Instanceof ServletServerHttpRequest) {ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; Request.getsession (true) : If there is a session, return the session, otherwise create a new session. / / request. GetSession (false) : If any session returns the session, otherwise returns NULL / / HttpSession session. = servletRequest getServletRequest () getSession (false); HttpSession session = servletRequest.getServletRequest().getSession(); UserEntity user = (UserEntity) session.getAttribute("user"); if (user ! = null) {// Here we use a simple session to store users, Principal Return super.beforeHandshake(Request, Response, wsHandler, Attributes); }else {system.out. println(" User not logged in, handshake failed!" ); return false; } } return false; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {system.out.println (" after handshake "); super.afterHandshake(request, response, wsHandler, ex); }}Copy the code
HttpSessionHandshakeInterceptor this interceptor is used to manage things, after shaking hands and we can request information, such as the token, or whether the user can connect session, so that it can prevent illegal users.
So how do you limit subscriptions to specific content? Let’s move on
public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message preSend(Message message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); / / test user subscriptions (prevent users SUBSCRIBE to illegal channels) if (StompCommand. SUBSCRIBE. Equals (command)) {/ / retrieved from the database user subscription channels compared (here to demonstrate directly instead of using the set set) set subedChannelInDB = new HashSet<>(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); If (subedChannelInDB. The contains (accessor. GetDestination ())) {/ / the user subscription channel legal return. Super preSend (message, the channel); } else {// If the channel subscribed by the user is invalid, return null and the front-end user cannot receive the channel information. return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); If the amount of data is too large, we can choose to use the cache database such as Redis. Because of the frequent deletion and increase of collection content, We select the set collection to store the online user StompHeaderAccessor Accessor = stompHeaderAccessor.wrap (message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map map = (Map) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user ! = null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); / / return through the websocket real-time online enclosing simpMessagingTemplate. ConvertAndSend ("/topic/online_user ", statDao getAllUserOnline ()); }} // If the user disconnects, Delete user information if (StompCommand. DISCONNECT. Equals (command)) {Map Map = (Map) accessor. GetHeader (" simpSessionAttributes "); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user ! = null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); }}Copy the code
In STOMP, a Channel is a Channel through which messages are transmitted. A connection between a client and a server creates a Channel through which messages are transmitted. All messages have headers that are encapsulated in Spring’s Messag interface, such as CONNECT, and other information. Client subscription have to SUBSCRIBE to the head of the information the SUBSCRIBE, so if I can intercept everyone in this interceptor ChannelInterceptorAdapter subscription information, and then with the database information at the time, and finally to determine whether the user can SUBSCRIBE to the channel of information, right. This is my idea, according to this way of thinking, do single chat is not easy to solve it. How does a webSocket message from the client reach the subscriber? According to RabbitMQ rules, the subscriber belongs to the consumer, the sender belongs to the producer, and the producer sends the message to the server through the WebSocket. The server forwards the messages to the message broker (RabbitMQ), which stores the messages, manages the delivery rules, and pushes the messages to subscribers, as shown in the following code
@MessageMapping(value = "/chat")
@SendTo("/topic/group")
public MsgEntity testWst(String message , @Header(value = "simpSessionAttributes") Map session){
UserEntity user = (UserEntity) session.get("user");
String username = user.getRandomName();
MsgEntity msg = new MsgEntity();
msg.setCreator(username);
msg.setsTime(Calendar.getInstance());
msg.setMsgBody(message);
return msg;
}Copy the code
@messagemapping looks a lot like the SpringMVC method. It can be used at the class level as well as the method level. When the sender sends a message to /chat, the server receives the message and sends it to the subscriber of /topic/ Group, @sendto is who to send it to, The important thing to note here is that if we have no message broker configured and just use enableSimpleBroker(“/topic”,”/queue”) simple message broker, then it will be sent directly to the message subscribers, and if we have a message broker configured, it will be forwarded by the message broker.
If we want to send messages at any time on the server side rather than on the client side (which is a common scenario, such as sending global notifications), we can use the SimpMessagingTemplate class to inject the bean and send messages in the appropriate business scenario.
Redis statistics
Live broadcast rooms often need statistics, such as real-time online number of people, visits, contributions, subscriptions. The solution I’ve chosen is to use Redis for counting, although this demo may not be accessible by many people, but my goal is to learn how to use Redis by looking at the configuration of Redis in SpringBoot, okay
@Configuration Public class RedisConfig extends CachingConfigurerSupport{/** ** The policy that generates the key ** @return */ @bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } return sb.toString(); }}; } /** * Manage cache ** @param redisTemplate * @return */ @SuppressWarnings(" rawTypes ") @bean public CacheManager cacheManager(RedisTemplate redisTemplate) { RedisCacheManager rcm = new RedisCacheManager(redisTemplate); // Set cache expiration time // rcm.setDefaultExpiration(60); Map Map =new HashMap(); map.put("test",60L); rcm.setExpires(map); return rcm; } /** * RedisTemplate configuration * @param factory * @return */ @bean public RedisTemplate RedisTemplate (RedisConnectionFactory) factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer); / / if the key is a String need to configure the StringSerializer, or the key will be garbled/XX/XX template. The afterPropertiesSet (); //template.setStringSerializer(); return template; }}Copy the code
Redis data statistics Dao implementation
@Repository public class StatDao { @Autowired RedisTemplate redisTemplate; public void pushOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().add("OnlineUser",userEntity); } public void popOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().remove("OnlineUser" ,userEntity); } public Set getAllUserOnline(){ return redisTemplate.opsForSet().members("OnlineUser"); } public void pushGuestHistory(Guest Guest){if (redistemplate.opsForList ().size("Guest") == 200l){ redisTemplate.opsForList().rightPop("Guest"); } redisTemplate.opsForList().leftPush("Guest",guest); } public List getGuestHistory(){ return redisTemplate.opsForList().range("Guest",0,-1); }}Copy the code
The Dao layer is very simple because we only need to count the number of people online and visitors. But the online number is updated in real time, since we are using the websocket real-time data update is very easy, as we said before, in front of the interceptor can intercept the connection through the channel, subscription, disconnected, and so on event information, so we can store online users, when users to connect through the websocket returned online user information.
public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message preSend(Message message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); / / test user subscriptions (prevent users SUBSCRIBE to illegal channels) if (StompCommand. SUBSCRIBE. Equals (command)) {/ / retrieved from the database user subscription channels compared (here to demonstrate directly instead of using the set set) set subedChannelInDB = new HashSet<>(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); If (subedChannelInDB. The contains (accessor. GetDestination ())) {/ / the user subscription channel legal return. Super preSend (message, the channel); } else {// If the channel subscribed by the user is invalid, return null and the front-end user cannot receive the channel information. return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); If the amount of data is too large, we can choose to use the cache database such as Redis. Because of the frequent deletion and increase of collection content, We select the set collection to store the online user StompHeaderAccessor Accessor = stompHeaderAccessor.wrap (message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map map = (Map) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user ! = null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); / / return through the websocket real-time online enclosing simpMessagingTemplate. ConvertAndSend ("/topic/online_user ", statDao getAllUserOnline ()); }} // If the user disconnects, Delete user information if (StompCommand. DISCONNECT. Equals (command)) {Map Map = (Map) accessor. GetHeader (" simpSessionAttributes "); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user ! = null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); }}Copy the code
Since the project has both mobile and computer sides, you need to determine which type the client belongs to based on the request agent UserAgent. The utility class is available in the source code. I won’t post it.
Server Deployment
All this im talk, but no live video. Don’t worry we’ll get to the video. At the beginning of this article, several media streaming protocols are explained. The detailed protocol process is not explained here. What we need to know is that we collect video information through streaming software, and how to collect it is not our concern. The collected information will be pushed to the specified server through software, as shown in the following figure
Obs Push stream Settings
Yasea mobile push stream Settings
The red part is the fetch stream interface that the server opens.
Nginx RTMP – the module configuration
There are many video servers and they support many media streaming protocols. Here we choose nginx-rtmp-module to do video service, next we need to install nginx under Linux, and install RTMP module. I am also a Linux beginner, step by step to explore the server to build a good, I heard that Tomcat and Nginx is very compatible with oh, so as a free open source of course the first choice of these two. Next you need to install the software and services on Linux.
- Nginx and Nginx – RTMP – module
- Tomcat
- Mysql
- Redis
- RabbitMQ
The nginx.conf file configuration is posted here
rtmp { server { listen 1935; chunk_size 4096; application video { play /yjdata/www/www/video; } application live { live on; hls on; hls_path /yjdata/www/www/live/hls/; hls_fragment 5s; }}}Copy the code
Play /yjdata/ WWW/WWW /video can play video directly under /yjdata/ WWW/WWW /video path. Hls_path defines the block storage path of HLS, because HLS is stored in the server in blocks by obtaining the pushed video stream information. So it has a higher latency than RTMP.
server { listen 80; server_name localhost; #charset koi8-r; index index.jsp index.html; root /yjdata/www/www; #access_log logs/host.access.log main; Location / {proxy_pass http://127.0.0.1:8080; } location ~ .*\.(gif|jpg|jpeg|png|bmp|swf|js|css|docx|pdf|doc|ppt|html|properties)$ { expires 30d; root /yjdata/www/www/static/; } location /hls { types { application/vnd.apple.mpegurl m3u8; #application/x-mpegURL; video/mp2t ts; } alias /yjdata/www/www/live/hls/; expires -1; add_header Cache-Control no-cache; } location /stat { rtmp_stat all; rtmp_stat_stylesheet stat.xsl; } location /stat.xsl { root /soft/nginx/nginx-rtmp-module/; }Copy the code
Configuration above the location point/HLS, alias is/yjdata/WWW/WWW/live/HLS /, so you can directly in the front through the domain name + / HLS / + filenames. M3u8 get live video. There’s a lot more to nginx configuration, and I’m still learning about it. All in all, Nginx is very powerful.
conclusion
By going from the front end => back end => server, the whole process still requires a lot of thought. But there are many gains. I will come out from the university, fresh out, is looking for a job, the article mistakes, please correct. Email: [email protected]