diff --git a/README.md b/README.md
new file mode 100644
index 0000000..5f3ab91
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# tinyrpc-java
diff --git a/pom.xml b/pom.xml
index 5ea21c4..4de6fe8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,16 +43,6 @@
netty-all
4.1.86.Final
-
- org.apache.logging.log4j
- log4j-api
- 2.19.0
-
-
- org.apache.logging.log4j
- log4j-core
- 2.19.0
-
com.lmax
disruptor
@@ -73,6 +63,11 @@
protobuf-java
3.19.4
+
+ org.reflections
+ reflections
+ 0.9.11
+
diff --git a/src/main/java/com/iker/tinyrpc/TinyRpcInitializer.java b/src/main/java/com/iker/tinyrpc/TinyRpcInitializer.java
new file mode 100644
index 0000000..1a0082f
--- /dev/null
+++ b/src/main/java/com/iker/tinyrpc/TinyRpcInitializer.java
@@ -0,0 +1,33 @@
+package com.iker.tinyrpc;
+
+import com.iker.tinyrpc.util.SpringContextUtil;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.Arrays;
+
+@Component
+@Order(value = 1)
+@Slf4j
+public class TinyRpcInitializer implements CommandLineRunner {
+
+ @Getter
+ @Resource
+ private AnnotationConfigApplicationContext applicationContext;
+
+ @Override
+ public void run(String... args) throws Exception {
+ SpringContextUtil.setApplicationContext(applicationContext);
+ log.info(String.valueOf(applicationContext.getClass()));
+ String[] beans = applicationContext.getBeanDefinitionNames();
+ Arrays.sort(beans);
+ for (String bean : beans) {
+ System.out.println(bean + " of Type :: " + applicationContext.getBean(bean).getClass());
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/iker/tinyrpc/TinyRpcJavaApplication.java b/src/main/java/com/iker/tinyrpc/TinyRpcJavaApplication.java
index 69805f2..989cbcc 100644
--- a/src/main/java/com/iker/tinyrpc/TinyRpcJavaApplication.java
+++ b/src/main/java/com/iker/tinyrpc/TinyRpcJavaApplication.java
@@ -15,31 +15,14 @@
@SpringBootApplication
@Slf4j
-public class TinyRpcJavaApplication implements CommandLineRunner {
+public class TinyRpcJavaApplication {
public static void main(String[] args) {
log.info("TinyRpcJavaApplication run begin");
- ConfigurableApplicationContext context = SpringApplication.run(TinyRpcJavaApplication.class, args);
- SpringContextUtil.setApplicationContext(context);
+ SpringApplication.run(TinyRpcJavaApplication.class, args);
log.info("TinyRpcJavaApplication run end");
- }
-
- @Getter
- @Resource
- private ApplicationContext applicationContext;
- /**
- * @param args incoming main method arguments
- * @throws Exception
- */
- @Override
- public void run(String... args) throws Exception {
-// String[] beans = applicationContext.getBeanDefinitionNames();
-// Arrays.sort(beans);
-// for (String bean : beans)
-// {
-// System.out.println(bean + " of Type :: " + applicationContext.getBean(bean).getClass());
-// }
}
+
}
diff --git a/src/main/java/com/iker/tinyrpc/annotation/AnnotationContextHandler.java b/src/main/java/com/iker/tinyrpc/annotation/AnnotationContextHandler.java
new file mode 100644
index 0000000..e6cd982
--- /dev/null
+++ b/src/main/java/com/iker/tinyrpc/annotation/AnnotationContextHandler.java
@@ -0,0 +1,45 @@
+package com.iker.tinyrpc.annotation;
+
+import lombok.Getter;
+import org.reflections.Reflections;
+
+import java.lang.annotation.Annotation;
+import java.util.Set;
+
+public class AnnotationContextHandler {
+
+ @Getter
+ private final String packageName;
+
+ private final Reflections reflections;
+
+ public AnnotationContextHandler(String packageName) {
+ this.packageName = packageName;
+ reflections = new Reflections(packageName);
+ }
+
+ public Set> scanAnnotation(Class extends Annotation> clazz) {
+ return reflections.getTypesAnnotatedWith(clazz);
+
+// for (Class> item : classSet) {
+// TinyPBService annotation = item.getAnnotation(TinyPBService.class);
+// String name = Optional.ofNullable(annotation).orElseThrow(
+// () -> { throw new RuntimeException("get TinyPBService annotation null"); }
+// ).name();
+//
+// if (name.isEmpty()) {
+// name = item.getSuperclass().getSimpleName();
+// }
+//
+// try {
+// SpringContextUtil.getBean("rpcServiceFactory", ProtobufRpcServiceFactory.class).registerService(name, item.newInstance());
+// } catch (InstantiationException | IllegalAccessException e) {
+// throw new RuntimeException(e);
+// }
+//
+// }
+ }
+
+
+
+}
diff --git a/src/main/java/com/iker/tinyrpc/annotation/TinyPBService.java b/src/main/java/com/iker/tinyrpc/annotation/TinyPBService.java
new file mode 100644
index 0000000..743ee5c
--- /dev/null
+++ b/src/main/java/com/iker/tinyrpc/annotation/TinyPBService.java
@@ -0,0 +1,10 @@
+package com.iker.tinyrpc.annotation;
+
+import java.lang.annotation.*;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface TinyPBService {
+ String serviceName() default "";
+}
diff --git a/src/main/java/com/iker/tinyrpc/controller/TinyPBRpcController.java b/src/main/java/com/iker/tinyrpc/controller/TinyPBRpcController.java
deleted file mode 100644
index caeb7c7..0000000
--- a/src/main/java/com/iker/tinyrpc/controller/TinyPBRpcController.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package com.iker.tinyrpc.controller;
-
-public class TinyPBRpcController {
-}
diff --git a/src/main/java/com/iker/tinyrpc/net/TcpClient.java b/src/main/java/com/iker/tinyrpc/net/TcpClient.java
index d461ac4..41f379a 100644
--- a/src/main/java/com/iker/tinyrpc/net/TcpClient.java
+++ b/src/main/java/com/iker/tinyrpc/net/TcpClient.java
@@ -1,6 +1,6 @@
package com.iker.tinyrpc.net;
-import com.iker.tinyrpc.protocol.AbstractProtocol;
+import com.iker.tinyrpc.net.rpc.protocol.RpcProtocol;
import com.iker.tinyrpc.util.TinyRpcSystemException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@@ -12,8 +12,9 @@
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
+import java.util.Optional;
-import static com.iker.tinyrpc.util.TinyPBErrorCode.ERROR_FAILED_CONNECT;
+import static com.iker.tinyrpc.util.TinyRpcErrorCode.ERROR_FAILED_CONNECT;
@Slf4j
public class TcpClient {
@@ -32,64 +33,61 @@ public class TcpClient {
private ChannelFuture connectChannelFuture;
- public TcpClient(EventLoopGroup eventLoopGroup) {
- this.eventLoopGroup = eventLoopGroup;
- }
-
public TcpClient(InetSocketAddress peerAddress, EventLoopGroup eventLoopGroup) {
this.peerAddress = peerAddress;
this.eventLoopGroup = eventLoopGroup;
}
- public void connect() throws InterruptedException, TinyRpcSystemException {
- if (this.peerAddress == null) {
- throw new TinyRpcSystemException("connect failed, not set peerAddress");
- }
- connect(this.peerAddress);
- }
-
- public void connect(InetSocketAddress peerAddress) throws TinyRpcSystemException {
- if (this.peerAddress != null) {
- throw new TinyRpcSystemException("init client failed, peerAddress has already set");
- }
- this.peerAddress = peerAddress;
-
+ public ChannelFuture connect() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
- .remoteAddress(peerAddress)
+ .remoteAddress(Optional.ofNullable(peerAddress).orElseThrow(
+ () -> { throw new TinyRpcSystemException(ERROR_FAILED_CONNECT, "peer address is null"); }
+ ))
.handler(new TcpClientChannelInitializer());
-
connectChannelFuture = bootstrap.connect();
channel = connectChannelFuture.channel();
connectChannelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
- this.peerAddress = (InetSocketAddress) future.channel().remoteAddress();
- this.localAddress = (InetSocketAddress) future.channel().localAddress();
+ peerAddress = (InetSocketAddress) future.channel().remoteAddress();
+ localAddress = (InetSocketAddress) future.channel().localAddress();
log.info(String.format("success connect to remoteAddr[%s:%d], localAddr[%s:%d]",
- this.peerAddress.getHostName(), this.peerAddress.getPort(),
- this.localAddress.getHostName(), this.localAddress.getPort()));
+ peerAddress.getHostName(), peerAddress.getPort(),
+ localAddress.getHostName(), localAddress.getPort()));
} else {
- log.error("connect failed, stack info:");
- future.cause().printStackTrace();
- throw new TinyRpcSystemException(future.cause().getMessage());
+ log.error(String.format("connect failed, peer addr[%s:%d]", peerAddress.getHostName(), peerAddress.getPort()));
+ log.error("exception: ", future.cause());
}
});
+ return connectChannelFuture;
}
- public ChannelFuture sendMessage(AbstractProtocol protocol) {
+ public ChannelFuture sendMessage(RpcProtocol protocol) {
if (!connectChannelFuture.isDone()) {
try {
connectChannelFuture.sync();
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ log.error("exception ", e);
+ throw new TinyRpcSystemException(e.getMessage());
}
}
if (!channel.isActive()) {
- throw new TinyRpcSystemException(ERROR_FAILED_CONNECT, "sendMessage error, connection is not active");
+ throw new TinyRpcSystemException(ERROR_FAILED_CONNECT, String.format("sendMessage error, connection[%s:%d] is not active", peerAddress.getHostName(), peerAddress.getPort()));
}
- return channel.writeAndFlush(protocol);
+ return channel.writeAndFlush(protocol).addListener( future -> {
+ if (future.isSuccess()) {
+ log.info(String.format("success send protocol message[%s] to remoteAddr[%s:%d]", protocol.getMsgReq(), peerAddress.getHostName(), peerAddress.getPort()));
+ } else {
+ log.error(String.format("failed send protocol message[%s] to remoteAddr[%s:%d]", protocol.getMsgReq(), peerAddress.getHostName(), peerAddress.getPort()));
+ log.error("exception: ", future.cause());
+ }
+ });
+ }
+
+ public void awaitResponseWithTimeout(String msgReq, int timeout) throws InterruptedException {
+// channel.
}
}
diff --git a/src/main/java/com/iker/tinyrpc/net/TcpClientChannelInboundHandler.java b/src/main/java/com/iker/tinyrpc/net/TcpClientChannelInboundHandler.java
index 774ab46..2ecc50a 100644
--- a/src/main/java/com/iker/tinyrpc/net/TcpClientChannelInboundHandler.java
+++ b/src/main/java/com/iker/tinyrpc/net/TcpClientChannelInboundHandler.java
@@ -1,9 +1,14 @@
package com.iker.tinyrpc.net;
+import com.iker.tinyrpc.net.rpc.RpcFutureMap;
+import com.iker.tinyrpc.net.rpc.protocol.RpcProtocol;
+import com.iker.tinyrpc.util.SpringContextUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
+import java.util.Optional;
+
@Slf4j
public class TcpClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
@@ -49,6 +54,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ log.debug("channelInactive{}", ctx.channel().remoteAddress());
super.channelInactive(ctx);
}
@@ -59,37 +65,16 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ log.debug("channelRead{}", ctx.channel().remoteAddress().toString());
+ Optional.ofNullable((RpcProtocol) msg).ifPresent(
+ (protocol) -> {
+ log.info(String.format("success get reply protocol of msgReq [%s]", protocol.getMsgReq()));
+ SpringContextUtil.getApplicationContext().getBean(RpcFutureMap.class).getFuture(protocol.getMsgReq()).invoke(protocol);
+ }
+ );
super.channelRead(ctx, msg);
}
- /**
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- super.channelReadComplete(ctx);
- }
-
- /**
- * @param ctx
- * @param evt
- * @throws Exception
- */
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- super.userEventTriggered(ctx, evt);
- }
-
- /**
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- super.channelWritabilityChanged(ctx);
- }
-
/**
* @param ctx
* @param cause
diff --git a/src/main/java/com/iker/tinyrpc/net/TcpClientChannelInitializer.java b/src/main/java/com/iker/tinyrpc/net/TcpClientChannelInitializer.java
index 5a842fc..c33a3a1 100644
--- a/src/main/java/com/iker/tinyrpc/net/TcpClientChannelInitializer.java
+++ b/src/main/java/com/iker/tinyrpc/net/TcpClientChannelInitializer.java
@@ -1,7 +1,7 @@
package com.iker.tinyrpc.net;
-import com.iker.tinyrpc.codec.TinyPBDecoder;
-import com.iker.tinyrpc.codec.TinyPBEncoder;
+import com.iker.tinyrpc.net.rpc.protocol.tinypb.TinyPBDecoder;
+import com.iker.tinyrpc.net.rpc.protocol.tinypb.TinyPBEncoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
@@ -15,9 +15,10 @@ public class TcpClientChannelInitializer extends ChannelInitializer {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(75, 75, 75, TimeUnit.SECONDS))
- .addLast("decoder", new TinyPBDecoder())
- .addLast("inboundHandlerAdapter", new TcpClientChannelInboundHandler())
- .addLast("encoder", new TinyPBEncoder())
+ .addLast("tinyPBDecoder", new TinyPBDecoder())
+ .addLast("inBoundHandler", new TcpClientChannelInboundHandler())
+ .addLast("tinyPBEncoder", new TinyPBEncoder())
+ .addLast("outBoundHandler", new TcpClientChannelOutboundHandler())
.addLast("exceptionHandler", new TcpServerExceptionHandler());
}
}
diff --git a/src/main/java/com/iker/tinyrpc/net/TcpClientChannelOutboundHandler.java b/src/main/java/com/iker/tinyrpc/net/TcpClientChannelOutboundHandler.java
new file mode 100644
index 0000000..82335d6
--- /dev/null
+++ b/src/main/java/com/iker/tinyrpc/net/TcpClientChannelOutboundHandler.java
@@ -0,0 +1,57 @@
+package com.iker.tinyrpc.net;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.SocketAddress;
+
+@Slf4j
+public class TcpClientChannelOutboundHandler extends ChannelOutboundHandlerAdapter {
+ public TcpClientChannelOutboundHandler() {
+ super();
+ }
+
+ @Override
+ public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
+ super.bind(ctx, localAddress, promise);
+ }
+
+ @Override
+ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
+ super.connect(ctx, remoteAddress, localAddress, promise);
+ }
+
+ @Override
+ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ super.disconnect(ctx, promise);
+ }
+
+ @Override
+ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ super.close(ctx, promise);
+ }
+
+ @Override
+ public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ super.deregister(ctx, promise);
+ }
+
+ @Override
+ public void read(ChannelHandlerContext ctx) throws Exception {
+ super.read(ctx);
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ log.info("begin to write");
+ super.write(ctx, msg, promise);
+ log.info("end to write");
+ }
+
+ @Override
+ public void flush(ChannelHandlerContext ctx) throws Exception {
+ super.flush(ctx);
+ }
+}
diff --git a/src/main/java/com/iker/tinyrpc/net/TcpServer.java b/src/main/java/com/iker/tinyrpc/net/TcpServer.java
index 3b7f0c3..3dbc230 100644
--- a/src/main/java/com/iker/tinyrpc/net/TcpServer.java
+++ b/src/main/java/com/iker/tinyrpc/net/TcpServer.java
@@ -1,6 +1,9 @@
package com.iker.tinyrpc.net;
-import com.iker.tinyrpc.util.TinyRpcSystemException;
+import com.iker.tinyrpc.annotation.AnnotationContextHandler;
+import com.iker.tinyrpc.annotation.TinyPBService;
+import com.iker.tinyrpc.net.rpc.protobuf.ProtobufRpcServiceFactory;
+import com.iker.tinyrpc.util.SpringContextUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -11,64 +14,91 @@
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.stereotype.Component;
-import javax.annotation.Resource;
import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.Set;
@Slf4j
+@Component
public class TcpServer {
@Getter
- @Setter
private EventLoopGroup mainLoopGroup; // mainReactor
@Getter
- @Setter
private EventLoopGroup workerLoopGroup; // io subReactors
@Getter
- private InetSocketAddress localAddress;
+ @Setter
+ private InetSocketAddress localAddress; // listen addr
+
+ public void initMainLoopGroup(int size) {
+ mainLoopGroup = new NioEventLoopGroup(size);
+ }
- public void start(InetSocketAddress localAddress) throws InterruptedException, TinyRpcSystemException {
- try {
- if(this.localAddress != null) {
- throw new TinyRpcSystemException("TinyRPC TcpServer start error, local address has set.");
+ public void initWorkerLoopGroup(int size) {
+ workerLoopGroup = new NioEventLoopGroup(size);
+ }
+
+ public void registerService() {
+ AnnotationContextHandler annotationContextHandler = new AnnotationContextHandler("com.iker.tinyrpc");
+ Set> classSet = annotationContextHandler.scanAnnotation(TinyPBService.class);
+ for (Class> item : classSet) {
+ TinyPBService annotation = item.getAnnotation(TinyPBService.class);
+ String name = Optional.ofNullable(annotation).orElseThrow(
+ () -> { throw new RuntimeException("get TinyPBService annotation null"); }
+ ).serviceName();
+
+ // register name must be same as the service's name in protobuf file
+ if (name.isEmpty()) {
+ name = item.getSuperclass().getSimpleName();
}
- this.localAddress = localAddress;
-
- workerLoopGroup = new NioEventLoopGroup(4);
- mainLoopGroup = new NioEventLoopGroup(1);
-
- ServerBootstrap serverBootstrap = new ServerBootstrap()
- .group(mainLoopGroup, workerLoopGroup)
- .option(ChannelOption.SO_BACKLOG, 128)
- .channel(NioServerSocketChannel.class)
- .childHandler(new TcpServerChannelInitializer())
- .localAddress(localAddress);
-
- ChannelFuture channelFuture = serverBootstrap.bind().sync();
- channelFuture.addListener((ChannelFutureListener) future -> {
- if (future.isSuccess()) {
- InetSocketAddress address = (InetSocketAddress)future.channel().localAddress();
- assert (address != null);
- log.info(String.format("TinyRPC TcpServer start success, listen on [%s:%d]", getLocalAddress().getHostString(), getLocalAddress().getPort()));
- } else {
- log.error("TinyRPC TcpServer start error");
- future.cause().printStackTrace();
- throw new TinyRpcSystemException(future.cause().getMessage());
- }
- });
-
- // wait until close this channel
- channelFuture.channel().closeFuture().sync();
- log.info("TinyRPC quit success");
- } catch (InterruptedException e) {
- throw e;
- } finally {
- mainLoopGroup.shutdownGracefully().sync();
- workerLoopGroup.shutdownGracefully().sync();
+
+ // 1. register service to BeanFactory
+ GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
+ beanDefinition.setBeanClass(item);
+ SpringContextUtil.getBeanFactory().registerBeanDefinition(item.getName(), beanDefinition);
+
+ // 2. get this bean, then register to RpcServiceFactory
+ SpringContextUtil.getBean(ProtobufRpcServiceFactory.class).registerService(name, SpringContextUtil.getBean(item));
+
}
+ }
+
+ public void start() throws InterruptedException {
+
+ registerService();
+
+ ServerBootstrap serverBootstrap = new ServerBootstrap()
+ .group(mainLoopGroup, workerLoopGroup)
+ .option(ChannelOption.SO_BACKLOG, 128)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new TcpServerChannelInitializer())
+ .localAddress(localAddress);
+ ChannelFuture channelFuture = serverBootstrap.bind().sync();
+ channelFuture.addListener((ChannelFutureListener) future -> {
+ if(future.isSuccess()){
+ InetSocketAddress address = (InetSocketAddress) future.channel().localAddress();
+ assert (address != null);
+ log.info(String.format("TinyRPC TcpServer start success, listen on [%s:%d]", getLocalAddress().getHostString(), getLocalAddress().getPort()));
+ } else {
+ log.error("TinyRPC TcpServer start error");
+ log.error("exception:", future.cause());
+ throw new RuntimeException(future.cause().getMessage());
+ }
+ });
+
+ // wait until close this channel
+ channelFuture.channel().closeFuture().sync();
+ log.info("TinyRPC quit success");
+
+ mainLoopGroup.shutdownGracefully().sync();
+ workerLoopGroup.shutdownGracefully().sync();
}
+
+
}
diff --git a/src/main/java/com/iker/tinyrpc/net/TcpServerChannelInboundHandler.java b/src/main/java/com/iker/tinyrpc/net/TcpServerChannelInboundHandler.java
index c1d9484..b8afa2a 100644
--- a/src/main/java/com/iker/tinyrpc/net/TcpServerChannelInboundHandler.java
+++ b/src/main/java/com/iker/tinyrpc/net/TcpServerChannelInboundHandler.java
@@ -1,13 +1,14 @@
package com.iker.tinyrpc.net;
-import com.iker.tinyrpc.protocol.TinyPBProtocol;
-import com.iker.tinyrpc.util.TinyRpcSystemException;
+import com.iker.tinyrpc.net.rpc.protocol.tinypb.TinyPBRpcDispatcher;
+import com.iker.tinyrpc.net.rpc.protocol.tinypb.TinyPBProtocol;
+import com.iker.tinyrpc.util.SpringContextUtil;
import io.netty.channel.*;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
-
-import static com.iker.tinyrpc.util.TinyPBErrorCode.ERROR_FAILED_DECODE;
+import java.util.Optional;
+;
@Slf4j
@ChannelHandler.Sharable
@@ -90,13 +91,13 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
InetSocketAddress address = (InetSocketAddress)ctx.channel().remoteAddress();
log.info("channelRead, remote addr: " + address.getHostString());
- TinyPBProtocol protocol = (TinyPBProtocol) msg;
- if (protocol != null) {
- log.info(String.format("get protocol of msgReq [%s]", protocol.getMsgReq()));
- } else {
- throw new TinyRpcSystemException(ERROR_FAILED_DECODE, "failed get object");
- }
-// super.channelRead(ctx, msg);
+
+ Optional.ofNullable((TinyPBProtocol) msg).ifPresent(
+ (protocol) -> {
+ log.info(String.format("get protocol of msgReq [%s]", protocol.getMsgReq()));
+ SpringContextUtil.getApplicationContext().getBean(TinyPBRpcDispatcher.class).dispatch(protocol, ctx.channel());
+ }
+ );
}
@@ -157,8 +158,8 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- super.exceptionCaught(ctx, cause);
InetSocketAddress address = (InetSocketAddress)ctx.channel().remoteAddress();
log.info("exceptionCaught, remote addr: " + address.getHostString());
+ super.exceptionCaught(ctx, cause);
}
}
diff --git a/src/main/java/com/iker/tinyrpc/net/TcpServerChannelInitializer.java b/src/main/java/com/iker/tinyrpc/net/TcpServerChannelInitializer.java
index c64047d..8c0f372 100644
--- a/src/main/java/com/iker/tinyrpc/net/TcpServerChannelInitializer.java
+++ b/src/main/java/com/iker/tinyrpc/net/TcpServerChannelInitializer.java
@@ -1,7 +1,7 @@
package com.iker.tinyrpc.net;
-import com.iker.tinyrpc.codec.TinyPBDecoder;
-import com.iker.tinyrpc.codec.TinyPBEncoder;
+import com.iker.tinyrpc.net.rpc.protocol.tinypb.TinyPBDecoder;
+import com.iker.tinyrpc.net.rpc.protocol.tinypb.TinyPBEncoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
@@ -17,8 +17,9 @@ public class TcpServerChannelInitializer extends ChannelInitializer {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(75, 75, 75, TimeUnit.SECONDS))
.addLast("tinyPBDecoder", new TinyPBDecoder())
+ .addLast("inBoundHandler", new TcpServerChannelInboundHandler())
.addLast("tinyPBEncoder", new TinyPBEncoder())
- .addLast("inboundHandler", new TcpServerChannelInboundHandler())
+ .addLast("outBoundHandler", new TcpServerChannelOutboundHandler())
.addLast("exceptionHandler", new TcpServerExceptionHandler());
}
}
diff --git a/src/main/java/com/iker/tinyrpc/net/TcpServerChannelOutboundHandler.java b/src/main/java/com/iker/tinyrpc/net/TcpServerChannelOutboundHandler.java
new file mode 100644
index 0000000..cf1b804
--- /dev/null
+++ b/src/main/java/com/iker/tinyrpc/net/TcpServerChannelOutboundHandler.java
@@ -0,0 +1,17 @@
+package com.iker.tinyrpc.net;
+
+import io.netty.channel.*;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TcpServerChannelOutboundHandler extends ChannelOutboundHandlerAdapter {
+ public TcpServerChannelOutboundHandler() {
+ super();
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ super.write(ctx, msg, promise);
+ }
+
+}
diff --git a/src/main/java/com/iker/tinyrpc/net/future/RpcFuture.java b/src/main/java/com/iker/tinyrpc/net/future/RpcFuture.java
new file mode 100644
index 0000000..1608ca3
--- /dev/null
+++ b/src/main/java/com/iker/tinyrpc/net/future/RpcFuture.java
@@ -0,0 +1,10 @@
+package com.iker.tinyrpc.net.future;
+
+import java.util.concurrent.*;
+
+public interface RpcFuture extends Future {
+ String getId();
+
+ // to invoke the future who is call get()
+ void invoke(T object);
+}
diff --git a/src/main/java/com/iker/tinyrpc/net/future/RpcSyncFuture.java b/src/main/java/com/iker/tinyrpc/net/future/RpcSyncFuture.java
new file mode 100644
index 0000000..75b8276
--- /dev/null
+++ b/src/main/java/com/iker/tinyrpc/net/future/RpcSyncFuture.java
@@ -0,0 +1,144 @@
+package com.iker.tinyrpc.net.future;
+
+import com.iker.tinyrpc.net.rpc.protobuf.DefaultRpcCallback;
+import com.iker.tinyrpc.net.rpc.protocol.RpcProtocol;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Slf4j
+public class RpcSyncFuture implements RpcFuture {
+
+ private static final int DEFAULT_TIMEOUT = 2000; // 默认超时时间,2000 ms
+
+ private final String id;
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition condition = lock.newCondition();
+
+ private final DefaultRpcCallback callback;
+
+ @Setter
+ private boolean done = false;
+
+ @Setter
+ private boolean canceled = false;
+
+ @Setter
+ private boolean needInvoke = false;
+
+ @Setter
+ private RpcProtocol reply;
+
+ public RpcSyncFuture(String id, DefaultRpcCallback callback) {
+ this.id = id;
+ this.callback = callback;
+ }
+
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (canceled) {
+ return true;
+ }
+ canceled = true;
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return canceled;
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public RpcProtocol get() throws InterruptedException, ExecutionException {
+ return get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+
+ public RpcProtocol get(long timeout) throws InterruptedException, ExecutionException {
+ return get(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public RpcProtocol get(long timeout, TimeUnit unit) {
+ if (done) {
+ return reply;
+ }
+
+ lock.lock();
+ needInvoke = true;
+ if (reply != null) {
+ return reply;
+ }
+ try {
+ while (!done) {
+ if(condition.await(timeout, unit)){
+ // invoked by other thread
+ log.info("RpcFuture:{} invoked", id);
+ } else {
+ // timeout
+ log.info("RpcFuture:{} timeout", id);
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error("InterruptedException", e);
+ } finally {
+ lock.unlock();
+ }
+
+ runCallback();
+
+ return reply;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void invoke(RpcProtocol object) {
+ if (done) {
+ return;
+ }
+ // 是否需要 notify 原线程
+ // 如果主调线程调用了 get, 就需要 notify
+ // 否则直接当前线程执行回调即可
+ if (isNeedInvoke()){
+ lock.lock();
+
+ reply = object;
+
+ setDone(true);
+
+ condition.signal();
+
+ lock.unlock();
+
+ } else {
+ runCallback();
+ }
+ }
+
+ private boolean isNeedInvoke () {
+ return needInvoke;
+ }
+
+ private void runCallback() {
+ Optional.ofNullable(callback).ifPresent(
+ DefaultRpcCallback::run
+ );
+ }
+}
diff --git a/src/main/java/com/iker/tinyrpc/net/rpc/AbstractRpcDispatcher.java b/src/main/java/com/iker/tinyrpc/net/rpc/RpcDispatcher.java
similarity index 62%
rename from src/main/java/com/iker/tinyrpc/net/rpc/AbstractRpcDispatcher.java
rename to src/main/java/com/iker/tinyrpc/net/rpc/RpcDispatcher.java
index 3600567..f630fdf 100644
--- a/src/main/java/com/iker/tinyrpc/net/rpc/AbstractRpcDispatcher.java
+++ b/src/main/java/com/iker/tinyrpc/net/rpc/RpcDispatcher.java
@@ -1,13 +1,14 @@
package com.iker.tinyrpc.net.rpc;
-import com.iker.tinyrpc.protocol.AbstractProtocol;
+import com.iker.tinyrpc.net.rpc.protocol.RpcProtocol;
import com.iker.tinyrpc.util.TinyRpcSystemException;
+import io.netty.channel.Channel;
-public abstract class AbstractRpcDispatcher {
+public interface RpcDispatcher {
/**
* @param protocol protocol object, such as TinyPBProtocol
* @throws TinyRpcSystemException
* To dispatch rpc request according protocol, so that can server call designated method do business things, and reply to client
*/
- abstract void dispatch(AbstractProtocol protocol) throws TinyRpcSystemException;
+ void dispatch(RpcProtocol protocol, Channel channel) throws TinyRpcSystemException;
}
diff --git a/src/main/java/com/iker/tinyrpc/net/rpc/RpcFutureMap.java b/src/main/java/com/iker/tinyrpc/net/rpc/RpcFutureMap.java
new file mode 100644
index 0000000..a06fd9d
--- /dev/null
+++ b/src/main/java/com/iker/tinyrpc/net/rpc/RpcFutureMap.java
@@ -0,0 +1,40 @@
+package com.iker.tinyrpc.net.rpc;
+
+import com.iker.tinyrpc.net.future.RpcFuture;
+import com.iker.tinyrpc.net.rpc.protocol.RpcProtocol;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class RpcFutureMap {
+
+ private final Map> rpcResponseMap = new HashMap<>();
+
+ public void addFuture(RpcFuture future) {
+ synchronized (rpcResponseMap) {
+ rpcResponseMap.put(future.getId(), future);
+ }
+
+ }
+
+ public RpcFuture getFuture(String key) {
+ synchronized (rpcResponseMap) {
+ if (rpcResponseMap.containsKey(key)) {
+ return rpcResponseMap.get(key);
+ }
+ return null;
+ }
+ }
+
+ public boolean deleteFuture(String key) {
+ synchronized (rpcResponseMap) {
+ if (rpcResponseMap.containsKey(key)) {
+ rpcResponseMap.remove(key);
+ return true;
+ }
+ return false;
+ }
+ }
+}
diff --git a/src/main/java/com/iker/tinyrpc/net/rpc/RpcServiceFactory.java b/src/main/java/com/iker/tinyrpc/net/rpc/RpcServiceFactory.java
deleted file mode 100644
index 91d7464..0000000
--- a/src/main/java/com/iker/tinyrpc/net/rpc/RpcServiceFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.iker.tinyrpc.net.rpc;
-
-import com.iker.tinyrpc.util.TinyRpcSystemException;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.util.Map;
-import java.util.Optional;
-
-@Component
-@Slf4j
-public class RpcServiceFactory {
-
- @Resource
- private Map rpcServiceMap;
-
- public void registerService(String key, Object object) {
- if (rpcServiceMap.containsKey(key)) {
- throw new TinyRpcSystemException(String.format("registerService error, key %s exist", key));
- }
- rpcServiceMap.put(key, object);
- log.info(String.format("register %s success", key));
- }
-
- public Optional