Preface
Welcome back to the Vert.x Blueprint series! Microservice architecture is becoming more and more popular, and developers want to try building microservice applications. Vert.x provides an exciting suite of components for microservice development, including Service Discovery, Circuit Breaker, and others. With the help of the Vert.x microservice components, we can quickly build our own microservice applications. In this blueprint tutorial, we'll explore a Micro Shop microservice application developed with Vert.x components.
Through this tutorial, you will learn the following:
- Microservices Architecture
- How to use Vert.x to develop microservice applications
- Asynchronous development pattern
- Reactive, functional programming
- Event Sourcing
- Asynchronous RPC over the distributed Event Bus
- Various service types (e.g. REST, data sources, Event Bus services, etc.)
- Vert.x Service Discovery
- Vert.x Circuit Breaker
- How to implement an API Gateway with Vert.x
- How to handle authentication and authorization (OAuth 2 + Keycloak)
- How to configure and use the SockJS - Event Bus Bridge
And a few other things…
This tutorial is the third in the Vert.x Blueprint series and corresponds to Vert.x version 3.3.2. The complete code for this tutorial is hosted on GitHub.
Stepping through the door of microservices
The term “microservices” should be familiar to you — at least it sounds familiar. More and more developers are embracing the microservice architecture, so what exactly are microservices? To sum it up in one sentence:
Microservices are small, autonomous services that work together.
Let’s dive into the various features of microservices to see why they are so good:
- First, the important thing about microservices is micro. Each microservice is independent, and each individual microservice component focuses on a particular logic. In the microservices architecture, we split traditional monolithic applications into many independent components. Each component is defined by its own specific “logical boundaries”, so components are not overly large. But then again, how small should each component be? This is a difficult question to answer, and it often depends on our business and load. As Sam Newman explains in his book Building Microservices:
We seem to have a very good sense of what is too big, and so it could be argued that once a piece of code no longer feels too big, it’s probably small enough.
Thus, when we don’t think each component is particularly large, the size of the component may be just right.
- In the microservices architecture, components can communicate with each other over any protocol, such as HTTP or AMQP.
- Each component is independent, so we can use different programming languages, different technologies in different components — this is called Polyglot support (yes, vert.x is also multilingual!).
- Each component is developed, deployed, and distributed independently, so this reduces the difficulty of deployment and distribution.
- Microservice architectures often go hand in hand with distributed systems, so we also need to consider the various aspects of distributed systems, including availability, resilience, and scalability.
- Microservice architectures are often designed to be failure oriented because failure scenarios in distributed systems are complex and we need means to deal with failure effectively.
Despite all the benefits of microservices, it is important to remember that microservices are not a silver bullet because they introduce all kinds of problems that occur in distributed systems, so we need to consider these situations when designing our architecture.
Service discovery
In a microservice architecture, each component is independent and does not know the location of the other components, yet components need to communicate with each other, so we must be able to find the location of each component. Hard-coding location information is obviously not a good idea, so we need a mechanism to record the location of each component dynamically: service discovery. With a service discovery module, we can publish the location of a service to it, and other services can then look up the location of the service they want to call and invoke it. Since the caller does not need to know the location of the target service, service invocations are not affected when the service location or environment changes, which makes the architecture more flexible.
Vert.x provides a service discovery module for publishing and retrieving service records. In the Vert.x service discovery module, each service is abstracted as a Record. A service provider publishes the service to the service discovery module; the Record is then stored in a local Map, a distributed Map, or Redis, depending on the configuration of the underlying ServiceDiscoveryBackend. A service consumer fetches the service record from the service discovery module, obtains the corresponding service instance from the record, and then invokes the service. Vert.x currently supports several service types natively, such as Event Bus services (service proxies), HTTP endpoints, message sources, and data sources. Of course, we can also implement our own service types; refer to the relevant documentation. We'll cover how to use the service discovery module in more detail later, but here's a quick overview first.
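To give a feel for the consumer side, here is a minimal sketch (the service name and URI are hypothetical) of looking up a published HTTP endpoint by name and obtaining a ready-to-use HTTP client:
HttpEndpoint.getClient(discovery, new JsonObject().put("name", "product-rest-api"), ar -> {
  if (ar.succeeded()) {
    HttpClient client = ar.result(); // a client already bound to the published host and port
    client.getNow("/products", response -> {
      // handle the response, then release the service object
      ServiceDiscovery.releaseServiceObject(discovery, client);
    });
  }
});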
Asynchronous, reactive Vert.x
Both the asynchronous and the reactive style work well with microservice architectures, and Vert.x gives us both! The asynchronous development model should be familiar by now, and the reactive style will be familiar if you have read the previous blueprints. With Future-based and RxJava-based asynchronous development patterns, we can compose and transform asynchronous flows at will, and the code can be very simple and elegant! In this blueprint tutorial we'll see many asynchronous methods based on both Future and RxJava.
Micro Shop microservice application
Now that you have an overview of the microservice architecture, let's take a look at the microservice application in this blueprint. It is a simple Micro Shop microservice application (currently only the basic functions are implemented) where people can shop and trade online… The current version of the application contains the following components:
- Account service: provides user account operations, using MySQL as the backend storage.
- Product service: provides product operations, using MySQL as the backend storage.
- Inventory service: provides product inventory operations, such as querying, increasing, and decreasing inventory, using Redis as the backend storage.
- Online store service: provides online store operation and management, using MongoDB as the backend storage.
- Shopping cart service: provides shopping cart event generation and shopping cart operations (add, remove, and checkout). We use this service to demonstrate event sourcing.
- Order service: receives order requests sent by the shopping cart service over the Event Bus, processes them and dispatches them to the lower-level services (in this case, they are simply stored in the database).
- Micro Shop frontend: the frontend (SPA) of this microservice application, currently integrated into the API Gateway component.
- Monitoring dashboard: Used to monitor the status of the microservice system and view logs and statistics.
- API Gateway: the entry point of the whole microservice application. It is responsible for distributing incoming requests to the REST endpoints of the corresponding components according to certain rules (equivalent to a reverse proxy), and also for authentication and permission management, load balancing, heartbeat checks, and failure handling (using Vert.x Circuit Breaker).
Micro Shop microservice architecture
Let's take a look at the architecture of the Micro Shop microservice application:
User requests first pass through the API Gateway, which processes them and distributes them to the corresponding business endpoint.
Let’s take a look at the internal structure of each of the base components (the business components at the bottom of the diagram).
Component structure
Each base component has at least two verticles: a service Verticle and a REST Verticle. The REST Verticle provides the REST endpoint for the service and is responsible for publishing this endpoint to the service discovery layer. The service Verticle is responsible for publishing the other services (such as Event Bus services or message sources) and for deploying the REST Verticle.
Each base component contains a corresponding service interface, such as the ProductService interface in the product component. These service interfaces are Event Bus services, annotated with @ProxyGen. As mentioned in the previous blueprint tutorial, Vert.x Service Proxy automatically generates service proxy classes for interfaces annotated with @ProxyGen, so we can easily make asynchronous RPC calls over the Event Bus without writing extra code. Cool! And with the service discovery component, it is very easy to publish an Event Bus service to the service discovery layer so that other components can invoke it conveniently.
Communication between components
Let’s start by looking at the types of services used in our microservices application:
- HTTP endpoints (e.g. REST endpoints and API gateways) – the location of this service is described by a URL
- Event Bus service – The location of this service is described by a specific address on the Event Bus
- Event source – The Event source service corresponds to an Event consumer at an address on the Event Bus. The location of this service is described by a specific address on the Event Bus
Therefore, our components can communicate with each other via HTTP as well as the Event Bus (essentially TCP), for example:
The API Gateway communicates with other components over HTTP.
Let’s get started!
Now, let's start our microservice blueprint journey! First clone the project from GitHub:
git clone github.com/sczyh30/ver…
In this blueprint tutorial we use Maven as the build tool. Let's start by looking at the pom.xml configuration file. As we can see, our blueprint application is made up of many modules:
microservice-blueprint-common
account-microservice
product-microservice
inventory-microservice
store-microservice
shopping-cart-microservice
order-microservice
api-gateway
cache-infrastructure
monitor-dashboard
Each module represents a component. Looking at the configuration file, there seem to be quite a few components! Don't worry, we'll explore them one by one. Let's start by looking at the base module shared by all components: microservice-blueprint-common.
Basic module of microservices
The microservice-blueprint-common module provides helper classes related to microservice functionality as well as base verticles. Let's start with the two base verticles: BaseMicroserviceVerticle and RestAPIVerticle.
Base Microservice Verticle
BaseMicroserviceVerticle provides microservice-related initialization functions as well as a variety of helper functions. Every other Verticle inherits this Verticle, so the base Verticle is very important.
First let’s look at the member variables:
protected ServiceDiscovery discovery;
protected CircuitBreaker circuitBreaker;
protected Set<Record> registeredRecords = new ConcurrentHashSet<>();
discovery and circuitBreaker hold the service discovery instance and the circuit breaker instance respectively, while registeredRecords is the set of currently published service records, which is used to unregister the services when the Verticle is stopped.
The start function mainly initializes the service discovery instance and the circuit breaker instance, and the configuration file is obtained from config(). The implementation is very simple:
@Override
public void start() throws Exception {
  // init service discovery instance
  discovery = ServiceDiscovery.create(vertx, new ServiceDiscoveryOptions().setBackendConfiguration(config()));

  // init circuit breaker instance
  JsonObject cbOptions = config().getJsonObject("circuit-breaker") != null ?
    config().getJsonObject("circuit-breaker") : new JsonObject();
  circuitBreaker = CircuitBreaker.create(cbOptions.getString("name", "circuit-breaker"), vertx,
    new CircuitBreakerOptions()
      .setMaxFailures(cbOptions.getInteger("max-failures", 5))
      .setTimeout(cbOptions.getLong("timeout", 10000L))
      .setFallbackOnFailure(true)
      .setResetTimeout(cbOptions.getLong("reset-timeout", 30000L))
  );
}
Below we also provide several helper functions for publishing various services. These functions are asynchronous and based on the Future:
protected Future<Void> publishHttpEndpoint(String name, String host, int port) {
Record record = HttpEndpoint.createRecord(name, host, port, "/",
new JsonObject().put("api.name", config().getString("api.name", ""))
);
return publish(record);
}
protected Future<Void> publishMessageSource(String name, String address) {
Record record = MessageSource.createRecord(name, address);
return publish(record);
}
protected Future<Void> publishJDBCDataSource(String name, JsonObject location) {
Record record = JDBCDataSource.createRecord(name, location, new JsonObject());
return publish(record);
}
protected Future<Void> publishEventBusService(String name, String address, Class serviceClass) {
Record record = EventBusService.createRecord(name, address, serviceClass);
return publish(record);
}
As we mentioned earlier, each Record represents a service, and the service type is identified by the type field of the Record. For every natively supported service type, Vert.x provides createRecord methods so that we can easily create service records. Usually we give each service a name so that we can later retrieve it by name. We can also use the setMetadata method to attach additional metadata to a service record.
You may have noticed that in the publishHttpEndpoint method we attach api.name metadata; as we will see later, the API Gateway uses it for reverse proxying.
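For instance, attaching metadata to a record looks like this (the names here are purely illustrative):
Record record = HttpEndpoint.createRecord("some-rest-api", "localhost", 8080, "/");
record.setMetadata(new JsonObject().put("api.name", "product"));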
Let’s look at the publish method, a common way to publish a service:
private Future<Void> publish(Record record) {
  Future<Void> future = Future.future();
  // publish the service
  discovery.publish(record, ar -> {
    if (ar.succeeded()) {
      registeredRecords.add(record);
      logger.info("Service <" + ar.result().getName() + "> published");
      future.complete();
    } else {
      future.fail(ar.cause());
    }
  });
  return future;
}
In the publish method we call the publish method of the service discovery instance discovery to publish the service to the service discovery module. This is also an asynchronous method; when the publication succeeds we store the record in registeredRecords, log the event and complete the future. Finally we return the corresponding Future.
Note that in the current version of Vert.x Service Discovery (3.3.2), the service publisher is responsible for unregistering the service when necessary, so when the Verticle is stopped we need to unregister all registered services:
@Override
public void stop(Future<Void> future) throws Exception {
  // In current design, the publisher is responsible for removing the service
  List<Future> futures = new ArrayList<>();
  for (Record record : registeredRecords) {
    Future<Void> unregistrationFuture = Future.future();
    futures.add(unregistrationFuture);
    discovery.unpublish(record.getRegistration(), unregistrationFuture.completer());
  }

  if (futures.isEmpty()) {
    discovery.close();
    future.complete();
  } else {
    CompositeFuture.all(futures)
      .setHandler(ar -> {
        discovery.close();
        if (ar.failed()) {
          future.fail(ar.cause());
        } else {
          future.complete();
        }
      });
  }
}
In the stop method we iterate over the registeredRecords set and try to unregister each service, adding each asynchronous result Future to the futures list. We then call CompositeFuture.all(futures) to collect the state of all the asynchronous results. The all method returns a composed Future, which succeeds if every Future in the list completes successfully and fails otherwise. We bind a Handler to it so that the service discovery module is closed only after all services have been unregistered; otherwise the stop Future fails.
REST API Verticle
The RestAPIVerticle abstract class extends the BaseMicroserviceVerticle abstract class. As the name suggests, it provides a number of helper methods for REST API development, encapsulating things like creating an HTTP server, enabling cookie and session support, enabling heartbeat-check support (over HTTP), various result-handling wrappers, and route handlers for permission validation. We will look at these methods in later chapters.
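To give a flavour of these helpers, here is a simplified sketch of two of them (the real class contains more variants and additional error handling; Consumer is java.util.function.Consumer):
// send a 404 Not Found response
protected void notFound(RoutingContext context) {
  context.response().setStatusCode(404)
    .putHeader("content-type", "application/json")
    .end(new JsonObject().put("message", "not_found").encodePrettily());
}

// wrap an async result: on success pass the value to the consumer, on failure fail the routing context
protected <T> Handler<AsyncResult<T>> resultHandler(RoutingContext context, Consumer<T> consumer) {
  return ar -> {
    if (ar.succeeded()) {
      consumer.accept(ar.result());
    } else {
      context.fail(ar.cause());
    }
  };
}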
Ok, now that we've seen the two base verticles of the blueprint application, it's time to explore the individual modules! Before exploring the business components, let's take a look at one of the most important pieces: the API Gateway.
API Gateway
We have split the API Gateway content into a separate tutorial; see Vert.x Blueprint – Micro Shop API Gateway.
Event Bus services – account, online store and product services
Asynchronous RPC on the Event Bus
We covered asynchronous RPC (also known as service proxies) in Vert.x earlier, in the Vert.x Kue blueprint tutorial, so here let's just recap and see how asynchronous RPC becomes even easier with the service discovery module.
Traditional RPC has a drawback: the consumer has to block and wait for the producer's response. This blocking model doesn't fit Vert.x's asynchronous development model. Also, traditional RPC is not really designed with failure in mind. Fortunately, Vert.x provides an efficient, reactive form of RPC: asynchronous RPC. Instead of waiting for the producer's response, we simply pass a Handler<AsyncResult<T>> argument to the asynchronous method; the Handler is invoked when the result arrives from the producer, which fits Vert.x's asynchronous development model nicely. And AsyncResult is designed with failure in mind.
Vert.x Service Proxy automatically processes the @ProxyGen annotated Service interface to generate the corresponding Service Proxy class. The generated service proxy classes help us encapsulate the data and send it to the Event Bus, receive it from the Event Bus, encode and decode the data, so we can save a lot of code. All we need to do is follow some of the restrictions of the @ProxyGen annotation.
For example, here is an Event Bus service interface:
@ProxyGen
public interface MyService {
@Fluent
MyService retrieveData(String id, Handler<AsyncResult<JsonObject>> resultHandler);
}
We can use the Vert.x Service Proxy component to generate the corresponding proxy class. We can then register the service on the Event Bus via the registerService method of the ProxyHelper class:
MyService myService = MyService.createService(vertx, config);
ProxyHelper.registerService(MyService.class, vertx, myService, SERVICE_ADDRESS);
With service discovery components in place, publishing services to the service discovery layer is easy. For example, in our blueprint application we use encapsulated methods:
publishEventBusService(SERVICE_NAME, SERVICE_ADDRESS, MyService.class)
OK, the service has now been successfully published to the service discovery module. Now we can get the published EventBus service from the service discovery layer through the getProxy method of the EventBusService interface and do the asynchronous RPC as if we were calling a normal asynchronous method:
EventBusService.getProxy(discovery, new JsonObject().put("name", SERVICE_NAME), ar -> {
if (ar.succeeded()) {
MyService myService = ar.result();
myService.retrieveData(...);
}
});
Common features for several service modules
In our Micro Shop application, the account, online store and product services share several common features and conventions. Let's go over them.
Of the three modules, each contains:
- An Event Bus service interface. This service interface defines various operations on the entity store
- Implementation of the service interface
- A REST API Verticle that creates the HTTP server and publishes the REST endpoint to the service discovery module
- A main Verticle that deploys the other verticles and publishes the Event Bus services and message sources to the service discovery layer
Among them, the account service and the product service use MySQL as the backend storage, while the online store service uses MongoDB. Here we'll pick two representative services to show how to work with different databases through Vert.x: product-microservice and store-microservice. The account-microservice implementation is very similar to product-microservice, and you can check out its code on GitHub.
MySQL-based product service
The product microservice module provides product operations, including adding, querying (searching), deleting and updating products. The most important part is the ProductService interface and its implementation. Let's first look at the definition of this service interface:
@VertxGen
@ProxyGen
public interface ProductService {
/**
* The name of the event bus service.
*/
String SERVICE_NAME = "product-eb-service";
/**
* The address on which the service is published.
*/
String SERVICE_ADDRESS = "service.product";
/**
* Initialize the persistence.
*/
@Fluent
ProductService initializePersistence(Handler<AsyncResult<Void>> resultHandler);
/**
* Add a product to the persistence.
*/
@Fluent
ProductService addProduct(Product product, Handler<AsyncResult<Void>> resultHandler);
/**
* Retrieve the product with certain `productId`.
*/
@Fluent
ProductService retrieveProduct(String productId, Handler<AsyncResult<Product>> resultHandler);
/**
* Retrieve the product price with certain `productId`.
*/
@Fluent
ProductService retrieveProductPrice(String productId, Handler<AsyncResult<JsonObject>> resultHandler);
/**
* Retrieve all products.
*/
@Fluent
ProductService retrieveAllProducts(Handler<AsyncResult<List<Product>>> resultHandler);
/**
* Retrieve products by page.
*/
@Fluent
ProductService retrieveProductsByPage(int page, Handler<AsyncResult<List<Product>>> resultHandler);
/**
* Delete a product from the persistence
*/
@Fluent
ProductService deleteProduct(String productId, Handler<AsyncResult<Void>> resultHandler);
/**
* Delete all products from the persistence
*/
@Fluent
ProductService deleteAllProducts(Handler<AsyncResult<Void>> resultHandler);
}
As we mentioned earlier, this service interface is an Event Bus service, so we annotate it with @ProxyGen. The methods are asynchronous, so each of them accepts a Handler<AsyncResult<T>> parameter; when the asynchronous operation completes, the corresponding Handler is called. Notice that we also annotated the interface with @VertxGen. As mentioned in the previous blueprint tutorial, this enables polyglot language support: the Vert.x Codegen annotation processor automatically processes classes annotated with @VertxGen and generates code for the other supported languages, such as Ruby and JavaScript. This is ideal for a microservice architecture, because different components can be developed in different languages!
The meaning of each method is given in the comments and will not be explained here.
The implementation of the product service interface is in the ProductServiceImpl class. Product data is stored in MySQL, so we can manipulate the database via Vert.x JDBC. We covered Vert.x JDBC usage in detail in the first blueprint tutorial, so we won't go into those details here. Instead, let's focus on reducing the amount of code: since simple database operations always follow the same steps, some encapsulation is worthwhile.
Let’s start by reviewing the process of each database operation:
- Obtain a database connection (SQLConnection) from the JDBCClient; this is an asynchronous operation
- Execute the SQL statement and bind a callback Handler
- Finally, don't forget to close the database connection to free up resources
For normal CRUD operations the implementations all look alike, so we encapsulated a JdbcRepositoryWrapper class to implement the generic logic. It is located in the io.vertx.blueprint.microservice.common.service package.
We provide the following encapsulation methods:
- executeNoResult: executes a SQL statement with parameters (via updateWithParams). The execution result is not needed, so it only accepts a Handler<AsyncResult<Void>> parameter. This method is typically used for INSERT statements and the like.
- retrieveOne: executes a SQL statement with parameters to fetch a single entity (via queryWithParams). This method is Future-based and returns an asynchronous result of type Future<Optional<JsonObject>>. If the result set is empty, an empty Optional is returned; otherwise the first row is returned, wrapped in an Optional.
- retrieveMany: fetches multiple entities and returns a Future<List<JsonObject>> as the asynchronous result.
- retrieveByPage: similar to retrieveMany, but with pagination logic.
- retrieveAll: similar to retrieveMany, but requires no query parameters as it simply executes statements such as SELECT * FROM xx_table.
- removeOne and removeAll: remove entities from the database.
Of course, the drawback compared to Spring JPA is that you have to write the SQL statements yourself, and hand-rolling this kind of wrapper is not always convenient… Given that Vert.x JDBC internally wraps blocking JDBC with a worker thread pool (so it is not truly asynchronous), combining it with Spring Data components to simplify development is also an option. In addition, Vert.x JDBC uses C3P0 as the default connection pool, so switching to the better-performing HikariCP is worth considering.
Back to JdbcRepositoryWrapper. This layer of encapsulation can greatly reduce the amount of code. For example, our ProductServiceImpl implementation class inherits JdbcRepositoryWrapper and takes advantage of these wrapper methods. Look at an example, the implementation of the retrieveProduct method:
@Override
public ProductService retrieveProduct(String productId, Handler<AsyncResult<Product>> resultHandler) {
this.retrieveOne(productId, FETCH_STATEMENT)
.map(option -> option.map(Product::new).orElse(null))
.setHandler(resultHandler);
return this;
}
All we need to do is convert the result to the desired type. Convenient, isn't it?
Of course, this is not the only way. In the following sections we will see a more reactive, functional approach that uses the Rx version of Vert.x JDBC. Using vertx-sync (similar to async/await) is also a good option.
All right! Having looked at the service implementation, it’s time for the REST API. Let’s look at the implementation of RestProductAPIVerticle:
public class RestProductAPIVerticle extends RestAPIVerticle {

  public static final String SERVICE_NAME = "product-rest-api";

  private static final String API_ADD = "/add";
  private static final String API_RETRIEVE = "/:productId";
  private static final String API_RETRIEVE_BY_PAGE = "/products";
  private static final String API_RETRIEVE_PRICE = "/:productId/price";
  private static final String API_RETRIEVE_ALL = "/products";
  private static final String API_DELETE = "/:productId";
  private static final String API_DELETE_ALL = "/all";

  private final ProductService service;

  public RestProductAPIVerticle(ProductService service) {
    this.service = service;
  }

  @Override
  public void start(Future<Void> future) throws Exception {
    super.start();
    final Router router = Router.router(vertx);
    // body handler
    router.route().handler(BodyHandler.create());
    // API route handler
    router.post(API_ADD).handler(this::apiAdd);
    router.get(API_RETRIEVE).handler(this::apiRetrieve);
    router.get(API_RETRIEVE_BY_PAGE).handler(this::apiRetrieveByPage);
    router.get(API_RETRIEVE_PRICE).handler(this::apiRetrievePrice);
    router.get(API_RETRIEVE_ALL).handler(this::apiRetrieveAll);
    router.patch(API_UPDATE).handler(this::apiUpdate);
    router.delete(API_DELETE).handler(this::apiDelete);
    router.delete(API_DELETE_ALL).handler(context -> requireLogin(context, this::apiDeleteAll));

    enableHeartbeatCheck(router, config());

    // get HTTP host and port from configuration, or use default value
    String host = config().getString("product.http.address", "0.0.0.0");
    int port = config().getInteger("product.http.port", 8082);

    // create HTTP server and publish REST service
    createHttpServer(router, host, port)
      .compose(serverCreated -> publishHttpEndpoint(SERVICE_NAME, host, port))
      .setHandler(future.completer());
  }

  private void apiAdd(RoutingContext context) {
    try {
      Product product = new Product(new JsonObject(context.getBodyAsString()));
      service.addProduct(product, resultHandler(context, r -> {
        String result = new JsonObject().put("message", "product_added")
          .put("productId", product.getProductId())
          .encodePrettily();
        context.response().setStatusCode(201)
          .putHeader("content-type", "application/json")
          .end(result);
      }));
    } catch (DecodeException e) {
      badRequest(context, e);
    }
  }

  private void apiRetrieve(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.retrieveProduct(productId, resultHandlerNonEmpty(context));
  }

  private void apiRetrievePrice(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.retrieveProductPrice(productId, resultHandlerNonEmpty(context));
  }

  private void apiRetrieveByPage(RoutingContext context) {
    try {
      String p = context.request().getParam("p");
      int page = p == null ? 1 : Integer.parseInt(p);
      service.retrieveProductsByPage(page, resultHandler(context, Json::encodePrettily));
    } catch (Exception ex) {
      badRequest(context, ex);
    }
  }

  private void apiRetrieveAll(RoutingContext context) {
    service.retrieveAllProducts(resultHandler(context, Json::encodePrettily));
  }

  private void apiDelete(RoutingContext context) {
    String productId = context.request().getParam("productId");
    service.deleteProduct(productId, deleteResultHandler(context));
  }

  private void apiDeleteAll(RoutingContext context, JsonObject principle) {
    service.deleteAllProducts(deleteResultHandler(context));
  }
}
This Verticle inherits from RestAPIVerticle, so we can take advantage of many of the helper methods. First, look at the startup process, the start method. We first call super.start() to initialize the service discovery component, then create the Router, bind the BodyHandler to handle the request body, and then create the API routes and bind the handlers. We then call the enableHeartbeatCheck method to enable simple heartbeat detection support. Finally, we create the HTTP server using the wrapped createHttpServer and publish the HTTP endpoint to the service discovery module using the publishHttpEndpoint method.
The createHttpServer method is very simple: we just wrap the vertx.createHttpServer method into a Future-based one:
protected Future<Void> createHttpServer(Router router, String host, int port) {
Future<HttpServer> httpServerFuture = Future.future();
vertx.createHttpServer()
.requestHandler(router::accept)
.listen(port, host, httpServerFuture.completer());
return httpServerFuture.map(r -> null);
}
For details on how to implement each route-handling method, refer to the Vert.x Blueprint – Todo Backend tutorial.
Finally, let's open the main verticle of this microservice module, the ProductVerticle class. As we mentioned earlier, it is responsible for publishing the services and deploying the REST Verticle. Let's look at its start method:
@Override
public void start(Future<Void> future) throws Exception {
super.start();
// create the service instance
ProductService productService = new ProductServiceImpl(vertx, config()); // (1)
// register the service proxy on event bus
ProxyHelper.registerService(ProductService.class, vertx, productService, SERVICE_ADDRESS); // (2)
// publish the service in the discovery infrastructure
initProductDatabase(productService) // (3)
.compose(databaseOkay -> publishEventBusService(ProductService.SERVICE_NAME, SERVICE_ADDRESS, ProductService.class)) // (4)
.compose(servicePublished -> deployRestService(productService)) // (5)
.setHandler(future.completer()); // (6)
}
First we create a ProductService instance (1) and register the service on the Event Bus via the registerService method (2). We then initialize the database table (3), publish the product service to the service discovery layer (4), and deploy the REST Verticle (5). This is a chain of composed asynchronous methods. Finally, we bind future.completer() to the composed Future (6) so that it completes once all of these asynchronous operations succeed.
Of course, don’t forget to specify api.name in the configuration. As mentioned earlier in the API Gateway section, the reverse proxy part of the API Gateway distributes requests through api.name of the corresponding REST service. By default, api.name is product:
{
"api.name": "product"
}
Online shop service based on MongoDB
The online store service is used for store operations, such as opening and closing a store and updating its data. Normally opening a store would require an application and review process, but in this blueprint tutorial we simplify this step. The structure of the store service module is very similar to the product service module, so we won't go into details; we'll just take a look at how to use the Vert.x Mongo Client.
Using the Vert.x MongoClient is very simple. First we need to create a MongoClient instance, similar to JDBCClient:
private final MongoClient client;
public StoreCRUDServiceImpl(Vertx vertx, JsonObject config) {
this.client = MongoClient.createNonShared(vertx, config);
}
Then we can use it to manipulate Mongo. For example, if we want to perform a save operation, we can write:
@Override
public void saveStore(Store store, Handler<AsyncResult<Void>> resultHandler) {
  client.save(COLLECTION, new JsonObject()
    .put("_id", store.getSellerId())
    .put("name", store.getName())
    .put("description", store.getDescription())
    .put("openTime", store.getOpenTime()), ar -> {
      if (ar.succeeded()) {
        resultHandler.handle(Future.succeededFuture());
      } else {
        resultHandler.handle(Future.failedFuture(ar.cause()));
      }
    });
}
These operations are asynchronous, so you must be familiar with this pattern! Of course, if you don’t like callback based asynchronous mode, you can also choose the Rx version of the API ~
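A read operation follows the same callback pattern. For example, retrieving a store by seller id might look roughly like this (a sketch; the field names follow the save example above and a Store JSON constructor is assumed):
public void retrieveStore(String sellerId, Handler<AsyncResult<Store>> resultHandler) {
  JsonObject query = new JsonObject().put("_id", sellerId);
  client.findOne(COLLECTION, query, null, ar -> {
    if (ar.succeeded()) {
      // findOne yields null when no document matches the query
      Store store = ar.result() == null ? null : new Store(ar.result());
      resultHandler.handle(Future.succeededFuture(store));
    } else {
      resultHandler.handle(Future.failedFuture(ar.cause()));
    }
  });
}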
For more details on using the Vert.x Mongo Client, please refer to the official documentation.
Redis-based inventory service
The inventory service is responsible for manipulating product inventory amounts, such as increasing and decreasing the inventory and retrieving the current amount. The inventory is stored in Redis.
Unlike the previous Event Bus service, our goods inventory service is future-based rather than callback-based. Since the service proxy module does not support handling future-based service interfaces, we will dispense with asynchronous RPC here and just publish a REST API endpoint, with all calls made through REST.
First, take a look at the InventoryService service interface:
public interface InventoryService {

  /**
   * Create a new inventory service instance.
   *
   * @param vertx  Vertx instance
   * @param config configuration object
   * @return a new inventory service instance
   */
  static InventoryService createService(Vertx vertx, JsonObject config) {
    return new InventoryServiceImpl(vertx, config);
  }

  /**
   * Increase the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @param increase  increase amount
   * @return the asynchronous result of current amount
   */
  Future<Integer> increase(String productId, int increase);

  /**
   * Decrease the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @param decrease  decrease amount
   * @return the asynchronous result of current amount
   */
  Future<Integer> decrease(String productId, int decrease);

  /**
   * Retrieve the inventory amount of a certain product.
   *
   * @param productId the id of the product
   * @return the asynchronous result of current amount
   */
  Future<Integer> retrieveInventoryForProduct(String productId);
}
The interface definition is very simple, and the meaning of each method is given in the comments. Next, take a look at the service implementation class, InventoryServiceImpl. In Redis, all inventory amounts are stored in the inventory:v1 namespace and identified by the product id. For example, the inventory of product A123456 is stored under the key inventory:v1:A123456.
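In code, this naming scheme boils down to a simple key prefix (shown here as an assumption for illustration):
private static final String PREFIX = "inventory:v1:";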
Vert.x Redis provides incrby and decrby commands that make it easy to add and subtract inventory, with similar code. Here we just look at the inventory increase feature:
@Override
public Future<Integer> increase(String productId, int increase) {
Future<Long> future = Future.future();
client.incrby(PREFIX + productId, increase, future.completer());
return future.map(Long::intValue);
}
Since inventory amounts are not very large, an Integer is sufficient, so we convert the Long result to an Integer with the Long::intValue method reference.
The implementation of the retrieveInventoryForProduct method is also very short:
@Override
public Future<Integer> retrieveInventoryForProduct(String productId) {
Future<String> future = Future.future();
client.get(PREFIX + productId, future.completer());
return future.map(r -> r == null ? "0" : r)
.map(Integer::valueOf);
}
We get the value through the get command. Since the result is String, we need to convert it to Integer ourselves. If the result is empty, we assume that the item is not in stock and return 0.
As for REST Verticle (also Main Verticle in this module), the implementation pattern is pretty much the same as the previous one, so I won’t expand on that here. Don’t forget to specify api.name in config.json:
{
"api.name": "inventory",
"redis.host": "redis",
"inventory.http.address": "inventory-microservice",
"inventory.http.port": 8086
}
Event Sourcing – the shopping cart service
Ok, we are now done with the basic service modules, so let's move on to another important service module: the shopping cart microservice. This module is responsible for retrieving the shopping cart, adding shopping cart events, and checking out. Instead of a traditional implementation, we will introduce a different development pattern here: Event Sourcing.
The Event Sourcing pattern
In the traditional data storage model, we usually store the current state of the data directly in the database. This is fine in most scenarios, but sometimes we want not only the data itself but also the process by which it was produced; in that case Event Sourcing can help.
Event Sourcing ensures that every change to the data state is stored in the database as a sequence of events. So we can not only see every individual change, but also reconstruct the data state at any point in the past by replaying the stored events! Note that saved events and their order must never be changed; the event store is append-only and the events are immutable. Doesn't this feel similar to a database transaction log?
In a microservice architecture, the Event Sourcing pattern brings the following benefits:
- We can construct data states at any given moment from a sequence of events in the past
- Each past event is saved, so this makes it possible to compensate transactions
- We can take the event stream from the event store and transform and process it in an asynchronous, reactive style
- Event stores can also be used as data logs
The choice of event storage also needs to be considered. Apache Kafka is well suited for this scenario, and in this version of the Micro Shop microservice, to simplify its implementation, we simply use MySQL as the event store. In the next version we will integrate Kafka.
Note: in a real production environment, shopping carts are typically stored in a session or a cache. This section uses an event store only to demonstrate Event Sourcing.
Shopping cart events
Let's look at the CartEvent data object, which represents a shopping cart event:
@DataObject(generateConverter = true)
public class CartEvent {

  private Long id;
  private CartEventType cartEventType;
  private String userId;
  private String productId;
  private Integer amount;
  private long createdAt;

  public CartEvent() {
    this.createdAt = System.currentTimeMillis();
  }

  public CartEvent(JsonObject json) {
    CartEventConverter.fromJson(json, this);
  }

  public CartEvent(CartEventType cartEventType, String userId, String productId, Integer amount) {
    this.cartEventType = cartEventType;
    this.userId = userId;
    this.productId = productId;
    this.amount = amount;
    this.createdAt = System.currentTimeMillis();
  }

  public static CartEvent createCheckoutEvent(String userId) {
    return new CartEvent(CartEventType.CHECKOUT, userId, "all", 0);
  }

  public static CartEvent createClearEvent(String userId) {
    return new CartEvent(CartEventType.CLEAR_CART, userId, "all", 0);
  }

  public JsonObject toJson() {
    JsonObject json = new JsonObject();
    CartEventConverter.toJson(this, json);
    return json;
  }

  public static boolean isTerminal(CartEventType eventType) {
    return eventType == CartEventType.CLEAR_CART || eventType == CartEventType.CHECKOUT;
  }
}
A shopping cart event stores the type of event, when it happened, the user, the corresponding item ID, and the change in the number of items. In our blueprint application, there are four types of shopping cart events, represented by the CartEventType enumeration class:
public enum CartEventType {
  ADD_ITEM,    // add an item to the cart
  REMOVE_ITEM, // remove an item from the cart
  CHECKOUT,    // check out and clear the cart
  CLEAR_CART   // clear the cart
}
The CHECKOUT and CLEAR_CART events operate on the entire shopping cart entity, and the corresponding shopping cart event parameters are similar, so we wrote two static methods to create both events.
We also notice a static method, isTerminal, which detects whether a shopping cart event is a "terminal" event, i.e. an event that operates on the whole cart (checkout or clearing). This method is useful when building the cart state from the cart event stream.
Shopping cart entity
Having looked at shopping cart events, let's look at the shopping cart itself. The shopping cart entity is represented by the ShoppingCart data object, which contains the list of product items currently in the cart:
private List<ProductTuple> productItems = new ArrayList<>();
The ProductTuple data object contains the item number, item seller ID, unit price, and amount of the current item in the cart.
For convenience, we also keep an amountMap in the ShoppingCart class to temporarily store the quantity of each product:
private Map<String, Integer> amountMap = new HashMap<>();
Since it is only a temporary store, we don't want it to appear in the corresponding JSON data, so we annotate both its getter and setter methods with @GenIgnore.
In Event Sourcing mode we build the shopping cart state from a series of shopping cart events, so we need to incorporate each shopping cart event into the cart to adjust the product quantities:
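In code this looks something like the following (a sketch of the relevant accessors):
@GenIgnore
public Map<String, Integer> getAmountMap() {
  return amountMap;
}

@GenIgnore
public ShoppingCart setAmountMap(Map<String, Integer> amountMap) {
  this.amountMap = amountMap;
  return this;
}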
public ShoppingCart incorporate(CartEvent cartEvent) {
  // the event must be an add or remove event
  boolean ifValid = Stream.of(CartEventType.ADD_ITEM, CartEventType.REMOVE_ITEM)
    .anyMatch(cartEventType -> cartEvent.getCartEventType().equals(cartEventType));

  if (ifValid) {
    amountMap.put(cartEvent.getProductId(),
      amountMap.getOrDefault(cartEvent.getProductId(), 0) +
        (cartEvent.getAmount() * (cartEvent.getCartEventType()
          .equals(CartEventType.ADD_ITEM) ? 1 : -1)));
  }

  return this;
}
The implementation is relatively simple, we first check whether the event to merge is an add or remove item event, if so, we change the amount of the item in the current shopping cart according to the event type and the corresponding quantity change (amountMap).
Using the Rx version of Vert.x JDBC
Now that we know about the entity classes in the shopping cart microservice, it’s time to look at the shopping cart Event store service.
Vert.x supports integration with RxJava, and almost every Vert.x component has an Rx version! Here we use the Rx version of Vert.x JDBC to write our shopping cart event store service. That means all asynchronous methods return Observable, FRP-style!
We first define a simple CRUD interface called SimpleCrudDataSource:
public interface SimpleCrudDataSource<T, ID> {

  Observable<Void> save(T entity);

  Observable<T> retrieveOne(ID id);

  Observable<Void> delete(ID id);
}
We then define a CartEventDataSource interface that defines methods related to shopping cart event fetching:
public interface CartEventDataSource extends SimpleCrudDataSource<CartEvent, Long> {

  Observable<CartEvent> streamByUser(String userId);
}
As you can see, this interface has only one method — the streamByUser method returns a shopping cart event stream corresponding to a user, so we can stream it later!
Let’s take a look at the service implementation class CartEventDataSourceImpl. The first is the save method, which stores an event to the event database:
@Override
public Observable<Void> save(CartEvent cartEvent) {
JsonArray params = new JsonArray().add(cartEvent.getCartEventType().name())
.add(cartEvent.getUserId())
.add(cartEvent.getProductId())
.add(cartEvent.getAmount())
.add(cartEvent.getCreatedAt() > 0 ? cartEvent.getCreatedAt() : System.currentTimeMillis());
return client.getConnectionObservable()
.flatMap(conn -> conn.updateWithParamsObservable(SAVE_STATEMENT, params))
.map(r -> null);
}
Look at this code: isn't it more concise and reactive than normal callback-based Vert.x JDBC? We simply obtain a database connection with the getConnectionObservable method and then compose it with the updateWithParamsObservable method to execute the parameterized SQL statement. It takes only two lines! In the callback-based style, we would have to write:
client.getConnection(ar -> {
  if (ar.succeeded()) {
    SQLConnection connection = ar.result();
    connection.updateWithParams(SAVE_STATEMENT, params, ar2 -> {
      // ...
    });
  } else {
    resultHandler.handle(Future.failedFuture(ar.cause()));
  }
});
Therefore, RxJava is a great pleasure to use! Vertx-sync is also a good choice.
Of course, don’t forget that the returned Observable is cold, so data is emitted only when it is subscribed.
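For example, nothing below touches the database until subscribe is called (a hypothetical usage, assuming a dataSource reference, a cartEvent instance and a logger):
Observable<Void> pendingSave = dataSource.save(cartEvent); // nothing is executed yet
pendingSave.subscribe(
  v -> { /* save emits no meaningful items */ },
  err -> logger.error("failed to save cart event", err),
  () -> logger.info("cart event saved")
);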
By the way, the Vert.x MySQL/PostgreSQL Client is an asynchronous database client based on a library written in Scala. It is not very stable yet, but you can try it yourself.
Let’s look again at the retrieveOne method, which retrieves an event with a specific ID from the data store:
@Override
public Observable<CartEvent> retrieveOne(Long id) {
  return client.getConnectionObservable()
    .flatMap(conn -> conn.queryWithParamsObservable(RETRIEVE_STATEMENT, new JsonArray().add(id)))
    .map(ResultSet::getRows)
    .filter(list -> !list.isEmpty())
    .map(res -> res.get(0))
    .map(this::wrapCartEvent);
}
It's pretty neat, much like our Future-based style, so I won't explain it in detail here.
Let’s take a look at the streamByUser method, the most important one:
@Override
public Observable<CartEvent> streamByUser(String userId) {
JsonArray params = new JsonArray().add(userId).add(userId);
return client.getConnectionObservable()
.flatMap(conn -> conn.queryWithParamsObservable(STREAM_STATEMENT, params))
.map(ResultSet::getRows)
.flatMapIterable(item -> item) // list merge into observable
.map(this::wrapCartEvent);
}
At its core is its SQL statement STREAM_STATEMENT:
SELECT * FROM cart_event c
WHERE c.user_id = ? AND c.created_at > coalesce(
(SELECT created_at FROM cart_event
WHERE user_id = ? AND (`type` = "CHECKOUT" OR `type` = "CLEAR_CART")
ORDER BY cart_event.created_at DESC
LIMIT 1),
0)
ORDER BY c.created_at ASC;
This SQL statement retrieves all cart events associated with the current cart. Note that we have many users, and each user may have many cart events belonging to carts at different times, so how do we select the relevant events? The trick is: first we find the time of the last "terminal" event, and the events relevant to the current cart are all the cart events that happened after that terminal event.
With that in mind, let's go back to the streamByUser method. Since this method gets a list of events from the database, why does it return Observable<CartEvent> rather than Observable<List<CartEvent>>? Look at the flatMapIterable operator we used: it turns a sequence into a stream of items. So the Observable here is a bit different from Vert.x's Future and Java 8's CompletableFuture. A CompletableFuture is more like a Single in RxJava: it emits exactly one value or one error, whereas an Observable is a data stream flowing from the publisher to the subscriber. The Observables returned by the retrieveOne and save methods behave more like Singles, but in streamByUser the Observable really is a stream of event data. We will process this event stream in the shopping cart service, ShoppingCartService.
Wow! In the following sections we'll explore the shopping cart service and its implementation, which is Future-based and very reactive!
Building the shopping cart state from the cart event sequence
Let's first look at the shopping cart service interface, ShoppingCartService, which is also an Event Bus service:
@VertxGen
@ProxyGen
public interface ShoppingCartService {
/**
* The name of the event bus service.
*/
String SERVICE_NAME = "shopping-cart-eb-service";
/**
* The address on which the service is published.
*/
String SERVICE_ADDRESS = "service.shopping.cart";
@Fluent
ShoppingCartService addCartEvent(CartEvent event, Handler<AsyncResult<Void>> resultHandler);
@Fluent
ShoppingCartService getShoppingCart(String userId, Handler<AsyncResult<ShoppingCart>> resultHandler);
}
Here we define two methods: addCartEvent, which stores a shopping cart event into the event store, and getShoppingCart, which retrieves the current shopping cart state of a user.
Let’s take a look at its implementation class, ShoppingCartServiceImpl. The first is the addCartEvent method, which is very simple:
@Override
public ShoppingCartService addCartEvent(CartEvent event, Handler<AsyncResult<Void>> resultHandler) {
Future<Void> future = Future.future();
repository.save(event).toSingle().subscribe(future::complete, future::fail);
future.setHandler(resultHandler);
return this;
}
As mentioned earlier, the Observable returned by the save method behaves more like a Single, so we convert it with the toSingle method. We then subscribe to it with subscribe(future::complete, future::fail) so that the result is propagated to the future, which we can then bind to a Handler<AsyncResult<Void>>.
The logic for the getShoppingCart method is in the aggregateCartEvents method, which is very important and future-based. Let’s look at the code first:
private Future<ShoppingCart> aggregateCartEvents(String userId) {
  Future<ShoppingCart> future = Future.future();
  // aggregate cart events into raw shopping cart
  repository.streamByUser(userId) // (1)
    .takeWhile(cartEvent -> !CartEvent.isTerminal(cartEvent.getCartEventType())) // (2)
    .reduce(new ShoppingCart(), ShoppingCart::incorporate) // (3)
    .toSingle()
    .subscribe(future::complete, future::fail); // (4)

  return future.compose(cart ->
    getProductService() // (5)
      .compose(service -> prepareProduct(service, cart)) // (6) prepare product data
      .compose(productList -> generateCurrentCartFromStream(cart, productList)) // (7) prepare product items
  );
}
Let's explain it in detail. First we get the cart event stream for the user via streamByUser(userId) (1); then we use the takeWhile operator to take only the ADD_ITEM and REMOVE_ITEM events (2). The takeWhile operator stops emitting new items once the predicate becomes false, so as soon as the event stream hits a terminal event, no further events are emitted and the previously emitted events continue downstream.
Here's how the shopping cart state is generated! We "aggregate" the stream of events into a shopping cart entity with the reduce operator (3). This process can be summarized as follows: first we create an empty shopping cart, then we "merge" each shopping cart event into the shopping cart entity in turn. The resulting aggregated shopping cart entity should contain a complete amountMap.
The Observable now contains the desired initial shopping cart. We convert it to a Single and subscribe to it with subscribe(future::complete, future::fail) to propagate the result to the future (4).
Now we need more information to assemble a complete shopping cart, so we first compose the getProductService asynchronous method to obtain the product service from the service discovery layer (5), then fetch the required product data through the prepareProduct method (6), and finally compose the generateCurrentCartFromStream asynchronous method to assemble the complete shopping cart entity (7). Several compositions are involved here; we'll explain them one by one.
Let's start with the getProductService asynchronous method. It retrieves the product service from the service discovery layer and returns its asynchronous result:
private Future<ProductService> getProductService() {
Future<ProductService> future = Future.future();
EventBusService.getProxy(discovery,
new JsonObject().put("name", ProductService.SERVICE_NAME),
future.completer());
return future;
}
Now that we have the product service, the next step is naturally to fetch the required product data. This is implemented in the prepareProduct asynchronous method:
private Future<List<Product>> prepareProduct(ProductService service, ShoppingCart cart) {
List<Future<Product>> futures = cart.getAmountMap().keySet() // (1)
.stream()
.map(productId -> {
Future<Product> future = Future.future();
service.retrieveProduct(productId, future.completer());
return future; // (2)
})
.collect(Collectors.toList()); // (3)
return Functional.sequenceFuture(futures); // (4)
}
In this implementation, first we get the ids of all the products in the shopping cart from amountMap (1), then we asynchronously invoke the retrieveProduct method of the product service for each id and wrap each call in a Future (2), and collect the stream into a list of type List<Future<Product>> (3). What we actually want is the asynchronous result of all products, a Future<List<Product>>, so how do we convert between them? Here I wrote a helper function, sequenceFuture, to do the transformation; it is located in the Functional class in the io.vertx.blueprint.microservice.common.functional package:
public static <R> Future<List<R>> sequenceFuture(List<Future<R>> futures) {
  return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()])) // (1)
    .map(v -> futures.stream()
      .map(Future::result) // (2)
      .collect(Collectors.toList()) // (3)
    );
}
This method is useful when you want to transform a sequence of Futures into a single Future of a list. Here we first call the all method of the CompositeFutureImpl class (1), which returns a composite Future that succeeds if and only if every Future in the sequence succeeds, and fails otherwise. We then transform the composite Future: we take the result of each Future (2), which the all method guarantees is available at this point, and collect the results into a list (3).
Back to the composition! Now that we have the list of products we need, it's time to build the complete shopping cart entity from this information. Let's take a look at the generateCurrentCartFromStream method implementation:
private Future<ShoppingCart> generateCurrentCartFromStream(ShoppingCart rawCart, List<Product> productList) {
  Future<ShoppingCart> future = Future.future();
  // check if any of the product is invalid
  if (productList.stream().anyMatch(e -> e == null)) { // (1)
    future.fail("Error when retrieve products: empty");
    return future;
  }
  // construct the product items
  List<ProductTuple> currentItems = rawCart.getAmountMap().entrySet() // (2)
    .stream()
    .map(item -> new ProductTuple(getProductFromStream(productList, item.getKey()), // (3)
      item.getValue())) // (4) amount value
    .filter(item -> item.getAmount() > 0) // (5) amount must be greater than zero
    .collect(Collectors.toList());

  ShoppingCart cart = rawCart.setProductItems(currentItems); // (6)
  return Future.succeededFuture(cart); // (7)
}
Looks like a mess… Don't worry, let's take it slowly. Note that this method is not asynchronous per se, but we need to signal whether it succeeds or fails (AsyncResult), so it still returns a Future. First we create a Future, then check whether the product list is valid using anyMatch (1). If not, we return a failed Future; if it is valid, we build a ProductTuple for each product in turn. In (3) we build each ProductTuple with this constructor:
public ProductTuple(Product product, Integer amount) {
  this.productId = product.getProductId();
  this.sellerId = product.getSellerId();
  this.price = product.getPrice();
  this.amount = amount;
}
The first parameter is the corresponding product entity. To fetch it from the product list, we write a getProductFromStream method:
private Product getProductFromStream(List<Product> productList, String productId) {
  return productList.stream()
    .filter(product -> product.getProductId().equals(productId))
    .findFirst()
    .get();
}
Once the ProductTuple for each item has been built, we can assign the list to the corresponding shopping cart entity (6) and return the shopping cart entity result (7). Now we finally put together a complete shopping cart!
Checkout – generating orders from the shopping cart
Now that we have picked our favorite products and filled our shopping cart, it's time to check out! Here we also define a CheckoutService interface, which contains only one specific method, checkout:
@VertxGen
@ProxyGen
public interface CheckoutService {

  /**
   * The name of the event bus service.
   */
  String SERVICE_NAME = "shopping-checkout-eb-service";

  /**
   * The address on which the service is published.
   */
  String SERVICE_ADDRESS = "service.shopping.cart.checkout";

  /**
   * Order event source address.
   */
  String ORDER_EVENT_ADDRESS = "events.service.shopping.to.order";

  /**
   * Create a shopping checkout service instance.
   */
  static CheckoutService createService(Vertx vertx, ServiceDiscovery discovery) {
    return new CheckoutServiceImpl(vertx, discovery);
  }

  void checkout(String userId, Handler<AsyncResult<CheckoutResult>> handler);
}
The interface is very simple, so let’s look at its implementation — the CheckoutServiceImpl class. Even though the interface contains only one checkout method, we all know that the checkout process is not easy… It contains the logic for inventory checking, payment (temporarily omitted here), and order generation. Let’s look at the source code for the checkout method:
@Override
public void checkout(String userId, Handler<AsyncResult<CheckoutResult>> resultHandler) {
  if (userId == null) { // (1)
    resultHandler.handle(Future.failedFuture(new IllegalStateException("Invalid user")));
    return;
  }
  Future<ShoppingCart> cartFuture = getCurrentCart(userId); // (2)
  Future<CheckoutResult> orderFuture = cartFuture.compose(cart ->
    checkAvailableInventory(cart).compose(checkResult -> { // (3)
      if (checkResult.getBoolean("res")) {
        double totalPrice = calculateTotalPrice(cart); // (4)
        // create the order entity
        Order order = new Order().setBuyerId(userId) // (5)
          .setPayId("TEST")
          .setProducts(cart.getProductItems())
          .setTotalPrice(totalPrice);
        // set the order id, then send the order to the order component and wait for the result
        return retrieveCounter("order") // (6)
          .compose(id -> sendOrderAwaitResult(order.setOrderId(id))) // (7)
          .compose(result -> saveCheckoutEvent(userId).map(v -> result)); // (8)
      } else {
        // insufficient inventory
        return Future.succeededFuture(new CheckoutResult()
          .setMessage(checkResult.getString("message"))); // (9)
      }
    }));
  orderFuture.setHandler(resultHandler); // (10)
}
Well, we're seeing a lot of compose again… Yes, here we again combine a number of Future-based asynchronous methods. If the given userId is invalid (null) (1), the method fails immediately. If the user is valid, we get the current shopping cart for that user with the getCurrentCart method (2). This procedure is asynchronous, so the method returns an asynchronous result of type Future<ShoppingCart>:
private Future<ShoppingCart> getCurrentCart(String userId) {
  Future<ShoppingCartService> future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ShoppingCartService.SERVICE_NAME),
    future.completer());
  return future.compose(service -> {
    Future<ShoppingCart> cartFuture = Future.future();
    service.getShoppingCart(userId, cartFuture.completer());
    return cartFuture.compose(c -> {
      if (c == null || c.isEmpty())
        return Future.failedFuture(new IllegalStateException("Invalid shopping cart"));
      else
        return Future.succeededFuture(c);
    });
  });
}
In the getCurrentCart method, we obtain the shopping cart service from the service discovery layer through the getProxy method of the EventBusService interface. We then call the getShoppingCart method of that service to fetch the cart. We also need to check whether the cart is empty: if it is not empty, we return it as the asynchronous result; if it is empty, checking out makes no sense, so we return a failure.
You may have noticed that the checkout method produces an asynchronous result of type CheckoutResult, which represents the checkout result:
@DataObject(generateConverter = true)
public class CheckoutResult {
  private String message;
  private Order order; // if successful, this is the order entity
}
Let's go back to our checkout method. Starting from the obtained cartFuture, we perform a series of compositions to arrive at a checkout result of type Future<CheckoutResult>. So what do we do? First, we compose the checkAvailableInventory asynchronous method, which fetches the inventory data for the products in the cart; we will look at its implementation in detail later. We then inspect the returned inventory data to determine whether every product has sufficient stock (3). If not, we simply return a CheckoutResult whose message notes that inventory is insufficient (9). If stock is sufficient, we calculate the total price (4) and generate the order (5). The order is represented by an Order data object, which contains the following information:
- The buyer ID
- The quantity, unit price and seller ID of each selected item
- The total price
After generating the initial order, we need to obtain an order serial number from the counter service (6), then send the order data to the order component via the Event Bus and wait for the checkout result CheckoutResult (7). With that done, we add a shopping cart checkout event to the event store (8). Finally, we bind the resultHandler to the resulting orderFuture (10); when the checkout result is available, the handler will be invoked.
Let’s explain some of the asynchronous processes that have occurred above. The first is the checkAvailableInventory method used to prepare inventory data mentioned earlier:
private Future<JsonObject> checkAvailableInventory(ShoppingCart cart) {
  Future<List<JsonObject>> allInventories = getInventoryEndpoint().compose(client -> { // (1)
    List<Future<JsonObject>> futures = cart.getProductItems() // (2)
      .stream()
      .map(product -> getInventory(product, client)) // (3)
      .collect(Collectors.toList());
    return Functional.sequenceFuture(futures); // (4)
  });
  return allInventories.map(inventories -> {
    JsonObject result = new JsonObject();
    // get the list of products whose inventory is lower than the demand amount
    List<JsonObject> insufficient = inventories.stream()
      .filter(item -> item.getInteger("inventory") - item.getInteger("amount") < 0) // (5)
      .collect(Collectors.toList());
    // insufficient inventory exists
    if (insufficient.size() > 0) {
      String insufficientList = insufficient.stream()
        .map(item -> item.getString("id"))
        .collect(Collectors.joining(", ")); // (6)
      result.put("message", String.format("Insufficient inventory available for product %s.", insufficientList))
        .put("res", false); // (7)
    } else {
      result.put("res", true); // (8)
    }
    return result;
  });
}
It's a little complicated… First we use the getInventoryEndpoint method to obtain the REST endpoint of the inventory component from the service discovery layer (1). This is a simple wrapper around the getClient method of the HttpEndpoint interface:
private Future<HttpClient> getInventoryEndpoint() {
  Future<HttpClient> future = Future.future();
  HttpEndpoint.getClient(discovery,
    new JsonObject().put("name", "inventory-rest-api"), // service name
    future.completer());
  return future;
}
We then compose another Future. In this step we take the list of items from the shopping cart (2) and map each ProductTuple to a Future holding its product ID and inventory (3). With the HttpClient of the inventory service's REST endpoint in hand, we can query the inventory of each product through it. Fetching the inventory is implemented in the getInventory method:
private Future<JsonObject> getInventory(ProductTuple product, HttpClient client) {
  Future<Integer> future = Future.future(); // (A)
  client.get("/" + product.getProductId(), response -> { // (B)
    if (response.statusCode() == 200) { // (C)
      response.bodyHandler(buffer -> {
        try {
          int inventory = Integer.valueOf(buffer.toString()); // (D)
          future.complete(inventory);
        } catch (NumberFormatException ex) {
          future.fail(ex);
        }
      });
    } else {
      future.fail("not_found:" + product.getProductId()); // (E)
    }
  })
    .exceptionHandler(future::fail)
    .end();
  return future.map(inv -> new JsonObject()
    .put("id", product.getProductId())
    .put("inventory", inv)
    .put("amount", product.getAmount())); // (F)
}
The process is straightforward. First we create a Future to hold the asynchronous result, the inventory quantity (A). We then call the client's get method to request the inventory (B). In the handler for the response, if the status code is 200 OK (C), we use bodyHandler to read the response body and convert it to an Integer (D); once that is done, the Future is completed with the inventory quantity. If the status code indicates an error (say 400 or 404), we consider the fetch failed and mark the Future as failed (E).
The inventory quantity alone is not enough (we would not know which product it belongs to), so for convenience we pack the inventory quantity together with the corresponding product ID and the amount selected in the shopping cart into a JsonObject, and finally map the Future<Integer> into a Future<JsonObject> (F).
Let's go back to the checkAvailableInventory method. After step (3) we have a List<Future<JsonObject>>, so we again call the Functional.sequenceFuture method to turn it into a Future<List<JsonObject>> (4). Now we can check whether every product has sufficient stock. We build the list of under-stocked products with the filter operator (5). If that list is not empty, some products are short on stock, so we take the ID of each of them and join the IDs into a single message string. This is done with the collect operator: collect(Collectors.joining(", ")) (6). This trick works nicely: a list such as [TST-0001, TST-0002, BK-16623] is joined into the string "TST-0001, TST-0002, BK-16623". Once the message about the under-stocked products has been generated, we put it into a JsonObject. The same JsonObject also carries a boolean field res indicating whether stock is sufficient, so here we set res to false (7).
If the list of under-stocked products is empty, every product has sufficient stock; we set res to true (8) and return the asynchronous result, a Future<JsonObject>.
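As a tiny standalone illustration (the values are made up) of the joining collector used in (6):

List<String> ids = Arrays.asList("TST-0001", "TST-0002", "BK-16623");
String joined = ids.stream().collect(Collectors.joining(", "));
// joined is now "TST-0001, TST-0002, BK-16623"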
Let's return to the chain of compositions in the checkout method. Next we calculate the total price of the items in the cart with the calculateTotalPrice method, which provides the information needed to generate the order. The computation is simple:
return cart.getProductItems().stream()
  .map(p -> p.getAmount() * p.getPrice()) // join by product id
  .reduce(0.0d, (a, b) -> a + b);
As mentioned earlier in the checkout method, after the initial order is created we compose three asynchronous operations in sequence: retrieveCounter -> sendOrderAwaitResult -> saveCheckoutEvent. Let's look at each of them.
We start by generating the current order serial number from the cache component’s counter service:
private Future<Long> retrieveCounter(String key) {
  Future<Long> future = Future.future();
  EventBusService.<CounterService>getProxy(discovery,
    new JsonObject().put("name", "counter-eb-service"),
    ar -> {
      if (ar.succeeded()) {
        CounterService service = ar.result();
        service.addThenRetrieve(key, future.completer());
      } else {
        future.fail(ar.cause());
      }
    });
  return future;
}
Of course, you could use an auto-increment counter directly in your database, but when you have multiple database servers you need to make sure the counter stays consistent across the cluster.
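One possible way to back such a cluster-wide counter is Redis, whose INCR command is atomic. The following is only an illustrative sketch (it assumes the Vert.x Redis client is on the classpath; the class name and host name are made up, and this is not the blueprint's actual implementation):

public class RedisCounterSketch {

  private final RedisClient redis;

  public RedisCounterSketch(Vertx vertx) {
    // the host name is illustrative
    this.redis = RedisClient.create(vertx, new RedisOptions().setHost("redis-server"));
  }

  public void addThenRetrieve(String key, Handler<AsyncResult<Long>> resultHandler) {
    // INCR is atomic on the Redis server, so concurrent callers always get distinct values
    redis.incr(key, resultHandler);
  }
}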
We then store the shopping cart settlement event through the saveCheckoutEvent method, which is implemented very similarly to the getCurrentCart method. They both fetch the shopping cart service from the service discovery layer and invoke the corresponding logic asynchronously:
private Future<Void> saveCheckoutEvent(String userId) {
  Future<ShoppingCartService> future = Future.future();
  EventBusService.getProxy(discovery,
    new JsonObject().put("name", ShoppingCartService.SERVICE_NAME),
    future.completer());
  return future.compose(service -> {
    Future<Void> resFuture = Future.future();
    CartEvent event = CartEvent.createCheckoutEvent(userId);
    service.addCartEvent(event, resFuture.completer());
    return resFuture;
  });
}
Send the order to the order module
After generating the order sequence number, our order entity is now complete and ready to be sent to the underlying order service component. Let’s look at the implementation — the sendOrderAwaitResult method:
private Future<CheckoutResult> sendOrderAwaitResult(Order order) {
  Future<CheckoutResult> future = Future.future();
  vertx.eventBus().send(CheckoutService.ORDER_EVENT_ADDRESS, order.toJson(), reply -> {
    if (reply.succeeded()) {
      future.complete(new CheckoutResult((JsonObject) reply.result().body()));
    } else {
      future.fail(reply.cause());
    }
  });
  return future;
}
We send the order entity to a specific address on the Event Bus so that the order service component can pick up the sent order and process it. Notice that the send method we call also takes a Handler<AsyncResult<Message<T>>> parameter, which means we wait for a reply from the message receiver; this is essentially a request/reply messaging pattern. If we receive the reply successfully, we convert it into a CheckoutResult and complete the Future; if we receive a failure, or the reply times out, we mark the Future as failed.
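By default the reply timeout is 30 seconds; if needed, it can be adjusted per message through DeliveryOptions. The value below is only an example, mirroring the sendOrderAwaitResult call above:

vertx.eventBus().send(CheckoutService.ORDER_EVENT_ADDRESS, order.toJson(),
  new DeliveryOptions().setSendTimeout(10000), // fail the reply handler after 10 seconds
  reply -> {
    // same handling as above
  });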
All right! After going through a series of compose steps, we have finally finished exploring the checkout method. Do you feel reactive yet?
Since the order service does not know the address from which we send orders, we need to publish a message source to the service discovery layer; the message source is simply the address the orders are sent from. The order service can then obtain the corresponding MessageConsumer through the service discovery layer and receive orders from it. We will publish this message source in CartVerticle, but before looking at its implementation, let's take a peek at the REST verticle of the shopping cart service.
Shopping cart service REST API
There are three main APIs in the REST verticle of the shopping cart service:

- GET /cart – gets the shopping cart state of the current user
- POST /events – adds a new shopping cart event for the current user to the cart event store
- POST /checkout – issues a shopping cart checkout request
Note that all three APIs require authentication (a logged-in user), so their route handlers are wrapped with the requireLogin method, which was mentioned earlier in the API Gateway section:
// api route handler
router.post(API_CHECKOUT).handler(context -> requireLogin(context, this::apiCheckout));
router.post(API_ADD_CART_EVENT).handler(context -> requireLogin(context, this::apiAddCartEvent));
router.get(API_GET_CART).handler(context -> requireLogin(context, this::apiGetCart));
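For reference, here is a hedged sketch of what such a requireLogin wrapper might look like; the project's actual helper (defined in a base REST verticle) may differ, for example by reading the principal from a forwarded header:

// a minimal sketch, not the project's exact implementation
protected void requireLogin(RoutingContext context, BiConsumer<RoutingContext, JsonObject> biHandler) {
  if (context.user() != null) {
    // pass the authenticated principal to the actual API handler
    biHandler.accept(context, context.user().principal());
  } else {
    context.fail(401); // not authenticated
  }
}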
The implementation of their routing functions is quite simple. Here we’ll just look at the apiAddCartEvent method:
private void apiAddCartEvent(RoutingContext context, JsonObject principal) {
  String userId = Optional.ofNullable(principal.getString("userId"))
    .orElse(TEST_USER); // (1)
  CartEvent cartEvent = new CartEvent(context.getBodyAsJson()); // (2)
  if (validateEvent(cartEvent, userId)) {
    shoppingCartService.addCartEvent(cartEvent, resultVoidHandler(context, 201)); // (3)
  } else {
    context.fail(400); // (4)
  }
}
First we get the user ID from the current principal. If the ID is not present in the credentials, we temporarily fall back to TEST_USER (1). We then create the shopping cart event CartEvent from the request body (2). We also need to verify that the user in the cart event matches the user in the current scope. If it does, we call the service's addCartEvent method to add the event to the event store and return a 201 Created status on success (3). If the cart event in the request body is invalid, we return 400 Bad Request (4).
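The validation itself can be very small. A hedged sketch of what such a check might look like (the real validateEvent may check more fields; a getUserId() accessor on CartEvent is assumed here):

private boolean validateEvent(CartEvent event, String userId) {
  // the event must exist and must belong to the user in the current scope
  return event != null && userId != null && userId.equals(event.getUserId());
}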
Cart Verticle
CartVerticle is the main verticle of the shopping cart component and is responsible for publishing the services. Here we publish three services (a sketch of how the corresponding records could be published follows the list):

- shopping-checkout-eb-service: the checkout service, an Event Bus service
- shopping-cart-eb-service: the shopping cart service, an Event Bus service
- shopping-order-message-source: the message source from which orders are sent, a message source service
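An illustrative sketch of publishing these records in CartVerticle; the blueprint wraps this in helper methods of its base verticle, and ShoppingCartService.SERVICE_ADDRESS is assumed here to exist analogously to CheckoutService.SERVICE_ADDRESS:

// create service records for the two Event Bus services and the message source
Record checkoutRecord = EventBusService.createRecord(
  CheckoutService.SERVICE_NAME, CheckoutService.SERVICE_ADDRESS, CheckoutService.class);
Record cartRecord = EventBusService.createRecord(
  ShoppingCartService.SERVICE_NAME, ShoppingCartService.SERVICE_ADDRESS, ShoppingCartService.class);
Record orderSourceRecord = MessageSource.createRecord(
  "shopping-order-message-source", CheckoutService.ORDER_EVENT_ADDRESS);

// publish them to the service discovery layer
discovery.publish(checkoutRecord, ar -> { /* handle publish result */ });
discovery.publish(cartRecord, ar -> { /* ... */ });
discovery.publish(orderSourceRecord, ar -> { /* ... */ });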
Our CartVerticle is also responsible for deploying the RestShoppingAPIVerticle. Don’t forget to set api.name:
{
"api.name": "cart"
}
Here is the UI for the shopping cart section:
Order service
All right! Now that we have submitted the settlement request, the underlying order has been sent to the order microservice component. So the next natural step is the responsibility of the order service — to distribute and process the order. In the current version of the Micro Shop implementation, we simply store the order into the database and change the corresponding inventory amount of the item. In a normal production environment, we would normally push the order to the message queue and pull the order from the message queue and process it in the underlying service.
The implementation of the order store service is too similar to the previous one, so I won’t go into the details of OrderService and its implementation here. You can look at the code for yourself.
Our order processing logic is written in the RawOrderDispatcher verticle. Let’s look at it below.
Consume the data sent by the message source
First we need to get the message consumer from the message source based on the service name, and then get the sent order from the consumer. This can be done through the getConsumer method of the MessageSource interface:
@Override
public void start(Future<Void> future) throws Exception {
  super.start();
  MessageSource.getConsumer(discovery,
    new JsonObject().put("name", "shopping-order-message-source"),
    ar -> {
      if (ar.succeeded()) {
        MessageConsumer<JsonObject> orderConsumer = ar.result();
        orderConsumer.handler(message -> {
          Order wrappedOrder = wrapRawOrder(message.body());
          dispatchOrder(wrappedOrder, message);
        });
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });
}
After obtaining the corresponding MessageConsumer, we bind a handler of type Handler<Message<JsonObject>> to it through the handler method. Inside this handler we can process the received message however we like. Here the message body is a JsonObject, so we first convert it into an order entity, which can then be dispatched and processed; the corresponding logic lives in the dispatchOrder method.
“Process” orders
Let’s look at the simple “dispatch processing” logic in the dispatchOrder method:
private void dispatchOrder(Order order, Message<JsonObject> sender) {
  Future<Void> orderCreateFuture = Future.future();
  orderService.createOrder(order, orderCreateFuture.completer()); // (1)

  orderCreateFuture
    .compose(orderCreated -> applyInventoryChanges(order)) // (2)
    .setHandler(ar -> {
      if (ar.succeeded()) {
        CheckoutResult result = new CheckoutResult("checkout_success", order); // (3)
        sender.reply(result.toJson()); // (4)
        publishLogEvent("checkout", result.toJson(), true); // (5)
      } else {
        sender.fail(5000, ar.cause().getMessage()); // (6)
        ar.cause().printStackTrace();
      }
    });
}
We start by creating a Future that represents the asynchronous result of adding the order to the database, and then call the createOrder method of the order service to store the order (1). Notice that we pass orderCreateFuture.completer() as the handler, so the Future is completed as soon as the insertion finishes. Next we compose another asynchronous method, applyInventoryChanges, to decrease the inventory amounts (2). If both steps succeed, we create a CheckoutResult that represents a successful checkout (3) and call the reply method to send the result back to the message sender (4). We then publish a checkout event to the Event Bus so that the logging component can record it (5). If any step fails, we call fail on the message sender to signal the failure (6).
Easy, right? Now let's look at the implementation of the applyInventoryChanges method to see how the product inventory is changed:
private Future<Void> applyInventoryChanges(Order order) {
  Future<Void> future = Future.future();
  // get the inventory REST endpoint from the service discovery layer
  Future<HttpClient> clientFuture = Future.future();
  HttpEndpoint.getClient(discovery,
    new JsonObject().put("name", "inventory-rest-api"), // service name
    clientFuture.completer());
  return clientFuture.compose(client -> {
    List<Future> futures = order.getProducts()
      .stream()
      .map(item -> {
        Future<Void> resultFuture = Future.future();
        String url = String.format("/%s/decrease?n=%d", item.getProductId(), item.getAmount());
        client.put(url, response -> {
          if (response.statusCode() == 200) {
            resultFuture.complete();
          } else {
            resultFuture.fail(response.statusMessage());
          }
        })
          .exceptionHandler(resultFuture::fail)
          .end();
        return resultFuture;
      })
      .collect(Collectors.toList());
    // every future must succeed, or the composite future fails
    CompositeFuture.all(futures).setHandler(ar -> {
      if (ar.succeeded()) {
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });
    return future;
  });
}
The implementation of this method will look familiar because it is very similar to the getInventory method we discussed earlier in the shopping cart service. We first obtain the HTTP client of the inventory component, then for each product in the order we call the REST API to decrease the inventory by the ordered amount. Each REST call is asynchronous, so again we end up with a List<Future>. But this time we do not need the actual result of every Future, only whether each one succeeded, so we simply call the CompositeFuture.all method to obtain a composite Future that succeeds only if every Future succeeds.
As for the OrderVerticle in the component, it does only three tiny things: publish the order service, deploy the RawOrderDispatcher for order distribution processing, and deploy the REST Verticle.
Micro Shop SPA integration
In our Micro Shop project, we provided a simple SPA front end page written in Angular.js. So the question is, how do we integrate that into our microservices?
Note: In the current version, we have integrated the SPA section into the API-Gateway module for convenience. The UI portion is typically deployed separately in a production environment.
With the magic of Vert.x Web, all we need to do is configure a route that serves static resources. It takes just one line:
router.route("/*").handler(StaticHandler.create());
The default static resource directory is webroot, but you can also configure a different directory when creating the StaticHandler.
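For example (the directory name here is just an illustration):

router.route("/*").handler(StaticHandler.create("dist")); // serve files from "dist" instead of "webroot"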
Monitor dashboards and statistics
The Monitor Dashboard is also a SPA front-end application. In this chapter we will cover the following:
- How to configure the SockJS – Event Bus Bridge
- How to receive messages from the Event Bus in the browser
- How to use Vert.x Dropwizard Metrics to obtain statistics about Vert.x components
SockJS – Event Bus Bridge
Many times we want to receive and process messages from the Event Bus in the browser. Sounds amazing, and as you might expect, Vert.x supports this! Vert.x provides the SockJS – Event Bus Bridge to enable communication between the backend and the client (usually a browser) through the Event Bus.
To enable the SockJS – Event Bus Bridge, we need to configure a SockJSHandler and the corresponding route:
// event bus bridge
SockJSHandler sockJSHandler = SockJSHandler.create(vertx); // (1)
BridgeOptions options = new BridgeOptions()
  .addOutboundPermitted(new PermittedOptions().setAddress("microservice.monitor.metrics")) // (2)
  .addOutboundPermitted(new PermittedOptions().setAddress("events.log"));

sockJSHandler.bridge(options); // (3)
router.route("/eventbus/*").handler(sockJSHandler); // (4)
First we create a SockJSHandler that handles the Event Bus traffic (1). For security reasons, Vert.x does not allow any message to cross the bridge to the browser by default, so we have to configure it: we create a BridgeOptions and set the addresses on which messages are permitted. There are two kinds of addresses, outbound and inbound. Outbound addresses allow the server to push messages to the browser through the Event Bus, while inbound addresses allow the browser to send messages to the server. Here we only need two outbound addresses: microservice.monitor.metrics for transmitting statistics, and events.log for transmitting log messages (2). We then apply the configured BridgeOptions to the handler (3) and set up the corresponding route (4); the browser-side SockJS client communicates over the /eventbus/* route path.
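If the browser also needed to send messages to the backend, an inbound permitted address would have to be added to the same options as well (the address below is purely illustrative):

options.addInboundPermitted(new PermittedOptions().setAddress("dashboard.commands"));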
Send statistics to the Event Bus
Monitoring is also an important part of a microservice architecture. With Vert.x's metrics components, such as Vert.x Dropwizard Metrics or Vert.x Hawkular Metrics, we can collect statistics from the corresponding components.
Here we use Vert.x Dropwizard Metrics. To use it, we start by creating a MetricsService instance:
MetricsService service = MetricsService.create(vertx);
We can then call the getMetricsSnapshot method to obtain statistics for various components. This method accepts an object implementing the Measured interface, which defines a common way of exposing metrics data; major Vert.x classes such as Vertx and EventBus implement it, so passing in different Measured instances yields different data. Here we pass in the Vertx instance to get the broadest set of statistics. The result is returned as a JsonObject:
// send metrics message to the event bus
vertx.setPeriodic(metricsInterval, t -> {
JsonObject metrics = service.getMetricsSnapshot(vertx);
vertx.eventBus().publish("microservice.monitor.metrics", metrics);
});
We set a periodic timer that publishes the current statistics to the microservice.monitor.metrics address at a fixed interval.
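If we only cared about one component, we could pass a narrower Measured instance instead, for example the event bus alone:

JsonObject eventBusMetrics = service.getMetricsSnapshot(vertx.eventBus());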
To see which statistics are included, refer to the official Vert.x Dropwizard Metrics documentation.
Now it's time to receive and display the statistics and log messages on the browser side.
Receives messages on the Event Bus on the browser side
To receive Event Bus messages in the browser, we first need two libraries: vertx3-eventbus-client and SockJS. Both can be downloaded via npm or Bower. We can then create an EventBus instance in our code and register handlers:
var eventbus = new EventBus('/eventbus');
eventbus.onopen = () => {
eventbus.registerHandler('microservice.monitor.metrics', (err, message) => {
$scope.metrics = message.body;
$scope.$apply();
});
}
We can retrieve the corresponding message data via message.body.
We will then run the dashboard to monitor the status of the entire microservice application.
Show time!
Ha ha, we have now walked through the complete Micro Shop microservice source code. Reading source code can be tiring, so now it's show time! Here we use Docker Compose to orchestrate the containers and run our microservice application, which is very convenient.
Note: It is recommended to reserve 4GB of memory to run this microservice application.
Build projects and containers
Before building the entire project, we need to fetch the front-end dependencies of the api-gateway and monitor-dashboard components with Bower. Their bower.json files are located in the corresponding src/main/resources/webroot directories. Enter each of those directories and execute:
bower install
Then we can build the entire project:
mvn clean install
After building the project, let's build the containers (root permission required):
cd docker
sudo ./build.sh
With the build complete, we are ready to run our microservice application:
sudo ./run.sh
Once all the microservices have finished initializing, we can open the shop page in the browser; the default address is https://localhost:8787.
First run?
If we are running this microservice application for the first time (or have previously deleted all containers), we must manually configure the Keycloak server. First we need to add a record to the hosts file:
0.0.0.0 keycloak-server
Then we need to visit http://keycloak-server:8080 and go to the administrator login page. By default, both the user name and password are admin. Once in the admin console, we need to create a Realm with an arbitrary name (vert.x is given in our example). Then enter this Realm and create a Client for our application like this:
Once the client has been created, go to the Installation tab and copy the corresponding JSON configuration. We then need to overwrite the matching configuration in api-gateway/src/config/docker.json with the copied contents. For example:
{ "api.gateway.http.port": 8787, "api.gateway.http.address": "localhost", "circuit-breaker": { "name": "Api-gateway-cb ", "timeout": 10000, "max-failures": 5}, "realm-public-key": "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAkto9ZZm69cmdA9e7X4NUSo8T4CyvrYzlRiJdhr+LMqELdfN3ghEY0EBpaROiOueva//iUc/KViY GiAHVXEQ3nr3kytF6uZs9iwqkshKvltpxkOm2Qpj/FSRsCyHlB8Ahbt5xBmzH2mI1VDIxmVTdEBze4u6tLoi4ieo72b2q/dz09yrEokRm/sSYqzNgfE0i1JY 6DI8C7FaKszKTK5DRGMIAib8wURrTyf8au0iiisKEXOHKEjo/g0uHCFGSOKqPOprNNIWYwedV+qaQa9oSah2IpwNgFNRLtHpvbcanftMLQOQIR0iufIJ+bHr NhH0RISZhTzcGX3pSIBw/HaERwQIDAQAB auth - server - ", "url" : "http://127.0.0.1:8180/auth", "SSL - required" : "external", "resource": "vertx-blueprint", "credentials": { "secret": "ea99a8e6-f503-4bdb-afbd-9ae322ee7089" }, "use-resource-role-mappings": true }Copy the code
We also need to create a few users so we can log in with them later.
For more details on how to configure and explain Keycloak, see Paulo’s Vertx 3 and Keycloak Tutorial, which is very detailed.
After modifying the corresponding configuration file, we must rebuild the container for the API-Gateway module and then restart the container.
Happy shopping time!
With the configuration complete, let’s go to the front end page!
Now we can visit https://localhost:8787/login to log in; it redirects to the Keycloak login page. After a successful login we are sent back to the Micro Shop home page, and we can enjoy our shopping to the fullest. This is fantastic!
We can also access the Monitor Dashboard at http://localhost:9100 by default.
Awesome!
The end!
Not bad at all! We have finally reached the end of our microservice journey. Congratulations! We really hope you enjoyed this blueprint tutorial and learned something about Vert.x and microservices 🙂
Here are some recommended readings on microservices and distributed systems:
- Microservices – a definition of this new architectural term
- Event Sourcing
- Cloud Design Patterns: Prescriptive Architecture Guidance for Cloud Applications
Enjoy the carnival of micro services!