为了避免依赖问题,我使用 Maven 创建项目。
下面是所需的依赖包:
<dependency> <groupId>com.linkedin</groupId> <artifactId>norbert_2.8.1</artifactId> <version>0.6.12</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.8.1</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.4.0a</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.3.3</version> </dependency>
先上一个 norbert 官方的例子。官方文档没有介绍如何用 protoc 生成消息格式类,具体生成办法请参考我的另一篇文章《protobuf介绍》。
class PingSerializer implements Serializer<Ping, Ping> { public String requestName() { return "ping"; }public String responseName() { return "pong"; } public byte[] requestToBytes(Ping message) { return NorbertExampleProtos.Ping.newBuilder().setTimestamp(message.timestamp).build().toByteArray(); } public Ping requestFromBytes(byte[] bytes) { try { return new Ping(NorbertExampleProtos.Ping.newBuilder().mergeFrom(bytes).build().getTimestamp()); } catch (InvalidProtocolBufferException e) { System.out.println("Invalid protocol buffer exception " + e.getMessage()); throw new IllegalArgumentException(e); } } public byte[] responseToBytes(Ping message) { return requestToBytes(message); } public Ping responseFromBytes(byte[] bytes) { return requestFromBytes(bytes); }
}
必须实现 Serializer 接口,它负责在通过 Netty 传输时对消息内容进行序列化和反序列化。NorbertExampleProtos 这个类是由 protoc 根据 .proto 文件生成的,具体生成办法请参考我的另一篇文章《protobuf介绍》。下面是客户端和服务端的写法,先在 localhost:2181 启动 ZooKeeper,然后直接运行就可以看到结果。据我查到的资料,norbert 支持 1000 以上的并发没有问题;不过我对企业级大并发场景还不是很了解,这里不做评论。
import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;import proto.Request; import proto.RequestResponseSerializer; import proto.Response;
import com.linkedin.norbert.cluster.InvalidClusterException; import com.linkedin.norbert.javacompat.cluster.ClusterClient; import com.linkedin.norbert.javacompat.cluster.ClusterListener; import com.linkedin.norbert.javacompat.cluster.Node; import com.linkedin.norbert.javacompat.cluster.ZooKeeperClusterClient; import com.linkedin.norbert.javacompat.network.Endpoint; import com.linkedin.norbert.javacompat.network.LoadBalancer; import com.linkedin.norbert.javacompat.network.LoadBalancerFactory; import com.linkedin.norbert.javacompat.network.NettyNetworkClient; import com.linkedin.norbert.javacompat.network.NettyNetworkServer; import com.linkedin.norbert.javacompat.network.NetworkClient; import com.linkedin.norbert.javacompat.network.NetworkClientConfig; import com.linkedin.norbert.javacompat.network.NetworkServer; import com.linkedin.norbert.javacompat.network.NetworkServerConfig; import com.linkedin.norbert.javacompat.network.RequestHandler; import com.linkedin.norbert.javacompat.network.RoundRobinLoadBalancerFactory;
public class Start {

    /**
     * Demo entry point.
     *
     * <p>Registers the cluster nodes via {@code configCluster}, starts one
     * server per node (ids 1 and 2) in this JVM, then launches ten client
     * threads that each send ten requests and print the responses.
     *
     * <p>Expects ZooKeeper to be reachable at {@code localhost:2181}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final String serviceName = "norbert";
        final String zkConnectStr = "localhost:2181";

        configCluster(serviceName, zkConnectStr);
        startServer(serviceName, 1, zkConnectStr);
        startServer(serviceName, 2, zkConnectStr);

        NetworkClientConfig config = new NetworkClientConfig();
        config.setServiceName(serviceName);
        config.setZooKeeperConnectString(zkConnectStr);
        config.setZooKeeperSessionTimeoutMillis(30000);
        config.setConnectTimeoutMillis(1000);
        config.setWriteTimeoutMillis(150);
        config.setMaxConnectionsPerNode(5);
        config.setStaleRequestTimeoutMins(10);
        config.setStaleRequestCleanupFrequencyMins(10);

        // A different LoadBalancerFactory implementation can be passed here
        // in place of the round-robin one.
        final NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory());

        // Disabled alternative from the original example (partitioned client):
        // PartitionedNetworkClient<Integer> nc = new NettyPartitionedNetworkClient<Integer>(config, new IntegerConsistentHashPartitionedLoadBalancerFactory());
        // nc.registerRequest(NetqProtocol.AppendReq.getDefaultInstance(), NetqProtocol.AppendResp.getDefaultInstance());

        for (int index = 0; index < 10; index++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        final Request request = new Request(5 + i);
                        System.out.println(Thread.currentThread().getName() + "-client request at " + (5 + i));
                        Future<Response> pingFuture = nc.sendRequest(request, new RequestResponseSerializer());
                        try {
                            final Response appendResp = pingFuture.get();
                            System.out.println(Thread.currentThread().getName() + "-client got ping resp: " + appendResp.total);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag and stop sending: an
                            // interrupted worker should not keep issuing requests.
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                            return;
                        } catch (ExecutionException e) {
                            // A single failed request does not abort the remaining ones.
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }
private static void startServer(String serviceName,final int nodeId, String zkConnectStr) { NetworkServerConfig config = new NetworkServerConfig(); config.setServiceName(serviceName); config.setZooKeeperConnectString(zkConnectStr); config.setZooKeeperSessionTimeoutMillis(30000); config.setRequestThreadCorePoolSize(5); config.setRequestThreadMaxPoolSize(10); config.setRequestThreadKeepAliveTimeSecs(300);NetworkServer ns = new NettyNetworkServer(config); ns.registerHandler(new RequestHandler<Request, Response>() { @Override public Response handleRequest(Request request) throws Exception { System.out.println("server_"+nodeId+":run at "+request.num); return new Response(request.num+10); } }, new RequestResponseSerializer()); ns.bind(nodeId); } private static void configCluster(String serviceName, String zkConnectStr) { //ClusterClient cc = new InMemoryClusterClient("norbert");//, "localhost:2181", 30000); final ClusterClient cc = new ZooKeeperClusterClient(serviceName, zkConnectStr, 30000); cc.awaitConnectionUninterruptibly(); cc.addListener(new ClusterListener() { @Override public void handleClusterConnected(Set<Node> nodes) { System.out.println("connected to cluster: " + nodes); } @Override public void handleClusterNodesChanged(Set<Node> nodes) { System.out.println("nodes changed: "); for( Node node : nodes ) { System.out.println("node: " + node); } } @Override public void handleClusterDisconnected() { final Set<Node> nodes = cc.getNodes(); System.out.println("dis-connected from cluster: " + nodes); } @Override public void handleClusterShutdown() { final Set<Node> nodes = cc.getNodes(); System.out.println("cluster shutdown: " + nodes); } }); cc.removeNode(1); cc.removeNode(2); cc.addNode(1, "localhost:5002"); cc.addNode(2, "localhost:5003");
// cc.markNodeAvailable(1); // cc.shutdown(); }