项目存档通知
随着阿里业务发展和技术更新迭代,alibaba-rsocket-broker项目将会进行存档操作。 预计存档时间:2024年7月5日。项目存档后,问题、拉取请求、代码、标签、重要事件、wiki、版本、提交、标记、分支等将变为只读状态,但项目仍可被fork和标星。
目前社区替代的开源项目为 https://github.com/reactive-rsocket-broker ,欢迎大家继续使用和贡献。
Alibaba RSocket Broker是一款基于RSocket协议的反应式对等通信系统,为多方通信构建分布式的RPC、发布/订阅、流式等通信支持。
- 反应式:无需担心线程模型、全异步化、流式背压支持、独特的对等通信模式可适应各种内部网络环境和跨云混云的需求。
- 可编程:完善的控制面(Control Plane)支持,可定制和方便的功能扩展,如支持反向的Prometheus指标采集、ZipKin RSocket收集器、混沌工程等。
- 面向消息:面向消息通信,服务路由、过滤、可观察性都非常简单。
- 交换系统:完全分布式、异构系统整合简单,无论应用使用何种语言开发、部署在哪里,都可以相互通信。
更多RSocket Broker资源和介绍,请访问以下链接:
- Alibaba RSocket Broker Wiki https://github.com/alibaba/alibaba-rsocket-broker/wiki
- Alibaba RSocket Broker示例: https://github.com/alibaba-rsocket-broker/
- RSocket示例: http://rsocketbyexample.info
- Github讨论区: https://github.com/alibaba/alibaba-rsocket-broker/discussions
RSocket Broker工作原理
RSocket Broker充当应用间通信的中间人角色。应用启动后,与Broker建立长连接,连接时需标明身份,如果是服务提供者,会注册自己能提供的服务信息。Broker会为所有连接和服务列表建立对应的映射关系。 当应用需要调用其他服务时,会以消息形式将请求发送给Broker,Broker解析消息元信息,根据路由表将请求转发给服务提供者,然后将处理结果再转发给调用方。Broker完全异步化,无需关心线程池概念,且消息转发基于零拷贝,性能极高,这也是为何不用担心中心化Broker成为性能瓶颈的主要原因。
通过上述架构,RSocket Broker彻底解决了传统设计中的诸多问题:
- 配置推送:连接已建立,只需通过RSocket的metadataPush完成配置推送
- 服务注册和发现:应用与Broker建立长连接后,这个连接就是服务注册和发现,无需额外的服务注册中心
- 透明路由:应用调用服务时,无需知道服务对应的应用信息,Broker会完成路由
- 服务间调用:RSocket提供的4种模型可很好地解决服务间调用的各种复杂需求
- 负载均衡:所有应用与Broker建立长连接后,负载均衡在broker中心路由表完成,对应用完全透明
- 断路保护:现已调整为背压支持,更贴近实际业务场景
- 分布式消息:RSocket本身就基于消息推送,且是分布式的
- 多语言支持:RSocket是一套标准协议,主流语言的SDK都有支持,详情请访问 RSocket SDK Stack
项目模块
- alibaba-rsocket-service-common: RSocket服务接口定义基础模块,包括注解、响应式相关框架和支持类
- alibaba-rsocket-core: RSocket核心功能模块
- alibaba-rsocket-spring-boot-starter: Spring Boot Starter for RSocket,包括RSocket服务发布和消费
- alibaba-broker-spring-boot-starter: Spring Boot Starter for RSocket Broker,便于第三方进行扩展
- alibaba-rsocket-broker: Alibaba RSocket Broker参考实现
- alibaba-broker-registry-client-spring-boot-starter: 通过RSocket Broker对外提供服务发现服务
- alibaba-broker-config-client-spring-boot-starter: 通过RSocket Broker对外提供配置推送服务
- rsocket-broker-gateway-http: RSocket Broker HTTP网关,将HTTP转换为RSocket协议
- rsocket-broker-gateway-grpc: RSocket Broker gRPC网关,将gRPC转换为RSocket协议
开发环境要求
- JDK 11: RSocket Broker服务器基于Java 11,但Broker客户端等兼容Java 8
- Maven 3.5.x
- Node 16+: RSocket Broker采用Vaadin 23.0版本构建控制界面,需要安装Node 16以上版本
如何运行示例?
注意: 示例代码中的AccountService接口使用Protobuf进行序列化,使用了protobuf-maven-plugin生成对应的Protobuf。建议在IDE导入项目前,先在项目根目录执行"mvn -DskipTests package"完成Protobuf对应的代码生成,否则直接在IDE中编译可能出现编译失败的情况。
项目提供了完整的示例,您可以在example模块下找到,包括服务接口定义、服务实现和服务调用三个部分。
启动RSocket Broker
- Jbang方式启动: 通过
jbang rsocket-broker@alibaba-rsocket-broker
命令启动RSocket Broker - Docker Compose运行RSocket Broker: 在RSocket Broker项目目录下执行'docker-compose up -d'启动RSocket Broker
- 在IDE中运行RSocket Broker: 找到AlibabaRSocketBrokerServer类,运行main函数,启动RSocket Broker
运行RSocket Responder和Requester
- 找到RSocketResponderServer类,运行main函数,启动RSocket Responder对外提供响应式服务
- 找到RSocketRequesterApp类,运行main函数,启动RSocket Requester,进行响应式服务消费
- 在IDEA中,找到example.http,运行"GET http://localhost:8181/user/2"或执行以下命令,进行服务调用测试。
$ curl http://localhost:8181/user/2
示例的详细介绍请访问 Example
RSocket服务编写流程
包括如何创建响应式服务接口,在Responder端实现该接口,在Requester完成响应式服务调用,以及通信双方如何与Broker交互。
- 创建RSocket服务接口,可以创建单独的Maven模块存放这些接口,如user-service-api,示例代码如下:
public interface UserService {
Mono<User> findById(Integer id);
}
- 在RSocket Responder端实现该接口,同时给实现类添加@RSocketService注解,如下:
@RSocketService(serviceInterface = UserService.class)
@Service
public class UserServiceImpl implements UserService {
@Override
public Mono<User> findById(Integer id) {
return Mono.just(new User(1, "nick:" + id));
}
}
许多开发者会问,如果使用MySQL数据库,如何与Reactive集成。目前R2DBC支持MySQL,您可以参考Spring Cloud RSocket + R2DBC + MySQL的Demo实现: https://github.com/linux-china/spring-cloud-function-demo/
- 在RSocket Requester中,以代理方式创建响应式服务接口对应的Spring bean,如下:
@Bean
public UserService userService(@Autowired UpstreamManager upstreamManager) {
return RSocketRemoteServiceBuilder
.client(UserService.class)
.upstreamManager(upstreamManager)
.build();
}
- 在RSocket Requester端,进行代码调用,如提供HTTP REST API:
@RestController
public class PortalController {
@Autowired
UserService userService;
@GetMapping("/user/{id}")
public Mono<User> user(@PathVariable Integer id) {
return userService.findById(id);
}
}
示例项目请参考: https://github.com/alibaba-rsocket-broker/rsocket-broker-simple-example
参考资料
- RSocket: http://rsocket.io/
- RSocket Java SDK: https://github.com/rsocket/rsocket-java
- Spring RSocket: https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket
- Spring Boot RSocket Starter: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#boot-features-rsocket
- Project Reactor: http://projectreactor.io/
- Reactive Foundation: https://reactive.foundation/
- NATS执行引擎: https://docs.nats.io/using-nats/nex