为了避免依赖的问题,我采用了maven创建项目
下面是依赖的包
<dependency> <groupId>com.linkedin</groupId> <artifactId>norbert_2.8.1</artifactId> <version>0.6.12</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.8.1</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.4.0a</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.3.3</version> </dependency>
先上一个norbert官方的例子,官方的缺少protoc生成消息格式类的介绍,具体生成办法请参考 我的另一篇文章<protobuf介绍>
class PingSerializer implements Serializer<Ping, Ping> {
public String requestName() {
return "ping";
}
public String responseName() {
return "pong";
}
public byte[] requestToBytes(Ping message) {
return NorbertExampleProtos.Ping.newBuilder().setTimestamp(message.timestamp).build().toByteArray();
}
public Ping requestFromBytes(byte[] bytes) {
try {
return new Ping(NorbertExampleProtos.Ping.newBuilder().mergeFrom(bytes).build().getTimestamp());
} catch (InvalidProtocolBufferException e) {
System.out.println("Invalid protocol buffer exception " + e.getMessage());
throw new IllegalArgumentException(e);
}
}
public byte[] responseToBytes(Ping message) {
return requestToBytes(message);
}
public Ping responseFromBytes(byte[] bytes) {
return requestFromBytes(bytes);
}
}
必须要实现 serializer接口,这是使用netty传输时对内容进行转换的 NorbertExampleProtos这个类是通过.proto文件生成的,具体生成办法请参考 我的另一篇文章<protobuf介绍> 下面是客户端和服务端的写法, 直接运行就可以看到结果, 据我查到的情况是 norbert能支持1000并发以上没问题。这个对企业大并发还不是很了解,不评论
import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;import proto.Request; import proto.RequestResponseSerializer; import proto.Response;
import com.linkedin.norbert.cluster.InvalidClusterException; import com.linkedin.norbert.javacompat.cluster.ClusterClient; import com.linkedin.norbert.javacompat.cluster.ClusterListener; import com.linkedin.norbert.javacompat.cluster.Node; import com.linkedin.norbert.javacompat.cluster.ZooKeeperClusterClient; import com.linkedin.norbert.javacompat.network.Endpoint; import com.linkedin.norbert.javacompat.network.LoadBalancer; import com.linkedin.norbert.javacompat.network.LoadBalancerFactory; import com.linkedin.norbert.javacompat.network.NettyNetworkClient; import com.linkedin.norbert.javacompat.network.NettyNetworkServer; import com.linkedin.norbert.javacompat.network.NetworkClient; import com.linkedin.norbert.javacompat.network.NetworkClientConfig; import com.linkedin.norbert.javacompat.network.NetworkServer; import com.linkedin.norbert.javacompat.network.NetworkServerConfig; import com.linkedin.norbert.javacompat.network.RequestHandler; import com.linkedin.norbert.javacompat.network.RoundRobinLoadBalancerFactory;
public class Start { public static void main(String[] args) { final String serviceName = "norbert"; final String zkConnectStr = "localhost:2181";
configCluster(serviceName, zkConnectStr); startServer(serviceName, 1, zkConnectStr); startServer(serviceName, 2, zkConnectStr); NetworkClientConfig config = new NetworkClientConfig(); config.setServiceName(serviceName); config.setZooKeeperConnectString(zkConnectStr); config.setZooKeeperSessionTimeoutMillis(30000); config.setConnectTimeoutMillis(1000); config.setWriteTimeoutMillis(150); config.setMaxConnectionsPerNode(5); config.setStaleRequestTimeoutMins(10); config.setStaleRequestCleanupFrequencyMins(10); final LoadBalancerFactory myLB = new LoadBalancerFactory() { @Override public LoadBalancer newLoadBalancer(final Set<Endpoint> endpoints) throws InvalidClusterException { return new LoadBalancer() { @Override public Node nextNode() { return endpoints.iterator().next().getNode(); } }; } }; final NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory()); //PartitionedNetworkClient<Integer> nc = new NettyPartitionedNetworkClient<Integer>(config, new IntegerConsistentHashPartitionedLoadBalancerFactory()); //nc.registerRequest(NetqProtocol.AppendReq.getDefaultInstance(), NetqProtocol.AppendResp.getDefaultInstance()); for(int index = 0;index<10;index++){ new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ final Request request = new Request(5+i); System.out.println(Thread.currentThread().getName()+"-client request at "+(5+i)); Future<Response> pingFuture = nc.sendRequest(request, new RequestResponseSerializer()); try { final Response appendResp = pingFuture.get(); System.out.println(Thread.currentThread().getName()+"-client got ping resp: " + appendResp.total); } catch( InterruptedException e ) { e.printStackTrace(); } catch( ExecutionException e ) { e.printStackTrace(); } } } }).start(); } }
private static void startServer(String serviceName,final int nodeId, String zkConnectStr)
{
NetworkServerConfig config = new NetworkServerConfig();
config.setServiceName(serviceName);
config.setZooKeeperConnectString(zkConnectStr);
config.setZooKeeperSessionTimeoutMillis(30000);
config.setRequestThreadCorePoolSize(5);
config.setRequestThreadMaxPoolSize(10);
config.setRequestThreadKeepAliveTimeSecs(300);
NetworkServer ns = new NettyNetworkServer(config);
ns.registerHandler(new RequestHandler<Request, Response>()
{
@Override
public Response handleRequest(Request request) throws Exception
{
System.out.println("server_"+nodeId+":run at "+request.num);
return new Response(request.num+10);
}
}, new RequestResponseSerializer());
ns.bind(nodeId);
}
private static void configCluster(String serviceName, String zkConnectStr)
{
//ClusterClient cc = new InMemoryClusterClient("norbert");//, "localhost:2181", 30000);
final ClusterClient cc = new ZooKeeperClusterClient(serviceName, zkConnectStr, 30000);
cc.awaitConnectionUninterruptibly();
cc.addListener(new ClusterListener()
{
@Override
public void handleClusterConnected(Set<Node> nodes)
{
System.out.println("connected to cluster: " + nodes);
}
@Override
public void handleClusterNodesChanged(Set<Node> nodes)
{
System.out.println("nodes changed: ");
for( Node node : nodes )
{
System.out.println("node: " + node);
}
}
@Override
public void handleClusterDisconnected()
{
final Set<Node> nodes = cc.getNodes();
System.out.println("dis-connected from cluster: " + nodes);
}
@Override
public void handleClusterShutdown()
{
final Set<Node> nodes = cc.getNodes();
System.out.println("cluster shutdown: " + nodes);
}
});
cc.removeNode(1);
cc.removeNode(2);
cc.addNode(1, "localhost:5002");
cc.addNode(2, "localhost:5003");
// cc.markNodeAvailable(1);
// cc.shutdown();
}