Jkes is a search framework built on Java, Kafka, and ElasticSearch. It provides annotation-driven, JPA-style object/document mapping and exposes REST APIs for document search.

Project home page: https://github.com/chaokunyang/jkes

Installation

You can refer to the jkes-integration-test project to quickly learn how to use the Jkes framework. jkes-integration-test is a Spring Boot application that we use to test functional integrity.

  • Install jkes-index-connector and jkes-delete-connector on the Kafka Connect classpath
  • Install the Smart Chinese Analysis Plugin
sudo bin/elasticsearch-plugin install analysis-smartcn

Configuration

  • Add the jkes-spring-data-jpa dependency
  • Add the configuration
@EnableAspectJAutoProxy
@EnableJkes
@Configuration
public class JkesConfig {

    // Wrap the usual JPA transaction manager so Jkes can hook into commit and rollback
    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {
        return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);
    }
}
  • Provide a JkesProperties bean
@Component
@Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {

    @PostConstruct
    public void setUp() {
        Config.setJkesProperties(this);
    }

    @Override
    public String getKafkaBootstrapServers() {
        return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";
    }

    @Override
    public String getKafkaConnectServers() {
        return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";
    }

    @Override
    public String getEsBootstrapServers() {
        return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";
    }

    @Override
    public String getDocumentBasePackage() {
        return "com.timeyang.jkes.integration_test.domain";
    }

    @Override
    public String getClientId() {
        return "integration_test";
    }

}

This part is flexible: if you are using Spring Boot, you can supply the configuration through @ConfigurationProperties instead.
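As a sketch of the externalized approach, the plain-Java class below reads the same five settings from a java.util.Properties source instead of hard-coding them. The class name ExternalJkesSettings and the property keys (jkes.kafka-bootstrap-servers and so on) are hypothetical choices for this example, not Jkes names. It also shows the comma-separated server-list format that the getters above return.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/** Hypothetical settings holder mirroring JkesConf's getters; not part of the Jkes API. */
final class ExternalJkesSettings {
    private final Properties props;

    ExternalJkesSettings(Properties props) {
        this.props = props;
    }

    /** Parses properties-file text, such as the contents of application.properties. */
    static ExternalJkesSettings fromText(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new ExternalJkesSettings(p);
    }

    // Fails fast when a setting is missing instead of returning null.
    private String required(String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }

    String getKafkaBootstrapServers() { return required("jkes.kafka-bootstrap-servers"); }
    String getKafkaConnectServers() { return required("jkes.kafka-connect-servers"); }
    String getEsBootstrapServers() { return required("jkes.es-bootstrap-servers"); }
    String getDocumentBasePackage() { return required("jkes.document-base-package"); }
    String getClientId() { return required("jkes.client-id"); }

    /** Splits a comma-separated list such as "k1:9292, k2:9292" into trimmed entries. */
    static List<String> splitServers(String servers) {
        return Arrays.stream(servers.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }
}
```

With Spring Boot, a class annotated with @ConfigurationProperties could bind such keys to fields automatically rather than reading them by hand as done here.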

  • Add index management endpoints. Because we cannot know which web framework the client uses, the index endpoints must be added on the client side. For example, in Spring MVC you can add them as follows:
@RestController
@RequestMapping("/api/search")
public class SearchEndpoint {

    private final Indexer indexer;

    @Autowired
    public SearchEndpoint(Indexer indexer) {
        this.indexer = indexer;
    }

    @RequestMapping(value = "/start_all", method = RequestMethod.POST)
    public void startAll() {
        indexer.startAll();
    }

    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)
    public void start(@PathVariable("entityClassName") String entityClassName) {
        indexer.start(entityClassName);
    }

    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)
    public Map<String, Boolean> stopAll() {
        return indexer.stopAll();
    }

    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)
    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {
        return indexer.stop(entityClassName);
    }

    @RequestMapping(value = "/progress", method = RequestMethod.GET)
    public Map<String, IndexProgress> getProgress() {
        return indexer.getProgress();
    }
}
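A minimal sketch of calling these index endpoints with Java 11's built-in java.net.http client. The paths and HTTP methods mirror the SearchEndpoint mappings (POST to start, PUT to stop, GET for progress); the base URL and the IndexAdminRequests class are assumptions for illustration. The sketch only builds the requests, so it runs without a server.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds requests for the SearchEndpoint mappings. */
final class IndexAdminRequests {
    private final String baseUrl;

    IndexAdminRequests(String baseUrl) {
        // Drop a trailing slash so paths join cleanly
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
    }

    HttpRequest startAll() { return post("/start_all"); }

    HttpRequest start(String entityClassName) { return post("/start/" + encode(entityClassName)); }

    HttpRequest stopAll() { return put("/stop_all"); }

    HttpRequest stop(String entityClassName) { return put("/stop/" + encode(entityClassName)); }

    HttpRequest progress() {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/progress")).GET().build();
    }

    private HttpRequest post(String path) {
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    private HttpRequest put(String path) {
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // URLEncoder applies form encoding; fully qualified class names
    // (letters, digits, '.', '_') pass through unchanged.
    private static String encode(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8);
    }
}
```

To execute a request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). The {entityClassName:.+} pattern in the controller matters for these calls: without it, Spring MVC versions that apply suffix pattern matching would treat the text after the last dot of a class name as a file extension.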

Quick start

Index API

Annotate entities with the annotations in the com.timeyang.jkes.core.annotation package:

@lombok.Data
@Entity
@Document
public class Person extends AuditedEntity {

    // @Id will be identified automatically
    // @Field(type = FieldType.Long)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    private String name;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Integer)
    private Integer age;

    // no @Field here: tests that unannotated fields are ignored
    // @Field(type = FieldType.Text)
    private String description;

    @Field(type = FieldType.Object)
    @ManyToOne(fetch = FetchType.EAGER)
    @JoinColumn(name = "group_id")
    private PersonGroup personGroup;

}
@lombok.Data
@Entity
@Document(type = "person_group", alias = "person_group_alias")
public class PersonGroup extends AuditedEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String interests;
    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)
    private List<Person> persons;
    private String description;

    @DocumentId
    @Field(type = FieldType.Long)
    public Long getId() {
        return id;
    }

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    public String getName() {
        return name;
    }

    @Field(type = FieldType.Text)
    public String getInterests() {
        return interests;
    }

    @Field(type = FieldType.Nested)
    public List<Person> getPersons() {
        return persons;
    }

    /**
     * No @Field annotation: tests that unannotated getters are ignored.
     */
    public String getDescription() {
        return description;
    }
}

When an entity is saved or updated, its document is automatically indexed into ElasticSearch; when an entity is deleted, its document is automatically removed from ElasticSearch.
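The automatic indexing described above relies on holding change events until the transaction outcome is known. A simplified, hypothetical sketch of that idea (IndexEvent and TransactionalEventBuffer are illustrative names, not Jkes's EventContainer or event classes): save and delete calls record events in a per-transaction buffer; after commit the events go to a publisher that stands in for the Kafka producer, and after rollback they are discarded, so data from a rolled-back transaction never reaches the index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical index event; kind is "save" or "delete". */
final class IndexEvent {
    final String kind;
    final String documentId;

    IndexEvent(String kind, String documentId) {
        this.kind = kind;
        this.documentId = documentId;
    }

    @Override
    public String toString() { return kind + ":" + documentId; }
}

/** Buffers events for one transaction and publishes them only after commit. */
final class TransactionalEventBuffer {
    private final List<IndexEvent> pending = new ArrayList<>();
    private final Consumer<IndexEvent> publisher; // stands in for a Kafka producer

    TransactionalEventBuffer(Consumer<IndexEvent> publisher) { this.publisher = publisher; }

    void onSave(String id) { pending.add(new IndexEvent("save", id)); }

    void onDelete(String id) { pending.add(new IndexEvent("delete", id)); }

    /** Called after the transaction commits: publish in order, then clear. */
    void afterCommit() {
        for (IndexEvent e : pending) {
            publisher.accept(e);
        }
        pending.clear();
    }

    /** Called after a rollback: drop everything so nothing reaches the index. */
    void afterRollback() { pending.clear(); }

    int pendingCount() { return pending.size(); }
}
```

Keeping events out of Kafka until commit is what makes the index consistent with the database: a failed transaction leaves no trace in ElasticSearch.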

Search API

jkes-search-service is a Spring Boot application that provides REST search APIs and listens on port 9000 by default.

  • URI query
curl -XPOST 'localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10'
  • Nested query
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "nested": {
      "path": "persons",
      "score_mode": "avg",
      "query": {
        "bool": {
          "must": [
            { "range": { "persons.age": { "gt": 5 } } }
          ]
        }
      }
    }
  }
}
  • match query
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "match": { "interests": "Hadoop" }
  }
}
  • bool query
integration_test_person_group/person_group/_search
{
  "query": {
    "bool": {
      "must": { "match": { "interests": "Hadoop" } },
      "filter": { "term": { "name.raw": "name0" } },
      "should": [
        { "match": { "interests": "Flink" } },
        {
          "nested": {
            "path": "persons",
            "score_mode": "avg",
            "query": {
              "bool": {
                "must": [
                  { "match": { "persons.name": "name40" } },
                  { "match": { "persons.interests": "interests" } }
                ],
                "must_not": {
                  "range": { "persons.age": { "gte": 50, "lte": 60 } }
                }
              }
            }
          }
        }
      ],
      "minimum_should_match": 1,
      "boost": 1.0
    }
  }
}
  • Source filtering
integration_test_person_group/person_group/_search
{
    "_source": false,
    "query" : {
        "match" : { "name" : "name17" }
    }
}
integration_test_person_group/person_group/_search
{
    "_source": {
        "includes": [ "name", "persons.*" ],
        "excludes": [ "date*", "version", "persons.age" ]
    },
    "query" : {
        "match" : { "name" : "name17" }
    }
}
  • prefix
integration_test_person_group/person_group/_search
{ 
  "query": {
    "prefix" : { "name" : "name" }
  }
}
  • wildcard
integration_test_person_group/person_group/_search
{
    "query": {
        "wildcard" : { "name" : "name*" }
    }
}
  • regexp
integration_test_person_group/person_group/_search
{
    "query": {
        "regexp":{
            "name": "na.*17"
        }
    }
}
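Any of the queries above can be sent from application code. A minimal sketch using Java 11's built-in HTTP client, assuming the service runs at localhost:9000 (the default port mentioned above) and that request-body searches are POSTed to /api/v1/{index}/{type}/_search as in the curl example. SearchClient and its methods are hypothetical names, not part of Jkes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client for jkes-search-service request-body searches. */
final class SearchClient {
    private final String baseUrl;
    private final HttpClient http = HttpClient.newHttpClient();

    SearchClient(String baseUrl) { this.baseUrl = baseUrl; }

    /** Builds a search request, e.g. index "integration_test_person_group", type "person_group". */
    HttpRequest buildSearch(String index, String type, int from, int size, String jsonBody) {
        if (from < 0 || size <= 0) {
            throw new IllegalArgumentException("from must be >= 0 and size > 0");
        }
        URI uri = URI.create(String.format("%s/api/v1/%s/%s/_search?from=%d&size=%d",
                baseUrl, index, type, from, size));
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** Sends the search and returns the raw JSON response body. */
    String search(String index, String type, int from, int size, String jsonBody)
            throws IOException, InterruptedException {
        HttpResponse<String> response = http.send(
                buildSearch(index, type, from, size, jsonBody), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() >= 400) {
            throw new IOException("Search failed with HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

For example, search("integration_test_person_group", "person_group", 0, 10, body) with the match query body shown earlier returns the service's JSON response as a string for the application to parse.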

How Jkes works

How indexing works:

  • When the application starts, Jkes scans all entities annotated with @Document and builds metadata for them.
  • From that metadata, Jkes generates the index and mapping configuration as JSON, then creates or updates the index configuration through the ElasticSearch Java REST Client.
  • For each document type, Jkes creates or updates a Kafka ElasticSearch Connector, which creates and updates the documents.
  • For the whole project, Jkes starts or updates a Jkes Deleter Connector, which deletes documents.
  • Jkes intercepts data-operation methods: the data returned by * save(*) methods is wrapped in a SaveEvent and stored in the EventContainer; * delete*(..) methods generate a DeleteEvent or DeleteAllEvent, which is also stored in the EventContainer.
  • Jkes intercepts transactions: after a transaction commits, JkesKafkaProducer sends the data of each SaveEvent to Kafka, serialized with the JkesJsonSerializer that Jkes provides.
  • Unlike a SaveEvent, a DeleteEvent does not carry a copy of the entity data; the event itself is serialized and sent to Kafka.
  • Unlike SaveEvent and DeleteEvent, a DeleteAllEvent sends nothing to Kafka. Instead, Jkes deletes the corresponding index directly through the ElasticSearch Java REST Client, then rebuilds the index and restarts the Kafka ElasticSearch Connector.
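The first two steps (scan @Document entities, then derive a mapping) can be illustrated with plain reflection. This is a simplified, self-contained sketch, not Jkes's metadata code: @Doc, @Fld and FieldKind are hypothetical stand-ins for @Document, @Field and FieldType, and only field-level annotations with a single type are handled (no multi-fields, annotated getters, or nested objects).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Locale;
import java.util.StringJoiner;

/** Hypothetical stand-in for @Document. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Doc { String type(); }

/** Hypothetical stand-in for FieldType; names map to ElasticSearch field types. */
enum FieldKind { TEXT, KEYWORD, INTEGER, LONG }

/** Hypothetical stand-in for @Field. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Fld { FieldKind value(); }

final class MappingBuilder {
    /** Builds an ElasticSearch-style mapping JSON from annotated fields. */
    static String buildMapping(Class<?> entity) {
        Doc doc = entity.getAnnotation(Doc.class);
        if (doc == null) {
            throw new IllegalArgumentException(entity.getName() + " is not annotated with @Doc");
        }
        StringJoiner props = new StringJoiner(",", "{", "}");
        // Note: getDeclaredFields() does not guarantee any particular order
        for (Field f : entity.getDeclaredFields()) {
            Fld fld = f.getAnnotation(Fld.class);
            if (fld == null) {
                continue; // unannotated fields are not mapped
            }
            String esType = fld.value().name().toLowerCase(Locale.ROOT);
            props.add("\"" + f.getName() + "\":{\"type\":\"" + esType + "\"}");
        }
        return "{\"" + doc.type() + "\":{\"properties\":" + props + "}}";
    }
}

@Doc(type = "person")
class SamplePerson {
    @Fld(FieldKind.LONG) Long id;
    @Fld(FieldKind.KEYWORD) String gender;
    @Fld(FieldKind.INTEGER) Integer age;
    String description; // no @Fld: ignored, like Person.description
}
```

Generating the mapping from annotations keeps the entity class as the single source of truth; a real implementation would then send the JSON to ElasticSearch to create or update the index.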

How querying works:

  • Querying is exposed as a REST API service.
  • We do not query ElasticSearch directly because later versions will use machine learning to rank search results, and coupling clients directly to ElasticSearch would make it harder to plug in the ranking API.
  • The query service is a Spring Boot application packaged as a Docker image.
  • The query service exposes multiple API versions to support API evolution and compatibility.
  • The query service parses the JSON request, applies some pre-processing, forwards it to ElasticSearch through the ElasticSearch Java REST Client, parses the response, and returns it to the client for further processing.
  • To ease client development, the query service provides a query UI: once the page returns the expected results, developers can copy the JSON request body into their application.

Flow chart

Modules

jkes-core

jkes-core is the core of Jkes. It mainly provides the following:

  • The annotation package provides the core Jkes annotations.
  • The elasticsearch package encapsulates ElasticSearch operations, such as creating/updating the indexes for all documents and updating mappings.
  • The kafka package provides a Kafka producer, a Kafka JSON serializer, and a Kafka Connect client.
  • The metadata package builds the metadata for the core annotations and provides its structural model.
  • The event package provides the event model and the event container.
  • The exception package provides common Jkes exceptions.
  • The http package wraps common HTTP JSON requests on top of Apache HttpClient.
  • The support package exposes the Jkes core configuration support.
  • The util package provides several utility classes for ease of development, for example: implies, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils.

jkes-boot

jkes-boot integrates Jkes with third-party open-source frameworks.

Currently, jkes-spring-data-jpa provides integration with Spring Data JPA. It uses Spring AOP to intercept Repository methods and generate SaveEvent/DeleteEvent/DeleteAllEvent objects, which are stored in the EventContainer. The SearchPlatformTransactionManager we provide wraps a commonly used transaction manager (such as JpaTransactionManager) to add transaction interception.
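jkes-spring-data-jpa performs this interception with Spring AOP pointcuts such as execution(* delete*(..)). The same idea can be sketched without Spring using a JDK dynamic proxy: calls to save* and delete* methods on a repository interface are forwarded to the real implementation and, if they succeed, recorded as events. Repo, InMemoryRepo, EventRecordingProxy and the string event format are hypothetical and only illustrate the mechanism.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical repository interface. */
interface Repo {
    String save(String id, String value);
    void delete(String id);
    String find(String id);
}

/** Hypothetical in-memory implementation standing in for a JPA repository. */
final class InMemoryRepo implements Repo {
    private final Map<String, String> store = new HashMap<>();

    public String save(String id, String value) { store.put(id, value); return id; }
    public void delete(String id) { store.remove(id); }
    public String find(String id) { return store.get(id); }
}

final class EventRecordingProxy {
    /** Wraps target so successful save/delete calls are recorded in events. */
    static Repo wrap(Repo target, List<String> events) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result;
            try {
                result = method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the repository's own exception; no event recorded
            }
            String name = method.getName();
            if (name.startsWith("save")) {
                events.add("SaveEvent:" + result); // the returned data is what would be indexed
            } else if (name.startsWith("delete")) {
                events.add("DeleteEvent:" + args[0]);
            }
            return result;
        };
        return (Repo) Proxy.newProxyInstance(
                Repo.class.getClassLoader(), new Class<?>[] {Repo.class}, handler);
    }
}
```

Read-only methods such as find pass through untouched, which mirrors why pointcuts only target save and delete methods.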

In future releases, we will provide integration with more frameworks.

jkes-spring-data-jpa overview:

  • The ContextSupport class fetches Repository beans from the bean factory.
  • @EnableJkes lets clients turn on Jkes easily, with a configuration model consistent with Spring.
  • EventSupport handles event details: it generates the corresponding events in the EventContainer when data is saved or deleted, and processes those events when the transaction commits or rolls back.
  • SearchPlatformTransactionManager wraps the client's transaction manager and adds callback hooks at transaction commit and rollback.
  • The audit package provides a simple AuditedEntity parent class that makes it easy to add auditing. Its version information can be combined with ElasticSearch's versioning mechanism to ensure that stale document data is not indexed.
  • The exception package encapsulates common exceptions.
  • The intercept package provides AOP pointcuts and aspects.
  • The index package provides full indexing. Currently, we provide a thread-pool-based indexing mechanism and a ForkJoin-based indexing mechanism. In future releases, we will refactor the code to add a producer-consumer pattern based on a blocking queue to improve concurrency.
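A rough sketch of the ForkJoin-style full indexing described above: the ID range is split recursively until a batch is small enough, batches run in parallel on the common ForkJoinPool, and the per-batch counts are summed. The batch size, the ID-range model, and the LongBinaryOperator callback (which would load and send one batch of documents) are assumptions for illustration, not the jkes index package's actual code.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.LongBinaryOperator;

/** Hypothetical full-index task over the id range [from, to). */
final class FullIndexTask extends RecursiveTask<Long> {
    private static final long BATCH_SIZE = 1_000; // assumed batch size
    private final long from;
    private final long to;
    // (fromInclusive, toExclusive) -> number of documents indexed in that batch
    private final LongBinaryOperator indexBatch;

    FullIndexTask(long from, long to, LongBinaryOperator indexBatch) {
        this.from = from;
        this.to = to;
        this.indexBatch = indexBatch;
    }

    @Override
    protected Long compute() {
        if (to - from <= BATCH_SIZE) {
            return indexBatch.applyAsLong(from, to); // small enough: index directly
        }
        long mid = from + (to - from) / 2;
        FullIndexTask left = new FullIndexTask(from, mid, indexBatch);
        FullIndexTask right = new FullIndexTask(mid, to, indexBatch);
        left.fork();                      // run the left half asynchronously
        long rightCount = right.compute(); // process the right half in this thread
        return left.join() + rightCount;
    }

    /** Indexes every id in [from, to) and returns the total count reported by the batches. */
    static long indexAll(long from, long to, LongBinaryOperator indexBatch) {
        return ForkJoinPool.commonPool().invoke(new FullIndexTask(from, to, indexBatch));
    }
}
```

Because the callback may run on several worker threads at once, it must be thread-safe; a blocking-queue producer-consumer design would instead decouple reading from the database and writing to the index.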

jkes-services

jkes-services provides the Jkes services. Currently, it includes the following:

  • jkes-delete-connector

    • jkes-delete-connector is a Kafka connector that consumes index delete events from the Kafka cluster and deletes the corresponding ElasticSearch documents using the Jest Client.

    • With the Kafka Connect REST admin API, document deletion is easy to implement on a multi-tenant platform: starting one jkes-delete-connector per project handles that project's document deletion automatically, so we do not have to start a Kafka consumer manually for every new project. Regex-based topic subscriptions can reduce that effort, but they remain inflexible.

  • jkes-search-service

    • jkes-search-service is a RESTful search service that provides multiple versions of the REST query API to support API evolution and compatibility.
    • jkes-search-service currently supports both URI-style search and JSON request-body search.
    • We do not query ElasticSearch directly because later versions will use machine learning to rank search results, and coupling clients directly to ElasticSearch would make that ranking harder to plug in.
    • The query service is a Spring Boot application packaged as a Docker image.
    • The query service parses the JSON request, applies some pre-processing, forwards it to ElasticSearch through the ElasticSearch Java REST Client, parses the response, and returns it to the client for further processing.
    • To ease client development, the query service provides a query UI: once the page returns the expected results, developers can copy the JSON request body into their application.
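Starting one jkes-delete-connector per project goes through the Kafka Connect REST API, which creates a connector from a POST to /connectors with a JSON body holding a name and a config object. A minimal sketch that builds such a request with Java 11's HTTP client: connector.class, tasks.max and topics are standard Kafka Connect sink settings, but the connector class name, the topic, and the naming scheme below are placeholders, since the real jkes-delete-connector class and configuration keys are not given here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical builder for a per-project delete connector request. */
final class ConnectorRequests {
    // Minimal JSON string quoting: escapes backslashes and double quotes
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds the connector JSON; the "_jkes_delete_connector" suffix is an assumed naming scheme. */
    static String configJson(String clientId, String connectorClass, String topic) {
        String name = clientId + "_jkes_delete_connector";
        return "{\"name\":" + quote(name)
                + ",\"config\":{"
                + "\"connector.class\":" + quote(connectorClass)
                + ",\"tasks.max\":\"1\""
                + ",\"topics\":" + quote(topic)
                + "}}";
    }

    /** Builds POST {connectUrl}/connectors, e.g. connectUrl = "http://k1-test.com:8084". */
    static HttpRequest createDeleteConnector(String connectUrl, String clientId,
                                             String connectorClass, String topic) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(configJson(clientId, connectorClass, topic)))
                .build();
    }
}
```

Deriving the connector name from the client ID gives each project its own connector, which is what makes per-tenant deletion manageable through the admin API alone.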

In the future, we plan to build ZooKeeper-based index clusters to provide cluster-wide index management.

jkes-integration-test

jkes-integration-test is a Spring Boot-based integration test project for functional testing. It also measures the throughput of some common operations.

Development

To build a development version you’ll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.

Contribute

  • Source Code: https://github.com/chaokunyang/jkes
  • Issue Tracker: https://github.com/chaokunyang/jkes/issues

LICENSE

This project is licensed under Apache License 2.0.