jkes

jkes

将Java、Kafka和ElasticSearch集成的全功能搜索解决方案

Jkes是一个整合Java、Kafka和ElasticSearch技术的搜索框架。它采用注解驱动的JPA风格对象/文档映射,并提供RESTful API进行文档搜索。该框架支持自动索引更新和删除,同时提供灵活的查询接口。Jkes易于集成,可扩展性强,适合需要高性能搜索功能的Java应用。其架构支持多租户平台,为后续引入机器学习搜索排序功能做好了准备。

Jkes搜索框架JavaKafkaElasticSearchGithub开源项目

Jkes

Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用REST API进行文档搜索。

安装

可以参考jkes-integration-test项目快速掌握Jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring Boot应用。

  • jkes-index-connectorjkes-delete-connector安装到Kafka Connect的类路径
  • 安装Smart Chinese Analysis插件
sudo bin/elasticsearch-plugin install analysis-smartcn

配置

  • 引入jkes-spring-data-jpa依赖
  • 添加配置
@EnableAspectJAutoProxy @EnableJkes @Configuration public class JkesConfig { @Bean public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) { return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport); } }
  • 提供JkesProperties Bean
@Component @Configuration public class JkesConf extends DefaultJkesPropertiesImpl { @PostConstruct public void setUp() { Config.setJkesProperties(this); } @Override public String getKafkaBootstrapServers() { return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292"; } @Override public String getKafkaConnectServers() { return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084"; } @Override public String getEsBootstrapServers() { return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200"; } @Override public String getDocumentBasePackage() { return "com.timeyang.jkes.integration_test.domain"; } @Override public String getClientId() { return "integration_test"; } }

这里可以很灵活,如果使用Spring Boot,可以使用@ConfigurationProperties提供配置

  • 增加索引管理端点 因为我们不知道客户端使用的是哪种Web技术,所以索引端点需要在客户端添加。例如在Spring MVC中,可以按照如下方式添加索引端点:
@RestController @RequestMapping("/api/search") public class SearchEndpoint { private Indexer indexer; @Autowired public SearchEndpoint(Indexer indexer) { this.indexer = indexer; } @RequestMapping(value = "/start_all", method = RequestMethod.POST) public void startAll() { indexer.startAll(); } @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST) public void start(@PathVariable("entityClassName") String entityClassName) { indexer.start(entityClassName); } @RequestMapping(value = "/stop_all", method = RequestMethod.PUT) public Map<String, Boolean> stopAll() { return indexer.stopAll(); } @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT) public Boolean stop(@PathVariable("entityClassName") String entityClassName) { return indexer.stop(entityClassName); } @RequestMapping(value = "/progress", method = RequestMethod.GET) public Map<String, IndexProgress> getProgress() { return indexer.getProgress(); } }

快速开始

索引API

使用com.timeyang.jkes.core.annotation包下相关注解标记实体

@lombok.Data @Entity @Document public class Person extends AuditedEntity { // @Id将自动被识别 // @Field(type = FieldType.Long) @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; @MultiFields( mainField = @Field(type = FieldType.Text), otherFields = { @InnerField(suffix = "raw", type = FieldType.Keyword), @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english") } ) private String name; @Field(type = FieldType.Keyword) private String gender; @Field(type = FieldType.Integer) private Integer age; // 不添加@Field注解来测试是否被忽略 // @Field(type = FieldType.Text) private String description; @Field(type = FieldType.Object) @ManyToOne(fetch = FetchType.EAGER) @JoinColumn(name = "group_id") private PersonGroup personGroup; }
@lombok.Data @Entity @Document(type = "person_group", alias = "person_group_alias") public class PersonGroup extends AuditedEntity { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String name; private String interests; @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true) private List<Person> persons; private String description; @DocumentId @Field(type = FieldType.Long) public Long getId() { return id; } @MultiFields( mainField = @Field(type = FieldType.Text), otherFields = { @InnerField(suffix = "raw", type = FieldType.Keyword), @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english") } ) public String getName() { return name; } @Field(type = FieldType.Text) public String getInterests() { return interests; } @Field(type = FieldType.Nested) public List<Person> getPersons() { return persons; } /** * 不加Field注解,测试序列化时是否忽略 */ public String getDescription() { return description; } }

当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch中删除。

搜索API

启动搜索服务jkes-search-service,搜索服务是一个Spring Boot应用,提供REST搜索API,默认运行在9000端口。

  • URI查询
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
  • 嵌套查询
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "nested": {
      "path": "persons",
      "score_mode": "avg",
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "persons.age": {
                  "gt": 5
                }
              }
            }
          ]
        }
      }
    }
  }
}
  • 匹配查询
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
      "match": {
        "interests": "Hadoop"
      }
    }
}
  • 布尔查询
{
  "query": {
    "bool" : {
      "must" : {
        "match" : { "interests" : "Hadoop" }
      },
      "filter": {
        "term" : { "name.raw" : "name0" }
      },
      "should" : [
        { "match" : { "interests" : "Flink" } },
        {
            "nested" : {
                "path" : "persons",
                "score_mode" : "avg",

"query" : { "bool" : { "must" : [ { "match" : {"persons.name" : "name40"} }, { "match" : {"persons.interests" : "interests"} } ], "must_not" : { "range" : { "age" : { "gte" : 50, "lte" : 60 } } } } } }

], "minimum_should_match" : 1, "boost" : 1.0 }

}

}

- 源过滤

integration_test_person_group/person_group/_search { "_source": false, "query" : { "match" : { "name" : "name17" } } }

integration_test_person_group/person_group/_search { "_source": { "includes": [ "name", "persons." ], "excludes": [ "date", "version", "persons.age" ] }, "query" : { "match" : { "name" : "name17" } } }

- 前缀

integration_test_person_group/person_group/_search { "query": { "prefix" : { "name" : "name" } } }

- 通配符

integration_test_person_group/person_group/_search { "query": { "wildcard" : { "name" : "name*" } } }

- 正则表达式

integration_test_person_group/person_group/_search { "query": { "regexp":{ "name": "na.*17" } } }


## Jkes工作原理
索引工作原理:
- 应用启动时,Jkes扫描所有标注`@Document`注解的实体,为它们构建元数据。
- 基于构建的元数据,创建`index`和`mapping` Json格式的配置,然后通过`ElasticSearch Java Rest Client`创建/更新`index`配置。
- 为每个文档创建/更新`Kafka ElasticSearch Connector`,用于创建/更新文档
- 为整个项目启动/更新`Jkes Deleter Connector`,用于删除文档
- 拦截数据操作方法。将`* save(*)`方法返回的数据包装为`SaveEvent`保存到`EventContainer`;使用`(* delete*(..)`方法的参数,生成一个`DeleteEvent/DeleteAllEvent`保存到`EventContainer`。
- 拦截事务。在事务提交后使用`JkesKafkaProducer`发送`SaveEvent`中的实体到Kafka,Kafka会使用我们提供的`JkesJsonSerializer`序列化指定的数据,然后发送到Kafka。
- 与`SaveEvent`不同,`DeleteEvent`会直接被序列化,然后发送到Kafka,而不是只发送一份数据
- 与`SaveEvent`和`DeleteEvent`不同,`DeleteAllEvent`不会发送数据到Kafka,而是直接通过`ElasticSearch Java Rest Client`删除相应的`index`,然后重建该索引,重启`Kafka ElasticSearch Connector`

查询工作原理:
- 查询服务通过rest api提供
- 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度
- 查询服务是一个Spring Boot应用,使用docker打包为镜像
- 查询服务提供多版本API,用于API演进和兼容
- 查询服务解析`json`请求,进行一些预处理后,使用`ElasticSearch Java Rest Client`转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
- 为了便于客户端人员开发,查询服务提供了一个[查询UI界面](http://localhost:9000/api/v1),开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

## 流程图
![Jkes流程图](https://yellow-cdn.veclightyear.com/0a4dffa0/da9ec837-8810-46ee-929c-8373f9843373.png)

## 模块介绍
### jkes-core
`jkes-core`是整个`jkes`的核心部分。主要包括以下功能:
- `annotation`包提供了jkes的核心注解
- `elasticsearch`包封装了`elasticsearch`相关的操作,如为所有的文档创建/更新索引,更新mapping
- `kafka`包提供了Kafka生产者,Kafka Json序列化器,Kafka Connect客户端
- `metadata`包提供了核心的注解元数据的构建与结构化模型
- `event`包提供了事件模型与容器
- `exception`包提供了常见的Jkes异常
- `http`包基于`Apache Http Client`封装了常见的http json请求
- `support`包暴露了Jkes核心配置支持
- `util`包提供了一些工具类,便于开发。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils

### jkes-boot
`jkes-boot`用于与一些第三方开源框架进行集成。

目前,我们通过`jkes-spring-data-jpa`,提供了与`spring data jpa`的集成。通过使用Spring的AOP机制,对`Repository`方法进行拦截,生成`SaveEvent/DeleteEvent/DeleteAllEvent`保存到`EventContainer`。通过使用我们提供的`SearchPlatformTransactionManager`,对常用的事务管理器(如`JpaTransactionManager`)进行包装,提供事务拦截功能。

在后续版本,我们会提供与更多框架的集成。

`jkes-spring-data-jpa`说明:
- `ContextSupport`类用于从bean工厂获取`Repository Bean`
- `@EnableJkes`让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型
- `EventSupport`处理事件的细节,在保存和删除数据时生成相应事件存放到`EventContainer`,在事务提交和回滚时处理相应的事件
- `SearchPlatformTransactionManager`包装了客户端的事务管理器,在事务提交和回滚时加入了`回调hook`
- `audit`包提供了一个简单的`AuditedEntity`父类,方便添加审计功能,版本信息可用于结合`ElasticSearch`的版本机制保证不会索引过期文档数据
- `exception`包封装了常见异常
- `intercept`包提供了AOP切点和切面
- `index`包提供了`全量索引`功能。当前,我们提供了基于`线程池`的索引机制和基于`ForkJoin`的索引机制。在后续版本,我们会重构代码,增加基于`阻塞队列`的`生产者-消费者`模式,提供并发性能
### jkes-services
`jkes-services`主要用于提供一些服务。
目前,`jkes-services`提供了以下服务:
- `jkes-delete-connector`
    
  - `jkes-delete-connector`是一个`Kafka Connector`,用于从Kafka集群获取索引删除事件(`DeleteEvent`),然后使用`Jest Client`删除ElasticSearch中相应的文档。
   
  - 借助Kafka Connect的REST管理API,我们轻松地实现了多租户平台上的文档删除功能。只需为每个项目启动一个`jkes-delete-connector`,就可以自动处理该项目的文档删除工作。这避免了每次启动新项目时,都需要手动启动一个Kafka Consumer来处理该项目的文档删除工作。虽然可以通过正则订阅来减少这种工作,但仍然非常不灵活。
    
- `jkes-search-service`
  
  - `jkes-search-service`是一个RESTful搜索服务,提供了多个版本的REST查询API。查询服务提供多版本API,用于API演进和兼容。
  - `jkes-search-service`目前支持URI风格的搜索和JSON请求体风格的搜索。
  - 我们没有直接使用ElasticSearch进行查询,因为我们计划在后续版本中使用机器学习进行搜索排序,而直接与ElasticSearch耦合会增加搜索排序的接入难度。
  - 查询服务是一个Spring Boot应用程序,使用Docker打包为镜像。
  - 查询服务解析`json`请求,进行一些预处理后,使用`ElasticSearch Java REST Client`转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回给客户端。
  - 为了方便客户端开发人员,查询服务提供了一个[查询UI界面](http://localhost:9000/api/v1),开发人员可以在这个页面获得预期结果后再将JSON请求体复制到程序中。

后续,我们将基于`ZooKeeper`构建索引集群,提供集群索引管理功能。

### jkes-integration-test
`jkes-integration-test`是一个基于Spring Boot的集成测试项目,用于进行`功能测试`。同时测量一些常见操作的`吞吐率`。

## 开发
要构建开发版本,你需要使用较新版本的Kafka。你可以使用Maven的标准生命周期阶段来构建jkes。

## 贡献
- 源代码:https://github.com/chaokunyang/jkes
- 问题跟踪:https://github.com/chaokunyang/jkes/issues

## 许可证
本项目基于Apache License 2.0许可证。

编辑推荐精选

Trae

Trae

字节跳动发布的AI编程神器IDE

Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。

AI工具TraeAI IDE协作生产力转型热门
问小白

问小白

全能AI智能助手,随时解答生活与工作的多样问题

问小白,由元石科技研发的AI智能助手,快速准确地解答各种生活和工作问题,包括但不限于搜索、规划和社交互动,帮助用户在日常生活中提高效率,轻松管理个人事务。

热门AI助手AI对话AI工具聊天机器人
Transly

Transly

实时语音翻译/同声传译工具

Transly是一个多场景的AI大语言模型驱动的同声传译、专业翻译助手,它拥有超精准的音频识别翻译能力,几乎零延迟的使用体验和支持多国语言可以让你带它走遍全球,无论你是留学生、商务人士、韩剧美剧爱好者,还是出国游玩、多国会议、跨国追星等等,都可以满足你所有需要同传的场景需求,线上线下通用,扫除语言障碍,让全世界的语言交流不再有国界。

讯飞智文

讯飞智文

一键生成PPT和Word,让学习生活更轻松

讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。

AI办公办公工具AI工具讯飞智文AI在线生成PPTAI撰写助手多语种文档生成AI自动配图热门
讯飞星火

讯飞星火

深度推理能力全新升级,全面对标OpenAI o1

科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。

热门AI开发模型训练AI工具讯飞星火大模型智能问答内容创作多语种支持智慧生活
Spark-TTS

Spark-TTS

一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型

Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。

咔片PPT

咔片PPT

AI助力,做PPT更简单!

咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。

讯飞绘文

讯飞绘文

选题、配图、成文,一站式创作,让内容运营更高效

讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。

热门AI辅助写作AI工具讯飞绘文内容运营AI创作个性化文章多平台分发AI助手
材料星

材料星

专业的AI公文写作平台,公文写作神器

AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。

openai-agents-python

openai-agents-python

OpenAI Agents SDK,助力开发者便捷使用 OpenAI 相关功能。

openai-agents-python 是 OpenAI 推出的一款强大 Python SDK,它为开发者提供了与 OpenAI 模型交互的高效工具,支持工具调用、结果处理、追踪等功能,涵盖多种应用场景,如研究助手、财务研究等,能显著提升开发效率,让开发者更轻松地利用 OpenAI 的技术优势。

下拉加载更多