

Preface

In today's software development, Cloud Native is being adopted by more and more companies as its surrounding tooling matures. This shift is not only a change in technology selection, but also a change in how we think about development, operations, and project management. For example, when we built enterprise software in the past, the front end and back end lived in one large application, the team was organized into development, operations, testing, and architecture groups, and the whole delivery was managed in a Waterfall fashion. In the Cloud Native culture, these have been replaced by a new set of methods and technology stacks (or rather, a new culture), such as DevOps, continuous delivery (CI/CD), microservices, full-stack development, agile development, Domain Driven Design, Test Driven Development, Event Sourcing, and more. Combined with embedding artificial intelligence into the business, our applications can generate even greater business value.

Micronaut, the framework introduced today, is a back-end technology for microservices. I am optimistic that it will eventually replace Spring Boot as the most popular server-side framework for JVM languages.

This article introduces Micronaut from the basics and integrates a Kafka Producer, Kafka Streams, and GraphQL into a simple back-end web application, showing how to use them to store the event history and update the current view in an Event Sourcing system. Readers can use this small template as a base from which to grow a Micronaut microservice back end for their own Event Sourcing system.

1 Why use Micronaut?

I am often asked why my articles are all about NLP, Kafka, GraphQL, and microservices. My reason is simple: in my opinion, this combination is the best technology stack for developing enterprise-grade microservice software. Its strengths are the following:

  • GraphQL differs from REST in that it gives the front end more flexibility when accessing the back-end interface
  • Architecturally, the monolithic application is decomposed into small microservices along functional and domain boundaries, which reduces the complexity of the architecture and business logic and lowers the coupling between services
  • Designing the system around Event Sourcing keeps the theme of Data Driven Business in mind, which provides a continuous supply of data for machine learning in the era of AI
  • AI models can eventually be deployed as microservices to support online business
  • Microservice applications greatly shorten software testing and deployment cycles, dovetailing neatly with tooling and agile management
  • With the Cloud Native technology stack, we can achieve a highly automated development and operations workflow through CI/CD
  • Kubernetes works with Kafka and distributed databases to scale the back end horizontally and make the system more robust and stable
  • Painless migration between the enterprise cloud and the public cloud
  • Micronaut offers a lot of cloud capabilities out of the box, such as AWS Serverless
  • In short: data-driven, plus small, fast, lean, and stable

Unfortunately, the enterprise-class frameworks that dominate the Java world are, in practice, usually counter-examples of "small, fast, and lean". Spring Boot, for example, has been the most popular framework in the Java ecosystem in recent years, and Spring Cloud extends it further toward cloud services. Although its complexity is greatly reduced compared with classic Spring, under the hood Spring Boot still carries much of Spring's redundancy. Even when packaged as microservices, its reliance on runtime reflection makes metrics such as build time and memory footprint far bulkier than those of Go or Node.js.

Unlike Spring Boot, which also embeds Tomcat, Micronaut not only provides just the tools needed to build microservice applications, but is also further optimized for startup time and memory overhead. While preserving Spring's fine traditions such as dependency injection, it drops many of Spring's redundant modules and the memory-hungry runtime reflection, making development, testing, deployment, and operations more efficient and concise.

This article designs a complete microservice project based on Event Sourcing from scratch, from the perspective of a Spring Boot programmer.

2 Configure the CLI tool

Micronaut provides a powerful command line tool to help us set up the project. I personally use macOS, but other Unix-like systems should work similarly.

Start by downloading and installing SDKMAN:

$ curl -s https://get.sdkman.io | bash

$ source "$HOME/.sdkman/bin/sdkman-init.sh"

Check whether the configuration is successful:

$ sdk
==== BROADCAST =================================================================
* 2020-04-16: Jbang 0.22.0.2 released on SDKMAN! See https://github.com/maxandersen/jbang/releases/tag/v0.22.0.2 #jbang
* 2020-04-15: Gradle 6.4-rc-1 released on SDKMAN! #gradle
* 2020-04-15: Kotlin 1.3.72 released on SDKMAN! #kotlin
================================================================================

Usage: sdk <command> [candidate] [version]
       sdk offline <enable|disable>

   commands:
       install   or i    <candidate> [version] [local-path]
       uninstall or rm   <candidate> <version>
       list      or ls   [candidate]
       use       or u    <candidate> <version>
       default   or d    <candidate> [version]
       current   or c    [candidate]
       upgrade   or ug   [candidate]
       version   or v
       broadcast or b
       help      or h
       offline           [enable|disable]
       selfupdate        [force]
       update
       flush             <broadcast|archives|temp>

   candidate  :  the SDK to install: groovy, scala, grails, gradle, kotlin, etc.
                 use list command for comprehensive list of candidates
                 eg: $ sdk list
   version    :  where optional, defaults to latest stable if not provided
                 eg: $ sdk install groovy
   local-path :  optional path to an existing local installation
                  eg: $ sdk install groovy 2.4.13-local /opt/groovy-2.4.13

Install Micronaut CLI

$ sdk install micronaut

Downloading: micronaut 1.3.4

In progress...

############################################################################# 100.0%

Installing: micronaut 1.3.4
Done installing!

Setting micronaut 1.3.4 as default.

3 Create a new project

The following uses the Micronaut command line tool to create a project integrating Kotlin, the Kafka client, Kafka Streams, and GraphQL.

$ mn create-app micronaut-kafka-graphql -f kafka-streams,graphql,kafka -l kotlin

-f specifies the features (dependencies) to add, and -l sets the language. More details about the create-app options can be viewed by typing mn and using help on the interactive command line:

mn> help create-app
Usage: mn create-app [-hinvVx] [-b=BUILD-TOOL] [-l=LANG] [-p=PROFILE]
                     [-f=FEATURE[,FEATURE...]]... [NAME]
Creates an application
      [NAME]               The name of the application to create.
  -b, --build=BUILD-TOOL   Which build tool to configure. Possible values: gradle, maven.
  -f, --features=FEATURE[,FEATURE...]
                           The features to use. Possible values: annotation-api, application,
                             asciidoctor, aws-api-gateway, aws-api-gateway-graal, cassandra,
                             config-consul, data-hibernate-jpa, data-jdbc, discovery-consul,
                             discovery-eureka, ehcache, elasticsearch, file-watch, flyway,
                             graal-native-image, graphql, hazelcast, hibernate-gorm, hibernate-jpa,
                             http-client, http-server, jdbc-dbcp, jdbc-hikari, jdbc-tomcat, jib,
                             jrebel, junit, kafka, kafka-streams, kotlintest, kubernetes, liquibase,
                             log4j2, logback, management, micrometer,
                             micrometer-appoptics, micrometer-atlas, micrometer-azure-monitor,
                             micrometer-cloudwatch, micrometer-datadog, micrometer-dynatrace,
                             micrometer-elastic, micrometer-ganglia, micrometer-graphite,
                             micrometer-humio, micrometer-influx, micrometer-jmx,
                             micrometer-kairos, micrometer-new-relic, micrometer-prometheus,
                             micrometer-signalfx, micrometer-stackdriver, micrometer-statsd,
                             micrometer-wavefront, mongo-gorm, mongo-reactive, neo4j-bolt,
                             neo4j-gorm, netflix-archaius, netflix-hystrix, netflix-ribbon,
                             picocli, postgres-reactive, rabbitmq, redis-lettuce, security-jwt,
                             security-session, spek, spock, springloaded, swagger-groovy,
                             swagger-java, swagger-kotlin, tracing-jaeger, tracing-zipkin,
                             vertx-mysql-client, vertx-pg-client
  -h, --help               Show this help message and exit.
  -i, --inplace            Create a service using the current directory
  -l, --lang=LANG          Which language to use. Possible values: java, groovy, kotlin.
  -n, --plain-output       Use plain text instead of ANSI colors and styles.
  -p, --profile=PROFILE    The profile to use. Possible values: base, cli, configuration,
                             federation, function.function-aws, function-aws-alexa, grpc, kafka,
                             profile, rabbitmq, service.
  -v, --verbose            Create verbose output.
  -V, --version            Print version information and exit.
  -x, --stacktrace         Show full stack trace when exceptions occur.

Open the project in IntelliJ after it has been created successfully:

As shown in Figure 1, the command line tool generated the project and Gradle configuration for us, and provided a Dockerfile template for deploying the application.

4 Implement a simple Event Sourcing system

In the Event Sourcing system designed in this project, Kafka is the core component: it offers high throughput, high fault tolerance, and distributed horizontal scalability, and on that basis guarantees zero data loss. In terms of system design, Kafka can be regarded as a central event bus through which all asynchronous business-logic operations and all communication between microservices flow.

To keep the demonstration focused, I will concentrate on implementing the most common business scenarios in this project:

  • Implement a GraphQL Mutation interface for the front end; this interface is responsible for receiving write requests
  • Implement a Kafka Producer that writes the data received by the GraphQL interface to the Kafka broker
  • Implement Kafka Streams, which takes the data already written to Kafka and processes it further into a queryable view
  • Implement a GraphQL Query that accesses the view to obtain the latest data and returns the results to the front end

Suppose our requirement is to collect information about market changes: on the one hand, we want to store the history of all past market changes in the back end for data analysis and statistics; on the other hand, the front-end query should return only the latest market information.

We simply define a Kotlin data class, the equivalent of a Java POJO, to represent the Market data model (create a model directory under the project and add the Market data class):

data class Market (
    var marketId: String,
    val currentStatus: String,
    val country: String,
    val zipcode: String,
    val timestamp: Long = System.currentTimeMillis()
)


4.1 GraphQL Mutation: defining the interface for adding data

For the front-end framework to communicate with our back end, we first need an interface that handles front-end write requests. Readers with big-data warehouse experience will know that we do not perform row-level updates on massive data: everything is appended to Kafka, which is essentially a distributed log file system. Therefore, the create, delete, and update operations of an Event Sourcing system can all be expressed as POST requests at the GraphQL or HTTP level. In GraphQL, we define every interface other than a lookup as a Mutation.

First we define an input class named MarketInput in the model folder to map the front end's request:

data class MarketInput (
    val marketId: String,
    val currentStatus: String,
    val country: String,
    val zipcode: String
)

Then configure the Micronaut GraphQL interface. Create the schema.graphqls file under the project Resources folder and define the interface:

type Market {
    marketId: ID!
    currentStatus: String!
    country: String!
    zipcode: String!
    timestamp: Long!
}

input MarketInput {
    marketId: ID!
    currentStatus: String!
    country: String!
    zipcode: String!
}

type Mutation {
    createMarket(marketInput: MarketInput): Market
}

type Query {
    allMarkets: [Market]
}

schema {
    mutation: Mutation
    query: Query
}

The two interfaces work as follows: the createMarket mutation receives a MarketInput object from the front end and produces a Market object based on that input, while the allMarkets query returns the current list of markets. The rest is standard GraphQL schema syntax, which I won't explain in detail here.

4.2 Implementing Kafka Producer

Before we wire up the GraphQL interface, we need a service that writes data into Kafka. Create a service folder under the project and add the class CreateMarketService as follows:

package micronaut.kafka.graphql.service

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import micronaut.kafka.graphql.model.Market

@KafkaClient
interface CreateMarketService {
    @Topic("markt-event-store")
    fun createMarket(@KafkaKey id: String, market: Market)
}

An interface annotated with @KafkaClient gets a corresponding Producer implementation generated automatically by Micronaut at runtime, and @Topic defines the Kafka topic that receives the producer's data. If a method parameter is annotated with @KafkaKey, that parameter is used as the record key for the topic. Micronaut serializes POJOs as JSON by default, so no special configuration is needed.
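The producer connects to the broker configured in application.yml. A minimal sketch of that configuration, assuming a single local broker on the default localhost:9092 (adjust host and port to your environment):

kafka:
  bootstrap:
    servers: localhost:9092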

4.3 Configuring the GraphQL Mutation Interface

Now that we have the interface definition and Service implementation, all we need to do is connect the two parts.

Create a new graphQL directory under your project directory and add two classes to this directory:

The factory class GraphQLFactory is used to register all queries and mutations required by the GraphQL interface:

package micronaut.kafka.graphql.graphql

import graphql.GraphQL
import graphql.schema.idl.RuntimeWiring
import graphql.schema.idl.SchemaGenerator
import graphql.schema.idl.SchemaParser
import graphql.schema.idl.TypeDefinitionRegistry
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.core.io.ResourceResolver
import java.io.BufferedReader
import java.io.InputStreamReader
import javax.inject.Singleton


@SuppressWarnings("Duplicates")
@Factory
class GraphQLFactory {

    @Bean
    @Singleton
    fun graphQL(resourceResolver: ResourceResolver,
                createMarketDataFetcher: CreateMarketDataFetcher): GraphQL {

        val schemaParser = SchemaParser()
        val schemaGenerator = SchemaGenerator()
        val typeRegistry = TypeDefinitionRegistry()

        typeRegistry.merge(schemaParser.parse(BufferedReader(InputStreamReader(
                resourceResolver.getResourceAsStream("classpath:schema.graphqls").get()))))

        val runtimeWiring = RuntimeWiring.newRuntimeWiring()
                .type("Mutation") { typeWiring -> typeWiring
                        .dataFetcher("createMarket", createMarketDataFetcher)
                }
                .build()

        val graphQLSchema = schemaGenerator.makeExecutableSchema(typeRegistry, runtimeWiring)

        return GraphQL.newGraphQL(graphQLSchema).build()
    }
}
Copy the code

As shown in the code, we provide the factory class, expose the Mutation interface, and register the GraphQL method createMarket.

The class is annotated with @Factory, and the graphQL method with @Bean and @Singleton, ensuring that only a single GraphQL instance is registered for the entire application.

Then implement the DataFetcher class CreateMarketDataFetcher under the graphql directory:

package micronaut.kafka.graphql.graphql

import com.fasterxml.jackson.databind.ObjectMapper
import graphql.schema.DataFetcher
import graphql.schema.DataFetchingEnvironment
import micronaut.kafka.graphql.model.Market
import micronaut.kafka.graphql.model.MarketInput
import micronaut.kafka.graphql.service.CreateMarketService
import micronaut.kafka.graphql.service.CurrentMarketStore
import javax.inject.Singleton

@Singleton
@SuppressWarnings("Duplicates")
class CreateMarketDataFetcher(private val createMarketService: CreateMarketService,
                              private val objectMapper: ObjectMapper) : DataFetcher<Market> {

    override fun get(env: DataFetchingEnvironment): Market {
        val marketInput =
                objectMapper.convertValue(env.getArgument("marketInput"), MarketInput::class.java)

        val market = Market(
            marketId = marketInput.marketId,
            currentStatus = marketInput.currentStatus,
            country = marketInput.country,
            zipcode = marketInput.zipcode
        )

        createMarketService.createMarket(id = market.marketId, market = market)
        return market
    }
}

This class must implement the DataFetcher interface and also be annotated with @Singleton; the CreateMarketService we defined earlier and an ObjectMapper (used to serialize and deserialize JSON data) are injected automatically.

For the factory to register this class with the GraphQL interface, we implement the get method ourselves. Since objects of this class provide the implementation for the Mutation's createMarket(marketInput: MarketInput), we first need to obtain the marketInput argument. The DataFetchingEnvironment provides a convenient way to retrieve it through the getArgument method. A primitive argument can be used directly, but for an object type like ours, getArgument returns an untyped structure; the auto-injected ObjectMapper can convert it into a MarketInput, as shown in the code.

Once we have a MarketInput object, we can use it to create a Market object, write the data into the Kafka topic through createMarketService, and return the Market object to the front end as the response.

4.4 Configuring Kafka Streams

Kafka is one of the most powerful pieces of middleware available today, in large part because of its rich streaming APIs. Kafka Streams makes it easy to transform real-time data, join streams, and apply windowing, and the RocksDB-backed Statestore that ships with Kafka Streams can expose a view for front-end queries.

Create the MarketStream class under the service folder, implemented as follows:

package micronaut.kafka.graphql.service

import com.fasterxml.jackson.databind.ObjectMapper
import io.micronaut.configuration.kafka.serde.JsonSerde
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.context.annotation.Factory
import micronaut.kafka.graphql.model.Market
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.*
import org.apache.kafka.streams.state.Stores
import javax.inject.Named
import javax.inject.Singleton

const val MARKET_EVENT_TOPIC = "market-event-store"
const val CURRENT_MARKET_STORE = "current-market-store"
const val MARKET_APP_ID = "market-stream"

@Factory
class MarketStream {

    @Singleton
    @Named(MARKET_APP_ID)
    fun buildMarketStream(builder: ConfiguredStreamBuilder, objectMapper: ObjectMapper): KStream<String, Market>? {
        val marketStore = Stores.inMemoryKeyValueStore(CURRENT_MARKET_STORE)

        builder.configuration[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.EXACTLY_ONCE
        builder.configuration[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        builder.configuration[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = true

        val stream = builder.stream(MARKET_EVENT_TOPIC,
                Consumed.with(Serdes.String(), JsonSerde(objectMapper, Market::class.java)))

        stream.groupBy(
            { _, value -> value.country },
            Grouped.with(
                Serdes.String(),
                JsonSerde(objectMapper, Market::class.java)))
            .reduce(
                {value1, value2 ->
                    if(value1.timestamp <= value2.timestamp) {
                        return@reduce value2
                    } else {
                        return@reduce value1
                    }
                },
                Materialized
                    .`as`<String, Market>(marketStore)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(JsonSerde(objectMapper, Market::class.java))
            )
            .toStream()
            .print(Printed.toSysOut())

        return stream
    }
}

The Micronaut Kafka Streams setup is exactly the same as the native Streams API, with only a little decoration: a @Factory class whose method returns the KStream object.

The purpose of this code is to group the incoming records (by country in this example), compare the timestamps within each group, and keep only the most recent record in the Statestore (to keep the code simple, the store is only written and updated; deleting specific entries is not covered).

After the aggregation, the result is written into the Statestore through Materialized.
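The sketch above groups records by country. If you instead want the latest state per marketId (the key the producer writes), only the grouping key inside buildMarketStream needs to change; a drop-in variant of that call, using the same serdes, would be:

        stream.groupBy(
            { _, value -> value.marketId },   // group by marketId instead of country
            Grouped.with(
                Serdes.String(),
                JsonSerde(objectMapper, Market::class.java)))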

4.5 Statestore access

Kafka Streams only writes to the Statestore, so we still need an interface to read data from it. Create a new class CurrentMarketStore under service:

package micronaut.kafka.graphql.service

import io.micronaut.configuration.kafka.streams.InteractiveQueryService
import micronaut.kafka.graphql.model.Market
import org.apache.kafka.streams.state.QueryableStoreTypes
import javax.inject.Singleton

@Singleton
class CurrentMarketStore(private val interactiveQueryService: InteractiveQueryService) {

    fun getAllMarkets(): List<Market> {
        val marketStore = interactiveQueryService
                .getQueryableStore(CURRENT_MARKET_STORE, QueryableStoreTypes.keyValueStore<String, Market>())
        return marketStore
                .map { kvStore -> kvStore.all().asSequence().map { v -> v.value }.toList() }
                .orElse( emptyList<Market>())
    }
}

The method in this class returns all the data in the Statestore as a List. The logic is simple, so I won't walk through it here; see the API documentation for details.
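As an illustration, if you also needed to look up a single entry by its store key, a method along the same lines could be added to this class (getMarket is a hypothetical helper, not part of the project; note that the store is keyed by the grouping key used in the stream):

    fun getMarket(key: String): Market? {
        return interactiveQueryService
                .getQueryableStore(CURRENT_MARKET_STORE, QueryableStoreTypes.keyValueStore<String, Market>())
                .map { store -> store.get(key) }   // get returns null when the key is absent
                .orElse(null)
    }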

4.6 Create a GraphQL Query to read data

Add a new Query method to the schema.graphqls file:

type Query {
    allMarkets: [Market]
}

Then add a new class AllMarketDataFetcher under the graphql directory:

@Singleton
@SuppressWarnings("Duplicates")
class AllMarketDataFetcher(private val currentMarketStore: CurrentMarketStore) : DataFetcher<List<Market>> {
    override fun get(env: DataFetchingEnvironment): List<Market> {
        return currentMarketStore.getAllMarkets()
    }
}

This DataFetcher is similar to the previous one, except that no arguments are passed in; CurrentMarketStore is injected automatically, and all Market objects in the Statestore are retrieved by calling getAllMarkets().

Finally, the new DataFetcher object is registered in the GraphQL Factory.

The class ends up like this:

package micronaut.kafka.graphql.graphql

import graphql.GraphQL
import graphql.schema.idl.RuntimeWiring
import graphql.schema.idl.SchemaGenerator
import graphql.schema.idl.SchemaParser
import graphql.schema.idl.TypeDefinitionRegistry
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.core.io.ResourceResolver
import java.io.BufferedReader
import java.io.InputStreamReader
import javax.inject.Singleton


@SuppressWarnings("Duplicates")
@Factory
class GraphQLFactory {

    @Bean
    @Singleton
    fun graphQL(resourceResolver: ResourceResolver,
                createMarketDataFetcher: CreateMarketDataFetcher,
                allMarketDataFetcher: AllMarketDataFetcher ): GraphQL {

        val schemaParser = SchemaParser()
        val schemaGenerator = SchemaGenerator()
        val typeRegistry = TypeDefinitionRegistry()

        typeRegistry.merge(schemaParser.parse(BufferedReader(InputStreamReader(
                resourceResolver.getResourceAsStream("classpath:schema.graphqls").get()))))

        val runtimeWiring = RuntimeWiring.newRuntimeWiring()
                .type("Mutation") { typeWiring -> typeWiring
                        .dataFetcher("createMarket", createMarketDataFetcher)
                }
                .type("Query") { typeWiring -> typeWiring
                        .dataFetcher("allMarkets", allMarketDataFetcher)
                }
                .build()

        val graphQLSchema = schemaGenerator.makeExecutableSchema(typeRegistry, runtimeWiring)

        return GraphQL.newGraphQL(graphQLSchema).build()
    }
}

As shown in the code, the new Query method allMarkets is now registered on the GraphQL object as well, and with that the code part of our program is complete.

5 End-to-end walkthrough

To see the effect, we will now walk through the full Event Sourcing flow once: data input, storage in Kafka, state update, and reading. To run the code locally, we need a local Kafka cluster in addition to the program we have written. The easiest way to install Kafka is to pull a Docker image from Confluent; if you are not comfortable with Docker, you can also set it up manually, see the official documentation.

Assuming you have Confluent configured locally, first start Zookeeper and the Kafka broker (without Confluent's other services):

$ confluent start kafka

After Zookeeper and Kafka have started, we need to manually create a "market-event-store" topic to store the historical data. (This can also be done with the Kafka AdminClient, which I personally don't recommend in production.) Since we only have one node, set replication-factor to 1:

$ kafka-topics --create --zookeeper localhost:2181 --topic market-event-store --partitions 10 --replication-factor 1

Also enable the GraphiQL browser IDE in application.yml so we can exercise the interface later:

graphql:
  graphiql:
    enabled: true
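For reference, the same market-event-store topic could also be created programmatically with the Kafka AdminClient mentioned above (a minimal sketch, assuming a broker on localhost:9092; again, not something I would do in production):

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic

fun createMarketTopic() {
    val admin = AdminClient.create(
        mapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
    // 10 partitions, replication factor 1, matching the kafka-topics command above
    admin.createTopics(listOf(NewTopic("market-event-store", 10, 1.toShort()))).all().get()
    admin.close()
}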

Then start the application with gradle run (you'll be surprised how quickly it starts!).

Once the back end is up, we can interact with it by writing GraphQL statements at http://localhost:8080/graphiql. Start with a mutation and create a new Market object:

mutation {
  createMarket(marketInput: {
    marketId: "id-1", currentStatus: "closed", country: "china", zipcode:"130000"
  }) {
    marketId
    currentStatus
    country
    zipcode
    timestamp
  }
}
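The same mutation can also be sent from the command line against the GraphQL endpoint (a sketch assuming micronaut-graphql's default /graphql path; the field values are just examples):

$ curl -X POST http://localhost:8080/graphql \
    -H 'Content-Type: application/json' \
    -d '{"query": "mutation { createMarket(marketInput: {marketId: \"id-1\", currentStatus: \"closed\", country: \"china\", zipcode: \"130000\"}) { marketId timestamp } }"}'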

On success, the created Market object is returned:

Then enter a new Market, this time with marketId set to id-2, and afterwards update id-1 by setting its currentStatus to open:

Finally, to check whether all data changes have been captured in Kafka, type:

$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic market-event-store
{"marketId":"id-1","currentStatus":"closed","country":"china","zipcode":"130000","timestamp":1587549158784}
{"marketId":"id-2","currentStatus":"closed","country":"germany","zipcode":"81477","timestamp":1587549166942}
{"marketId":"id-1","currentStatus":"open","country":"china","zipcode":"130000","timestamp":1587549177136}

We can see that the record with marketId id-1 appears twice in the historical data, which matches our earlier requirement.
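On the view side, the allMarkets query defined earlier should now return only the latest state of each market; a query of this shape in GraphiQL will confirm it:

query {
  allMarkets {
    marketId
    currentStatus
    country
    zipcode
    timestamp
  }
}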

6 Summary

As you can see from the code, Micronaut's overall feel is very similar to Spring Boot's. Micronaut keeps dependency injection and the bean concept and builds on them with support for Cloud Native, for example AWS Serverless support, Docker deployment, health checks, metrics, distributed tracing, and so on.

On the server side I deliberately switched from Java to Kotlin, while still using some libraries from the classic Java ecosystem; the two work together seamlessly, making development easier and more fun. The code currently lacks tests; testing is a broad topic in its own right, and I hope to fill that gap over time.

Once data has entered Kafka losslessly, tools like Kafka Connect can import it into an enterprise data warehouse or into a data lake backed by distributed systems such as Hadoop. Modeling the data warehouse on this basis (which can also be done with Kafka Streams) provides high-quality data for machine learning. At the same time, the ELT (as opposed to ETL) style of streaming data processing lets the whole data warehouse compute historical statistics in real time, efficiently and with low latency, providing more business value for the enterprise.

Another big beneficiary of an Event Sourcing system is the knowledge graph in the NLP domain. Building a knowledge graph is essentially a graph-database modeling exercise, but the state of that graph database needs to be updated regularly in order to generate greater value. An Event Sourcing system backed by Kafka Streams lets us update the status and relationships of each node in the graph in real time, and combine machine learning, graph algorithms, and distributed stream processing for more sophisticated operations.

In terms of performance, Kubernetes can easily scale Kafka Streams out in high-concurrency scenarios, or provide a more efficient distributed database for the back end to access. If necessary, the Kafka Producer, the Kafka Streams data transformation, and the read side of this project can be split further into separate microservices, removing the Netty server wherever a web service is not needed.

This article does not cover asynchronous operation, test-driven development, monitoring, deployment, operations, security, and other topics. Some areas, such as security, are outside my personal tech stack and are handled differently from company to company, so a basic introduction is the best I could offer.

Source: zhihu – zhuanlan.zhihu.com/p/133594611