A background.
A recent project: After receiving the order assignment task from the upstream system, our system will push it to the relevant equipment of the designated store and conduct corresponding business processing.
Use of Netty
After receiving the order delivery task, push it to the device related to the specified store through Netty. Netty implements message push, long connection and heartbeat mechanism in our system.
2.1 Netty Server:
Each Netty server saves the client’s clientId and socketChannels through ConcurrentHashMap.
When the server sends a message to the client, it only needs to obtain the SocketChannel corresponding to the clientId and write corresponding message to the SocketChannel.
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new MessageEncoder());
p.addLast(new MessageDecoder());
p.addLast(newPushServerHandler()); }}); ChannelFuture future = bootstrap.bind(host,port).sync();if (future.isSuccess()) {
logger.info("server start...");
}
Copy the code
2.2 Netty Client:
The client receives the message from the server and then processes the business. The client also has a heartbeat mechanism that periodically sends Ping messages to the server through the IdleEvent event to check whether a SocketChannel is interrupted.
public PushClientBootstrap(String host, int port) throws InterruptedException {
this.host = host;
this.port = port;
start(host,port);
}
private void start(String host, int port) throws InterruptedException {
bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.group(workGroup)
.remoteAddress(host, port)
.handler(new ChannelInitializer(){
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new IdleStateHandler(20.10.0)); // IdleStateHandler is used to detect heartbeat
p.addLast(new MessageDecoder());
p.addLast(new MessageEncoder());
p.addLast(newPushClientHandler()); }}); doConnect(port, host); }/** * Establishes a connection and can be automatically reconnected. *@param port port.
* @param host host.
* @throws InterruptedException InterruptedException.
*/
private void doConnect(int port, String host) throws InterruptedException {
if(socketChannel ! =null && socketChannel.isActive()) {
return;
}
final int portConnect = port;
final String hostConnect = host;
ChannelFuture future = bootstrap.connect(host, port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
socketChannel = (SocketChannel) futureListener.channel();
logger.info("Connect to server successfully!");
} else {
logger.info("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run(a) {
try {
doConnect(portConnect, hostConnect);
} catch(InterruptedException e) { e.printStackTrace(); }}},10, TimeUnit.SECONDS);
}
}
}).sync();
}
Copy the code
Implement simple service registration and discovery with ZooKeeper
3.1 Service Registration
Service registries essentially decouple service providers and service consumers. Service registry is a highly available and consistent service discovery repository, which is mainly used to store service apis and address mappings. For high availability, service registries are typically clustered and distributed consistently. ZooKeeper, Etcd and so on are commonly used.
In our project, We use ZooKeeper to realize service registration.
public class ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
private CountDownLatch latch = new CountDownLatch(1);
private String registryAddress;
public ServiceRegistry(String registryAddress) {
this.registryAddress = registryAddress;
}
public void register(String data) {
if(data ! =null) {
ZooKeeper zk = connectServer();
if(zk ! =null) { createNode(zk, data); }}}/** * Connect to the ZooKeeper server *@return* /
private ZooKeeper connectServer(a) {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); }}}); latch.await(); }catch (IOException | InterruptedException e) {
logger.error("", e);
}
return zk;
}
/** * Create node *@param zk
* @param data
*/
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes();
String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException | InterruptedException e) {
logger.error("", e); }}}Copy the code
With service registration, the IP address and port of the Netty server are registered with ZooKeeper after the Netty server is started.
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new MessageEncoder());
p.addLast(new MessageDecoder());
p.addLast(newPushServerHandler()); }}); ChannelFuture future = bootstrap.bind(host,port).sync();if (future.isSuccess()) {
logger.info("server start...");
}
if(serviceRegistry ! =null) {
serviceRegistry.register(host + ":" + port);
}
Copy the code
3.2 Service Discovery
Here we use client-side service discovery, that is, the mechanism of service discovery is implemented by the client.
Before establishing a connection with the server, the client obtains the address of the server by querying the registry. If multiple Netty servers exist, you can perform load balancing for the service. In our project, we only use a simple random method for loading.
public class ServiceDiscovery {
private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);
private CountDownLatch latch = new CountDownLatch(1);
private volatile List<String> serviceAddressList = new ArrayList<>();
private String registryAddress; // Address of the registry
public ServiceDiscovery(String registryAddress) {
this.registryAddress = registryAddress;
ZooKeeper zk = connectServer();
if(zk ! =null) { watchNode(zk); }}/** * Obtain the address of the service provider * through service discovery@return* /
public String discover(a) {
String data = null;
int size = serviceAddressList.size();
if (size > 0) {
if (size == 1) { // Only one service provider
data = serviceAddressList.get(0);
logger.info("unique service address : {}", data);
} else { // Use random assignment. Simple load balancing method
data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));
logger.info("choose an address : {}", data); }}return data;
}
/** * Connect to ZooKeeper *@return* /
private ZooKeeper connectServer(a) {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState() == Watcher.Event.KeeperState.SyncConnected) { latch.countDown(); }}}); latch.await(); }catch (IOException | InterruptedException e) {
logger.error("", e);
}
return zk;
}
/** * Get the service address list *@param zk
*/
private void watchNode(final ZooKeeper zk) {
try {
// Get the list of child nodes
List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
// Call this method again when the child node changes to update the service addresswatchNode(zk); }}}); List<String> dataList =new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false.null);
dataList.add(new String(bytes));
}
logger.debug("node data: {}", dataList);
this.serviceAddressList = dataList;
} catch (KeeperException | InterruptedException e) {
logger.error("", e); }}}Copy the code
After the Netty client is started, the IP address and port of the Netty server are obtained through service discovery.
/** * The host and port of the Socket server can be obtained through service discovery@param discoveryAddress
* @throws InterruptedException
*/
public PushClientBootstrap(String discoveryAddress) throws InterruptedException {
serviceDiscovery = new ServiceDiscovery(discoveryAddress);
serverAddress = serviceDiscovery.discover();
if(serverAddress! =null) {
String[] array = serverAddress.split(":");
if(array! =null && array.length==2) {
String host = array[0];
int port = Integer.parseInt(array[1]); start(host,port); }}}Copy the code
4. To summarize
Service registration and discovery has always been a distributed core component. This article describes how to implement a simple service registration and discovery using ZooKeeper as a registry. In fact, there are many registry options, such as Etcd, Eureka and so on. Choosing what fits our business needs is the most important thing.
Java and Android technology stack: update and push original technical articles every week, welcome to scan the qr code of the public account below and pay attention to, looking forward to growing and progress with you together.