This is day 3 of my participation in the November Writing Challenge. For details, see: The Last Writing Challenge of 2021.

Someone asked me what reactive programming is.

Reactive programming is a declarative programming paradigm based on data streams and the propagation of change. So what do "data stream" and "declarative" mean? Let's start with the Java Stream.

Using a Stream takes three steps: creating the Stream, performing intermediate operations, and performing a terminal operation.

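Intermediate operations: an intermediate operation takes a stream, filters or maps its data in some way, and returns a new stream.

filter: keeps only the elements that match a condition

map: transforms each element

mapToInt: maps elements to int values (returns an IntStream)

flatMap: flattens nested structures into a single stream

peek: visits each element as it passes through; unlike forEach, it is an intermediate operation

skip: skip(6) skips the first six elements

limit: limit(6) takes the first six elements

sorted: sorted() uses natural ordering; sorted(Comparator<? super T> comparator) uses the given comparator

distinct: removes duplicates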
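To see how these operations chain together, here is a minimal sketch. The class name, method name, and sample data are made up for illustration; only the Stream calls themselves come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IntermediateOpsDemo {

    // Chains several intermediate operations; nothing runs until collect() is called.
    static List<Integer> process(List<List<Integer>> nested) {
        return nested.stream()
                .flatMap(List::stream)      // flatten the nested lists into one stream of Integer
                .filter(n -> n % 2 == 0)    // keep even numbers only
                .map(n -> n * 10)           // transform each element
                .distinct()                 // remove duplicates
                .sorted()                   // natural ordering
                .skip(1)                    // drop the first element
                .limit(2)                   // keep at most two elements
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> nested = Arrays.asList(
                Arrays.asList(4, 1, 2),
                Arrays.asList(2, 8, 6, 3));
        System.out.println(process(nested)); // prints [40, 60]
    }
}
```

Each call returns a new stream, so the order of the calls matters: moving skip(1) before sorted() would drop the first element encountered rather than the smallest one.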

Terminal operations: a stream can have only one terminal operation. Once it has been performed, the stream is used up and cannot be operated on again, so it must be the last operation on the stream. Executing the terminal operation is what actually starts the traversal of the stream, and it produces a result or a side effect.

forEach / forEachOrdered: iterate over the elements

toArray: toArray() returns an Object[]; for a typed array, use toArray(Student[]::new)

reduce: a reduction operation; you provide a starting value and a rule for combining the elements

collect: .collect(Collectors.toList()); .collect(Collectors.toCollection(TreeSet::new)); to turn a List into a Map: .collect(Collectors.toMap(Student::getAge, s -> s));

max / min: find the largest / smallest element; both must be passed a Comparator

count: returns the number of elements

Short-circuiting terminal operations: anyMatch (any element matches); allMatch (all elements match); noneMatch (no element matches); findFirst returns the first element; findAny returns an arbitrary element.
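A few of these terminal operations can be sketched as follows. The Student class here is a hypothetical stand-in (the article only mentions Student::getAge), and the names and ages are invented sample data.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TerminalOpsDemo {

    // Hypothetical Student type, used only for this illustration.
    static class Student {
        private final String name;
        private final int age;
        Student(String name, int age) { this.name = name; this.age = age; }
        String getName() { return name; }
        int getAge() { return age; }
    }

    static List<Student> students() {
        return Arrays.asList(new Student("Ann", 20), new Student("Bob", 22), new Student("Cid", 19));
    }

    // reduce: start from 0 and add up every age.
    static int totalAge(List<Student> list) {
        return list.stream().map(Student::getAge).reduce(0, Integer::sum);
    }

    // collect + toMap: key each student by age (toMap throws IllegalStateException on duplicate keys).
    static Map<Integer, Student> byAge(List<Student> list) {
        return list.stream().collect(Collectors.toMap(Student::getAge, s -> s));
    }

    // max must be passed a Comparator and returns an Optional.
    static Optional<Student> oldest(List<Student> list) {
        return list.stream().max(Comparator.comparingInt(Student::getAge));
    }

    // A stream cannot be reused after its terminal operation.
    static boolean reuseFails() {
        Stream<Integer> s = Stream.of(1, 2, 3);
        s.count();                     // the terminal operation consumes the stream
        try {
            s.count();                 // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Student> list = students();
        System.out.println(totalAge(list));                                    // 61
        System.out.println(oldest(list).map(Student::getName).orElse("none")); // Bob
        System.out.println(list.stream().anyMatch(s -> s.getAge() > 21));      // true
        System.out.println(reuseFails());                                      // true
    }
}
```

Note that every call in main builds a fresh stream from the list; that is the usual way around the "one terminal operation per stream" rule.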

With all that said, what about "data stream" and "declarative"? We used to iterate over and process the data ourselves. Now we abstract the data to be processed into a data stream, and then process the data in that stream through the API, declaring what we want instead of how to compute it.

Take the following code: it turns the data in the array into a data stream, then processes the stream by explicitly declaring a call to .sum() to get the final result:

public static void main(String[] args) {
    int[] nums = { 1, 2, 3 };
    // Turn the array into an IntStream and let the API compute the sum
    int sum2 = IntStream.of(nums).parallel().sum();
    System.out.println("result: " + sum2);
}
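For contrast, here is a small sketch that puts the "do it ourselves" version next to the declarative one. The class and method names are invented for the comparison.

```java
import java.util.stream.IntStream;

public class SumComparison {

    // Imperative: we manage the loop and the accumulator ourselves.
    static int sumImperative(int[] nums) {
        int sum = 0;
        for (int n : nums) {
            sum += n;
        }
        return sum;
    }

    // Declarative: we state what we want (the sum) and the stream API does the iteration.
    static int sumDeclarative(int[] nums) {
        return IntStream.of(nums).sum();
    }

    public static void main(String[] args) {
        int[] nums = { 1, 2, 3 };
        System.out.println(sumImperative(nums));  // prints 6
        System.out.println(sumDeclarative(nums)); // prints 6
    }
}
```

Both return the same result; the difference is that the declarative version leaves the "how" (including whether to run in parallel) to the API.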

Having said that, today let's look at how to implement WebSocket with WebFlux.

WebFlux itself supports the WebSocket protocol. To handle WebSocket requests, the handler must implement the WebSocketHandler interface. Each WebSocket connection has an associated WebSocketSession, which carries the handshake information (HandshakeInfo) and other details from when the connection was established. The session's receive() method receives data from the client, and its send() method sends data to the client.

Straight to the code:

Step 1: Configure the WebSocket

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping handlerMapping(@Autowired EchoHandler echoHandler) {
        // Define the mapping set
        Map<String, WebSocketHandler> map = new HashMap<>();
        // Map the URL pattern to the handler
        map.put("/websocket/{token}", echoHandler);
        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        // Give this mapping the highest precedence
        handlerMapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        // Register the URL map
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Step 2: Write a WebSocketHandler

@Component
@Slf4j
public class EchoHandler implements WebSocketHandler {

    // WebSocket handler endpoint
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        log.info("WebSocket handshake info={}", session.getHandshakeInfo().getUri());
        return session.send(session.receive().map(msg->session.textMessage("[Echo]"+msg.getPayloadAsText())));
    }
}
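The handler above only logs the handshake URI. If the {token} part of the mapped path is needed, one option is to read it from that URI with plain java.net.URI parsing. This is a sketch under the assumption that the token is the last path segment; TokenExtractor and lastPathSegment are hypothetical names, not part of WebFlux.

```java
import java.net.URI;

public class TokenExtractor {

    // Hypothetical helper: returns the last path segment of a URI,
    // assuming the handler is mapped to /websocket/{token}.
    static String lastPathSegment(URI uri) {
        String path = uri.getPath();
        if (path == null || path.isEmpty()) {
            return "";
        }
        // Ignore a single trailing slash, e.g. /websocket/test/
        if (path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        URI uri = URI.create("ws://localhost:8089/websocket/test");
        System.out.println(lastPathSegment(uri)); // prints test
    }
}
```

Inside handle(), the same helper could be called with session.getHandshakeInfo().getUri(), which returns a java.net.URI.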

Step 3: WebSocket test

Open www.jsons.cn/websocket/, enter ws://localhost:8089/websocket/test, and click Connect. Then send a message; the page prints the following:

Successful connection, now you can send a message to test!

Message you sent 2021-11-17 17:08:45 Hello

2021-11-17 17:08:45 [Echo] Hello

Message you sent 2021-11-17 17:08:54 Hello

2021-11-17 17:08:54 [Echo] Hello

With this configuration, WebSocket requests to /websocket/{token} are routed to EchoHandler. We also create a WebSocketHandlerAdapter for WebSocket-type handlers so that DispatcherHandler can invoke our WebSocketHandler. With these steps in place, when a WebSocket request reaches WebFlux, DispatcherHandler handles it first and looks up the matching handler through the registered HandlerMapping. Because that handler implements the WebSocketHandler interface, it is invoked through the WebSocketHandlerAdapter.

Clearly, this approach has problems. WebSocket is a full-duplex protocol, so a handler that only returns one reply per request has little practical value; how do we achieve real two-way communication? In addition, the URL mappings in SimpleUrlHandlerMapping are fixed in the configuration, so adding a new handler means modifying the configuration each time, which is tedious. So how can this be improved?

We'll break that down next time.