Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# tinyrpc-java
15 changes: 5 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,6 @@
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
Expand All @@ -73,6 +63,11 @@
<artifactId>protobuf-java</artifactId>
<version>3.19.4</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.11</version>
</dependency>
</dependencies>

<build>
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/com/iker/tinyrpc/TinyRpcInitializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.iker.tinyrpc;

import com.iker.tinyrpc.util.SpringContextUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Arrays;

@Component
@Order(value = 1)
@Slf4j
public class TinyRpcInitializer implements CommandLineRunner {

@Getter
@Resource
private AnnotationConfigApplicationContext applicationContext;

@Override
public void run(String... args) throws Exception {
SpringContextUtil.setApplicationContext(applicationContext);
log.info(String.valueOf(applicationContext.getClass()));
String[] beans = applicationContext.getBeanDefinitionNames();
Arrays.sort(beans);
for (String bean : beans) {
System.out.println(bean + " of Type :: " + applicationContext.getBean(bean).getClass());
}
}
}
23 changes: 3 additions & 20 deletions src/main/java/com/iker/tinyrpc/TinyRpcJavaApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,14 @@

@SpringBootApplication
@Slf4j
public class TinyRpcJavaApplication implements CommandLineRunner {
public class TinyRpcJavaApplication {

public static void main(String[] args) {
log.info("TinyRpcJavaApplication run begin");
ConfigurableApplicationContext context = SpringApplication.run(TinyRpcJavaApplication.class, args);
SpringContextUtil.setApplicationContext(context);
SpringApplication.run(TinyRpcJavaApplication.class, args);
log.info("TinyRpcJavaApplication run end");
}

@Getter
@Resource
private ApplicationContext applicationContext;


/**
* @param args incoming main method arguments
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
// String[] beans = applicationContext.getBeanDefinitionNames();
// Arrays.sort(beans);
// for (String bean : beans)
// {
// System.out.println(bean + " of Type :: " + applicationContext.getBean(bean).getClass());
// }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.iker.tinyrpc.annotation;

import lombok.Getter;
import org.reflections.Reflections;

import java.lang.annotation.Annotation;
import java.util.Set;

public class AnnotationContextHandler {

@Getter
private final String packageName;

private final Reflections reflections;

public AnnotationContextHandler(String packageName) {
this.packageName = packageName;
reflections = new Reflections(packageName);
}

public Set<Class<?>> scanAnnotation(Class<? extends Annotation> clazz) {
return reflections.getTypesAnnotatedWith(clazz);

// for (Class<?> item : classSet) {
// TinyPBService annotation = item.getAnnotation(TinyPBService.class);
// String name = Optional.ofNullable(annotation).<RuntimeException>orElseThrow(
// () -> { throw new RuntimeException("get TinyPBService annotation null"); }
// ).name();
//
// if (name.isEmpty()) {
// name = item.getSuperclass().getSimpleName();
// }
//
// try {
// SpringContextUtil.getBean("rpcServiceFactory", ProtobufRpcServiceFactory.class).registerService(name, item.newInstance());
// } catch (InstantiationException | IllegalAccessException e) {
// throw new RuntimeException(e);
// }
//
// }
}



}
10 changes: 10 additions & 0 deletions src/main/java/com/iker/tinyrpc/annotation/TinyPBService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.iker.tinyrpc.annotation;

import java.lang.annotation.*;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TinyPBService {
String serviceName() default "";
}

This file was deleted.

62 changes: 30 additions & 32 deletions src/main/java/com/iker/tinyrpc/net/TcpClient.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.iker.tinyrpc.net;

import com.iker.tinyrpc.protocol.AbstractProtocol;
import com.iker.tinyrpc.net.rpc.protocol.RpcProtocol;
import com.iker.tinyrpc.util.TinyRpcSystemException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand All @@ -12,8 +12,9 @@
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Optional;

import static com.iker.tinyrpc.util.TinyPBErrorCode.ERROR_FAILED_CONNECT;
import static com.iker.tinyrpc.util.TinyRpcErrorCode.ERROR_FAILED_CONNECT;

@Slf4j
public class TcpClient {
Expand All @@ -32,64 +33,61 @@ public class TcpClient {

private ChannelFuture connectChannelFuture;

public TcpClient(EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
}

public TcpClient(InetSocketAddress peerAddress, EventLoopGroup eventLoopGroup) {
this.peerAddress = peerAddress;
this.eventLoopGroup = eventLoopGroup;
}

public void connect() throws InterruptedException, TinyRpcSystemException {
if (this.peerAddress == null) {
throw new TinyRpcSystemException("connect failed, not set peerAddress");
}
connect(this.peerAddress);
}

public void connect(InetSocketAddress peerAddress) throws TinyRpcSystemException {
if (this.peerAddress != null) {
throw new TinyRpcSystemException("init client failed, peerAddress has already set");
}
this.peerAddress = peerAddress;

public ChannelFuture connect() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.remoteAddress(peerAddress)
.remoteAddress(Optional.ofNullable(peerAddress).<TinyRpcSystemException>orElseThrow(
() -> { throw new TinyRpcSystemException(ERROR_FAILED_CONNECT, "peer address is null"); }
))
.handler(new TcpClientChannelInitializer());

connectChannelFuture = bootstrap.connect();
channel = connectChannelFuture.channel();
connectChannelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
this.peerAddress = (InetSocketAddress) future.channel().remoteAddress();
this.localAddress = (InetSocketAddress) future.channel().localAddress();
peerAddress = (InetSocketAddress) future.channel().remoteAddress();
localAddress = (InetSocketAddress) future.channel().localAddress();
log.info(String.format("success connect to remoteAddr[%s:%d], localAddr[%s:%d]",
this.peerAddress.getHostName(), this.peerAddress.getPort(),
this.localAddress.getHostName(), this.localAddress.getPort()));
peerAddress.getHostName(), peerAddress.getPort(),
localAddress.getHostName(), localAddress.getPort()));
} else {
log.error("connect failed, stack info:");
future.cause().printStackTrace();
throw new TinyRpcSystemException(future.cause().getMessage());
log.error(String.format("connect failed, peer addr[%s:%d]", peerAddress.getHostName(), peerAddress.getPort()));
log.error("exception: ", future.cause());
}
});
return connectChannelFuture;
}


public ChannelFuture sendMessage(AbstractProtocol protocol) {
public ChannelFuture sendMessage(RpcProtocol protocol) {
if (!connectChannelFuture.isDone()) {
try {
connectChannelFuture.sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
log.error("exception ", e);
throw new TinyRpcSystemException(e.getMessage());
}
}
if (!channel.isActive()) {
throw new TinyRpcSystemException(ERROR_FAILED_CONNECT, "sendMessage error, connection is not active");
throw new TinyRpcSystemException(ERROR_FAILED_CONNECT, String.format("sendMessage error, connection[%s:%d] is not active", peerAddress.getHostName(), peerAddress.getPort()));
}
return channel.writeAndFlush(protocol);
return channel.writeAndFlush(protocol).addListener( future -> {
if (future.isSuccess()) {
log.info(String.format("success send protocol message[%s] to remoteAddr[%s:%d]", protocol.getMsgReq(), peerAddress.getHostName(), peerAddress.getPort()));
} else {
log.error(String.format("failed send protocol message[%s] to remoteAddr[%s:%d]", protocol.getMsgReq(), peerAddress.getHostName(), peerAddress.getPort()));
log.error("exception: ", future.cause());
}
});
}

public void awaitResponseWithTimeout(String msgReq, int timeout) throws InterruptedException {
// channel.
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.iker.tinyrpc.net;

import com.iker.tinyrpc.net.rpc.RpcFutureMap;
import com.iker.tinyrpc.net.rpc.protocol.RpcProtocol;
import com.iker.tinyrpc.util.SpringContextUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;


@Slf4j
public class TcpClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
Expand Down Expand Up @@ -49,6 +54,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("channelInactive{}", ctx.channel().remoteAddress());
super.channelInactive(ctx);
}

Expand All @@ -59,37 +65,16 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("channelRead{}", ctx.channel().remoteAddress().toString());
Optional.ofNullable((RpcProtocol) msg).ifPresent(
(protocol) -> {
log.info(String.format("success get reply protocol of msgReq [%s]", protocol.getMsgReq()));
SpringContextUtil.getApplicationContext().getBean(RpcFutureMap.class).getFuture(protocol.getMsgReq()).invoke(protocol);
}
);
super.channelRead(ctx, msg);
}

/**
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}

/**
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
}

/**
* @param ctx
* @throws Exception
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}

/**
* @param ctx
* @param cause
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.iker.tinyrpc.net;

import com.iker.tinyrpc.codec.TinyPBDecoder;
import com.iker.tinyrpc.codec.TinyPBEncoder;
import com.iker.tinyrpc.net.rpc.protocol.tinypb.TinyPBDecoder;
import com.iker.tinyrpc.net.rpc.protocol.tinypb.TinyPBEncoder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
Expand All @@ -15,9 +15,10 @@ public class TcpClientChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(75, 75, 75, TimeUnit.SECONDS))
.addLast("decoder", new TinyPBDecoder())
.addLast("inboundHandlerAdapter", new TcpClientChannelInboundHandler())
.addLast("encoder", new TinyPBEncoder())
.addLast("tinyPBDecoder", new TinyPBDecoder())
.addLast("inBoundHandler", new TcpClientChannelInboundHandler())
.addLast("tinyPBEncoder", new TinyPBEncoder())
.addLast("outBoundHandler", new TcpClientChannelOutboundHandler())
.addLast("exceptionHandler", new TcpServerExceptionHandler());
}
}
Loading