基于netty实现Dubbo RPC调用

云惠网小编 2021年12月31日21:17:47
评论
5438字阅读18分7秒
摘要

基于netty实现dubborpc远程接口调用

广告也精彩
public class NettyClientHandler extends SimpleChannelInboundHandler implements Callable {
private ChannelHandlerContext context;
private String para;
private String result;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开连接");
}
@Override
public synchronized Object call() throws Exception {
System.out.println("发送call消息:" + para);
context.writeAndFlush(para);
wait();
return result;
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
result = o.toString();
System.out.println("收到服务端的返回消息:" + o);
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("客户端发生异常");
}
void setPara(String str) {
this.para = str;
}
}

3.1 创建消费者启动程序

2.1 接口实现

3 消费者模块

1.1 程序目录

netty是基于NIO(同步非阻塞)开发的网络通信框架;对比传统BIO(阻塞IO),其并发性能有很大提升。而dubbo的底层就是使用netty作为网络框架,本文就手写简单的基于netty的RPC框架。

public class ClientBootStrap {
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient("127.0.0.1", 40004);
String head = "dubbo#TestServie#";
// nettyClient.init();
TestService service = (TestService) nettyClient.getBean(TestService.class, head);
String result = service.hello("你好,我是服务消费者");
System.out.println("调用返回了结果:" + result);
}
}
channelRead0方法用于接收客户端传来的信息,同时对数据进行校验
校验成功后,截取有效参数调用服务接口

2.4 服务端业务处理Handler

成员变量para: 为调用远程接口服务的参数
成员变量result::为调用远程服务器接口返回结果
需要注意的是该handller实现了Callable接口中call()方法;
执行步骤为:
1、连接建立成功后执行channelActive方法
2、执行call方法发送数据到服务端,同时阻塞线程
3、服务端返回结果后执行channelRead0方法,唤醒线程,
4、执行call方法中wait()后面的步骤,返回结果

2 服务提供者模块

2.3 创建netty服务端

public class TestServiceImpl implements TestService {
@Override
public String hello(String msg) {
System.out.println("TestServiceImpl中hello被调用,参数:" + msg);
return "你好客户端,我已经收到你的消息:" + msg;
}
}
public interface TestService {
String hello(String msg);
}
**此步骤是netty常规服务端创建方式**

1 设计步骤

1.2 定义一个通用接口


public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public void init() {
//创建一个用于接收连接的线程组,参数代表线程个数
EventLoopGroup boss = new NioEventLoopGroup(1);
//创建处理操作时间的线程组,没有参数netty会默认线程为内核数*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture ch = serverBootstrap.bind(port).sync();
ch.channel().closeFuture().sync();
} catch (Exception ex) {
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
通过代理模式调用
定义一个通用接口,作为服务提供者(provider)和消费者(consumer)之间的操作纽带
创建一个服务提供者,实现通用接口,并返回处理结果;网络方面监听消费者请求
创建一个服务消费者,通过代理模式调用远程服务接口

在这里插入图片描述


public class NettyServerHandler extends SimpleChannelInboundHandler {
private static String head = "dubbo#TestServie#";
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端数据:" + msg);
if (msg.toString().startsWith(head)) {
TestService testService = new TestServiceImpl();
String result = testService.hello(msg.toString().substring(head.length()));
ctx.writeAndFlush(result);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("接收到连接请,channelActive被调用:" + ctx.channel().remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("读取完成");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开连接");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("抛出异常");
ctx.channel().close();
}
}

public class NettyClient {
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler nettyClientHandler;
private String host;
private int port;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
//编写一个代理 请求服务提供者接口
public Object getBean(final Class<?> serviceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serviceClass}, ((proxy, method, args) -> {
System.out.println("开始执行代理");
if (nettyClientHandler == null)
init();
System.out.println("设置代理参数");
nettyClientHandler.setPara(providerName + args[0].toString());
return executorService.submit(nettyClientHandler).get();
}));
}
private static void init() {
System.out.println("开始执行init方法");
nettyClientHandler = new NettyClientHandler();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(nettyClientHandler);
}
});
bootstrap.connect("127.0.0.1", 40004).sync();
// future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//            worker.shutdownGracefully();
//            System.out.println("执行结束");
}
}
}

3.3 创建消费者业务处理handler

public class ServerBootStrap {
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer(40004);
nettyServer.init();
}
}

2.2 定义一个服务启动类

3.2 创建消费者网络通信模块

本文转自 https://blog.csdn.net/m0_38025927/article/details/122239997

腾讯云618
云惠网小编
SpringCloud -- Config、Bus解析 java

SpringCloud — Config、Bus解析

1、Config1.1、概述简介1. 分布式面临的问题微服务意味着要将单体应用中的业务拆分成一个个子服务,每个服务的粒度相对较小,因此系统中会出现大量的服务。由于每个服务都需要必要...
Java数据结构-了解复杂度 java

Java数据结构-了解复杂度

2.实例分析与计算  四.写在最后  // 计算斐波那契递归fibonacci的时间复杂度 int fibonacci(int N) { return N < 2 ? N : fibonacci...
Java数据结构-认识顺序表 java

Java数据结构-认识顺序表

目录二.顺序表1.概念及结构2.顺序表的实现打印顺序表获取顺序表的有效长度在pos位置新增元素判断是否包含某个元素查找某个元素对应的位置获取/查找pos位置的元素给pos位置的元素...
腾讯云618

发表评论