Jkes is a search framework based on Java, Kafka, and ElasticSearch. It provides annotation-driven, JPA-style object/document mapping and a REST API for document search.
Project home page: https://github.com/chaokunyang/jkes
Installation
Refer to the jkes-integration-test project to quickly learn how to use the Jkes framework. jkes-integration-test is a Spring Boot application that we use to test functional integrity.
- Install jkes-index-connector and jkes-delete-connector into the Kafka Connect classpath
- Install the Smart Chinese Analysis plugin
sudo bin/elasticsearch-plugin install analysis-smartcn
Configuration
- Add the jkes-spring-data-jpa dependency
- Add the configuration
@EnableAspectJAutoProxy
@EnableJkes
@Configuration
public class JkesConfig {

    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {
        return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);
    }
}
- Provide a JkesProperties bean
@Component
@Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {

    @PostConstruct
    public void setUp() {
        Config.setJkesProperties(this);
    }

    @Override
    public String getKafkaBootstrapServers() {
        return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";
    }

    @Override
    public String getKafkaConnectServers() {
        return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";
    }

    @Override
    public String getEsBootstrapServers() {
        return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";
    }

    @Override
    public String getDocumentBasePackage() {
        return "com.timeyang.jkes.integration_test.domain";
    }

    @Override
    public String getClientId() {
        return "integration_test";
    }
}
This is flexible: if you are using Spring Boot, you can use @ConfigurationProperties to supply the configuration instead.
- Add index management endpoints. Because we do not know which web technology the client application uses, the index endpoints must be added on the client side. For example, in Spring MVC you can add index endpoints as follows:
@RestController
@RequestMapping("/api/search")
public class SearchEndpoint {

    private Indexer indexer;

    @Autowired
    public SearchEndpoint(Indexer indexer) {
        this.indexer = indexer;
    }

    @RequestMapping(value = "/start_all", method = RequestMethod.POST)
    public void startAll() {
        indexer.startAll();
    }

    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)
    public void start(@PathVariable("entityClassName") String entityClassName) {
        indexer.start(entityClassName);
    }

    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)
    public Map<String, Boolean> stopAll() {
        return indexer.stopAll();
    }

    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)
    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {
        return indexer.stop(entityClassName);
    }

    @RequestMapping(value = "/progress", method = RequestMethod.GET)
    public Map<String, IndexProgress> getProgress() {
        return indexer.getProgress();
    }
}
Quick start
Index API
Annotate your entities with the annotations in the com.timeyang.jkes.core.annotation package:
@lombok.Data
@Entity
@Document
public class Person extends AuditedEntity {

    // @Id will be identified automatically
    // @Field(type = FieldType.Long)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @MultiFields(
        mainField = @Field(type = FieldType.Text),
        otherFields = {
            @InnerField(suffix = "raw", type = FieldType.Keyword),
            @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
        }
    )
    private String name;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Integer)
    private Integer age;

    // don't add @Field to test whether ignored
    // @Field(type = FieldType.Text)
    private String description;

    @Field(type = FieldType.Object)
    @ManyToOne(fetch = FetchType.EAGER)
    @JoinColumn(name = "group_id")
    private PersonGroup personGroup;
}
@lombok.Data
@Entity
@Document(type = "person_group", alias = "person_group_alias")
public class PersonGroup extends AuditedEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    private String interests;

    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)
    private List<Person> persons;

    private String description;

    @DocumentId
    @Field(type = FieldType.Long)
    public Long getId() {
        return id;
    }

    @MultiFields(
        mainField = @Field(type = FieldType.Text),
        otherFields = {
            @InnerField(suffix = "raw", type = FieldType.Keyword),
            @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
        }
    )
    public String getName() {
        return name;
    }

    @Field(type = FieldType.Text)
    public String getInterests() {
        return interests;
    }

    @Field(type = FieldType.Nested)
    public List<Person> getPersons() {
        return persons;
    }

    // no @Field here, to test whether the property is ignored (as with description in Person)
    public String getDescription() {
        return description;
    }
}
When an entity is updated, its document is automatically indexed into ElasticSearch; when an entity is deleted, its document is automatically removed from ElasticSearch.
Search API
jkes-search-service is a Spring Boot application that provides REST search APIs and runs on port 9000 by default.
- URI query
curl -XPOST 'localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10'
- Nested query
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "nested": {
      "path": "persons",
      "score_mode": "avg",
      "query": {
        "bool": {
          "must": [
            { "range": { "persons.age": { "gt": 5 } } }
          ]
        }
      }
    }
  }
}
- Match query
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "match": { "interests": "Hadoop" }
  }
}
- Bool query
{
  "query": {
    "bool": {
      "must": {
        "match": { "interests": "Hadoop" }
      },
      "filter": {
        "term": { "name.raw": "name0" }
      },
      "should": [
        { "match": { "interests": "Flink" } },
        {
          "nested": {
            "path": "persons",
            "score_mode": "avg",
            "query": {
              "bool": {
                "must": [
                  { "match": { "persons.name": "name40" } },
                  { "match": { "persons.interests": "interests" } }
                ],
                "must_not": {
                  "range": { "age": { "gte": 50, "lte": 60 } }
                }
              }
            }
          }
        }
      ],
      "minimum_should_match": 1,
      "boost": 1.0
    }
  }
}
- Source filtering
integration_test_person_group/person_group/_search
{
  "_source": false,
  "query": {
    "match": { "name": "name17" }
  }
}
integration_test_person_group/person_group/_search
{
  "_source": {
    "includes": [ "name", "persons.*" ],
    "excludes": [ "date*", "version", "persons.age" ]
  },
  "query": {
    "match": { "name": "name17" }
  }
}
- Prefix query
integration_test_person_group/person_group/_search
{
  "query": {
    "prefix": { "name": "name" }
  }
}
- Wildcard query
integration_test_person_group/person_group/_search
{
  "query": {
    "wildcard": { "name": "name*" }
  }
}
- Regexp query
integration_test_person_group/person_group/_search
{
  "query": {
    "regexp": {
      "name": "na.*17"
    }
  }
}
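Any HTTP client can send the request bodies shown in this section. The following is a minimal sketch using the JDK's java.net.http API (Java 11 or later). It assumes the search service is reachable at localhost:9000 under the /api/v1 prefix used in the URI query example. The class and method names are hypothetical and not part of Jkes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of Jkes: builds and sends search requests to jkes-search-service.
final class JkesSearchClientSketch {

    // Assumption: default port 9000 and the /api/v1 prefix from the URI query example.
    static final String BASE_URL = "http://localhost:9000/api/v1";

    // Builds a POST request for <index>/<type>/_search with paging parameters and a JSON body.
    static HttpRequest searchRequest(String index, String type, int from, int size, String jsonBody) {
        URI uri = URI.create(BASE_URL + "/" + index + "/" + type
                + "/_search?from=" + from + "&size=" + size);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Sends the request and returns the raw response body; needs a running search service.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        String body = "{ \"query\": { \"match\": { \"interests\": \"Hadoop\" } } }";
        HttpRequest request =
                searchRequest("integration_test_person_group", "person_group", 0, 10, body);
        // Only prints the request line; call send(request) against a live service to run the query.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Building the request separately from sending it keeps the URL and body construction easy to check without a running service.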
How Jkes works
How indexing works:
- When the application starts, Jkes scans all entities annotated with @Document and builds metadata for them.
- Based on the built metadata, Jkes creates the index and mapping configuration in JSON format, then creates/updates the index configuration through the ElasticSearch Java Rest Client.
- Jkes creates/updates a Kafka ElasticSearch Connector for each document type, used to create/update documents.
- Jkes starts/updates a Jkes Deleter Connector for the entire project, used to delete documents.
- Jkes intercepts data operation methods. The data returned by methods matching * save(*) is wrapped as a SaveEvent and saved to the EventContainer; calls to methods matching * delete*(..) generate a DeleteEvent/DeleteAllEvent, which is also saved to the EventContainer.
- Jkes intercepts transactions. After a transaction has committed, JkesKafkaProducer sends the SaveEvent data to Kafka; the data is serialized with the JkesJsonSerializer we provide before it is sent.
- Unlike a SaveEvent, a DeleteEvent is itself serialized and sent to Kafka, rather than only the data it carries.
- Unlike SaveEvent and DeleteEvent, a DeleteAllEvent sends no data to Kafka. Instead, Jkes deletes the corresponding index directly through the ElasticSearch Java Rest Client, then rebuilds the index and restarts the Kafka ElasticSearch Connector.
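The buffer-then-send behavior around transactions can be illustrated with a small sketch. This is not the Jkes implementation. The class name, the use of plain strings as events, and the per-thread buffer are all assumptions made for illustration, and a Consumer stands in for the Kafka producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not Jkes code: events are buffered during a transaction
// and forwarded only after commit; a rollback discards them.
final class EventBufferSketch {

    // Assumption: a transaction is bound to the thread that runs it, so one buffer per thread.
    private final ThreadLocal<List<String>> pending = ThreadLocal.withInitial(ArrayList::new);

    // Stand-in for the Kafka producer.
    private final Consumer<String> sender;

    EventBufferSketch(Consumer<String> sender) {
        this.sender = sender;
    }

    // Called by the interceptor when a save or delete method is invoked.
    void record(String event) {
        pending.get().add(event);
    }

    // Called after the transaction has committed: forward everything that was buffered.
    void afterCommit() {
        List<String> events = pending.get();
        pending.remove();
        events.forEach(sender);
    }

    // Called after a rollback: drop the buffered events so nothing reaches the index.
    void afterRollback() {
        pending.remove();
    }

    public static void main(String[] args) {
        EventBufferSketch buffer = new EventBufferSketch(e -> System.out.println("sent " + e));
        buffer.record("SaveEvent(person#1)");
        buffer.afterRollback();   // nothing is sent
        buffer.record("SaveEvent(person#2)");
        buffer.afterCommit();     // only the second event is sent
    }
}
```

Sending only after commit keeps the search index from receiving documents for changes that were never persisted.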
How queries work:
- The query service is provided through a REST API.
- We do not query ElasticSearch directly, because later versions will use machine learning for search ranking, and coupling directly to ElasticSearch would make it harder to plug in a search ranking API.
- The query service is a Spring Boot application packaged as a Docker image.
- The query service provides multiple API versions for API evolution and compatibility.
- The query service parses the JSON request, does some pre-processing, forwards it to ElasticSearch with the ElasticSearch Java Rest Client, parses the resulting response, and returns it to the client for further processing.
- To make client development easier, the query service provides a query UI page. Once the expected results are obtained on that page, the developer can copy the JSON request body into the application.
Flow chart
Modules
jkes-core
jkes-core is the core of Jkes. It mainly includes the following:
- The annotation package provides the core Jkes annotations
- The elasticsearch package encapsulates ElasticSearch-related operations, such as creating/updating indexes for all documents and updating mappings
- The kafka package provides the Kafka producer, Kafka JSON serializer, and Kafka Connect client
- The metadata package provides the building and structural model of the core annotation metadata
- The event package provides the event model and container
- The exception package provides common Jkes exceptions
- The http package wraps common HTTP JSON requests on top of Apache Http Client
- The support package exposes Jkes core configuration support
- The util package provides several utility classes for ease of development, for example: implies, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils
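To give a feel for how annotation metadata can be turned into a mapping, here is a minimal reflection-based sketch. It does not use the real Jkes annotations or metadata classes: the Doc and Mapped annotations, the Person class, and both helper methods are hypothetical stand-ins. Only the {"properties": {...}} layout follows the usual ElasticSearch mapping shape.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Jkes code: derive a mapping from field annotations via reflection.
final class MappingSketch {

    // Stand-in for a document marker annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Doc { }

    // Stand-in for a field annotation that carries the ElasticSearch field type.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Mapped {
        String type();
    }

    @Doc
    static class Person {
        @Mapped(type = "keyword") String gender;
        @Mapped(type = "integer") Integer age;
        String description; // not annotated, so it is left out of the mapping
    }

    // Collects field name -> type for every annotated field, sorted by name for stable output.
    static Map<String, String> fieldTypes(Class<?> entity) {
        if (!entity.isAnnotationPresent(Doc.class)) {
            throw new IllegalArgumentException(entity.getName() + " is not a document");
        }
        Map<String, String> types = new TreeMap<>();
        for (Field field : entity.getDeclaredFields()) {
            Mapped mapped = field.getAnnotation(Mapped.class);
            if (mapped != null) {
                types.put(field.getName(), mapped.type());
            }
        }
        return types;
    }

    // Renders the collected metadata as a mapping JSON string.
    static String toMappingJson(Map<String, String> types) {
        StringBuilder json = new StringBuilder("{\"properties\":{");
        boolean first = true;
        for (Map.Entry<String, String> entry : types.entrySet()) {
            if (!first) {
                json.append(',');
            }
            json.append('"').append(entry.getKey()).append("\":{\"type\":\"")
                .append(entry.getValue()).append("\"}");
            first = false;
        }
        return json.append("}}").toString();
    }

    public static void main(String[] args) {
        System.out.println(toMappingJson(fieldTypes(Person.class)));
    }
}
```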
jkes-boot
jkes-boot integrates Jkes with third-party open source frameworks.
Currently, we provide integration with Spring Data JPA through jkes-spring-data-jpa. Using Spring AOP, it intercepts Repository methods and generates SaveEvent/DeleteEvent/DeleteAllEvent instances, which are saved to the EventContainer. The SearchPlatformTransactionManager we provide wraps commonly used transaction managers (such as JpaTransactionManager) to add transaction interception.
In future releases, we will provide integration with more frameworks.
Notes on jkes-spring-data-jpa:
- The ContextSupport class is used to fetch Repository beans from the bean factory
- @EnableJkes makes it easy for clients to turn on Jkes functionality and provides a configuration model consistent with Spring
- EventSupport handles the details of events: it generates the corresponding events when data is saved or deleted, stores them in the EventContainer, and processes them when the transaction commits or rolls back
- SearchPlatformTransactionManager wraps the client's transaction manager and adds callback hooks at transaction commit and rollback
- The audit package provides a simple AuditedEntity parent class that makes it easy to add audit functionality; its version information can be combined with ElasticSearch's versioning mechanism to ensure that out-of-date document data is not indexed
- The exception package encapsulates common exceptions
- The intercept package provides AOP pointcuts and aspects
- The index package provides full indexing. Currently, we provide a thread-pool-based indexing mechanism and a ForkJoin-based indexing mechanism. In future releases, we will refactor the code to add a producer-consumer pattern based on a blocking queue to improve concurrency performance
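The idea of using an entity version to keep stale data out of the index can be sketched in memory. The class below is a hypothetical stand-in for an index, not Jkes or ElasticSearch code. It follows the rule ElasticSearch applies with external versioning, where a write is accepted only if its version is strictly greater than the stored one; whether Jkes uses exactly this version type is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for an index; illustrates version-checked writes only.
final class VersionedIndexSketch {

    private final Map<Long, Long> versions = new HashMap<>();
    private final Map<Long, String> documents = new HashMap<>();

    // Accepts the write only if the supplied version is strictly greater than the stored one.
    boolean index(long id, long version, String source) {
        Long stored = versions.get(id);
        if (stored != null && version <= stored) {
            return false; // stale or duplicate write, ignored
        }
        versions.put(id, version);
        documents.put(id, source);
        return true;
    }

    String get(long id) {
        return documents.get(id);
    }

    public static void main(String[] args) {
        VersionedIndexSketch index = new VersionedIndexSketch();
        index.index(1L, 2L, "{\"name\":\"new\"}");
        // An older version of the same entity arrives late, for example after a retry.
        boolean accepted = index.index(1L, 1L, "{\"name\":\"old\"}");
        System.out.println(accepted + " " + index.get(1L));
    }
}
```

Because events travel through Kafka asynchronously, a check of this kind means a delayed message carrying an older entity state cannot overwrite a newer document.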
jkes-services
jkes-services provides services. Currently, jkes-services offers the following:
- jkes-delete-connector
  - jkes-delete-connector is a Kafka connector that fetches index delete events from the Kafka cluster and then deletes the corresponding ElasticSearch documents using the Jest Client.
  - With the REST admin API of Kafka Connect, we can easily implement document deletion on a multi-tenant platform. By starting a jkes-delete-connector for each project, document deletion for that project is handled automatically, instead of our having to manually start a Kafka consumer for every new project. Although a regular-expression subscription could reduce this effort, it is still very inflexible.
- jkes-search-service
  - jkes-search-service is a RESTful search service that provides multiple versions of the REST query API, for API evolution and compatibility.
  - jkes-search-service currently supports URI-style search and JSON request body style search.
  - We do not query ElasticSearch directly, because later versions will use machine learning for search ranking, and coupling directly to ElasticSearch would make search ranking harder to plug in.
  - The query service is a Spring Boot application packaged as a Docker image.
  - The query service parses the JSON request, does some pre-processing, forwards it to ElasticSearch with the ElasticSearch Java Rest Client, parses the resulting response, and returns it to the client for further processing.
  - To make client development easier, the query service provides a query UI page. Once the expected results are obtained on that page, the developer can copy the JSON request body into the application.
In the future, we will build index clusters based on ZooKeeper to provide cluster index management functions.
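Starting one delete connector per project through the Kafka Connect REST API amounts to a POST to the /connectors endpoint with a name and a config object. The sketch below only builds such a request with the JDK HTTP client. The connector class name, the topic name, and the "<project>_delete_connector" naming scheme are placeholders chosen for illustration, not values taken from Jkes. The connector.class, tasks.max, and topics keys are standard Kafka Connect sink settings.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch, not Jkes code: builds a Kafka Connect "create connector" request.
final class DeleteConnectorRegistrationSketch {

    // Builds the JSON payload; name and config are the two top-level fields Kafka Connect expects.
    static String connectorJson(String project, String connectorClass, String topic) {
        return "{\"name\":\"" + project + "_delete_connector\",\"config\":{"
                + "\"connector.class\":\"" + connectorClass + "\","
                + "\"tasks.max\":\"1\","
                + "\"topics\":\"" + topic + "\"}}";
    }

    // Builds (but does not send) the POST request against the Kafka Connect REST endpoint.
    static HttpRequest createConnectorRequest(String connectUrl, String json) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder class and topic names; substitute the real ones for your deployment.
        String json = connectorJson("integration_test",
                "com.example.HypotheticalDeleteSinkConnector", "integration_test_delete");
        HttpRequest request = createConnectorRequest("http://k1-test.com:8084", json);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(json);
    }
}
```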
jkes-integration-test
jkes-integration-test is a Spring Boot-based integration test project for functional testing. It also measures the throughput of some common operations.
Development
To build a development version you’ll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.
Contribute
- Source Code: https://github.com/chaokunyang/jkes
- Issue Tracker: https://github.com/chaokunyang/jkes/issues
LICENSE
This project is licensed under Apache License 2.0.