为了避免依赖的问题,我采用了maven创建项目

下面是依赖的包


    <dependency>
  		<groupId>com.linkedin</groupId>
  		<artifactId>norbert_2.8.1</artifactId>
  		<version>0.6.12</version>
  	</dependency>
  	<dependency>
  		<groupId>org.scala-lang</groupId>
  		<artifactId>scala-library</artifactId>
  		<version>2.8.1</version>
  	</dependency>
  	<dependency>
  		<groupId>com.google.protobuf</groupId>
  		<artifactId>protobuf-java</artifactId>
  		<version>2.4.0a</version>
  	</dependency>
  	<dependency>
  		<groupId>org.apache.zookeeper</groupId>
  		<artifactId>zookeeper</artifactId>
  		<version>3.3.3</version>
  	</dependency>


  先上一个norbert官方的例子,官方的缺少protoc生成消息格式类的介绍,具体生成办法请参考 我的另一篇文章<protobuf介绍>

  

class PingSerializer implements Serializer<Ping, Ping> {
    public String requestName() {
      return "ping";
    }
public String responseName() {
  return "pong";
}

public byte[] requestToBytes(Ping message) {
  return NorbertExampleProtos.Ping.newBuilder().setTimestamp(message.timestamp).build().toByteArray();
}

public Ping requestFromBytes(byte[] bytes) {
  try {
    return new Ping(NorbertExampleProtos.Ping.newBuilder().mergeFrom(bytes).build().getTimestamp());
  } catch (InvalidProtocolBufferException e) {
     System.out.println("Invalid protocol buffer exception " + e.getMessage());
     throw new IllegalArgumentException(e);
  }
}

public byte[] responseToBytes(Ping message) {
  return requestToBytes(message);
}

public Ping responseFromBytes(byte[] bytes) {
  return requestFromBytes(bytes);
}

}

必须要实现 serializer接口,这是使用netty传输时对内容进行转换的 NorbertExampleProtos这个类是通过.proto文件生成的,具体生成办法请参考 我的另一篇文章<protobuf介绍> 下面是客户端和服务端的写法, 直接运行就可以看到结果, 据我查到的情况是 norbert能支持1000并发以上没问题。这个对企业大并发还不是很了解,不评论


import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import proto.Request; import proto.RequestResponseSerializer; import proto.Response;

import com.linkedin.norbert.cluster.InvalidClusterException; import com.linkedin.norbert.javacompat.cluster.ClusterClient; import com.linkedin.norbert.javacompat.cluster.ClusterListener; import com.linkedin.norbert.javacompat.cluster.Node; import com.linkedin.norbert.javacompat.cluster.ZooKeeperClusterClient; import com.linkedin.norbert.javacompat.network.Endpoint; import com.linkedin.norbert.javacompat.network.LoadBalancer; import com.linkedin.norbert.javacompat.network.LoadBalancerFactory; import com.linkedin.norbert.javacompat.network.NettyNetworkClient; import com.linkedin.norbert.javacompat.network.NettyNetworkServer; import com.linkedin.norbert.javacompat.network.NetworkClient; import com.linkedin.norbert.javacompat.network.NetworkClientConfig; import com.linkedin.norbert.javacompat.network.NetworkServer; import com.linkedin.norbert.javacompat.network.NetworkServerConfig; import com.linkedin.norbert.javacompat.network.RequestHandler; import com.linkedin.norbert.javacompat.network.RoundRobinLoadBalancerFactory;

public class Start { public static void main(String[] args) { final String serviceName = "norbert"; final String zkConnectStr = "localhost:2181";

    configCluster(serviceName, zkConnectStr);
    startServer(serviceName, 1, zkConnectStr);
    startServer(serviceName, 2, zkConnectStr);

    NetworkClientConfig config = new NetworkClientConfig();
    config.setServiceName(serviceName);
    config.setZooKeeperConnectString(zkConnectStr);
    config.setZooKeeperSessionTimeoutMillis(30000);
    config.setConnectTimeoutMillis(1000);
    config.setWriteTimeoutMillis(150);
    config.setMaxConnectionsPerNode(5);
    config.setStaleRequestTimeoutMins(10);
    config.setStaleRequestCleanupFrequencyMins(10);

    final LoadBalancerFactory myLB = new LoadBalancerFactory()
    {
        @Override
        public LoadBalancer newLoadBalancer(final Set&lt;Endpoint&gt; endpoints) throws InvalidClusterException
        {
            return new LoadBalancer()
            {
                @Override
                public Node nextNode()
                {
                    return endpoints.iterator().next().getNode();
                }
            };
        }
    };
    final NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory());
    //PartitionedNetworkClient&lt;Integer&gt; nc = new NettyPartitionedNetworkClient&lt;Integer&gt;(config, new IntegerConsistentHashPartitionedLoadBalancerFactory());

    //nc.registerRequest(NetqProtocol.AppendReq.getDefaultInstance(), NetqProtocol.AppendResp.getDefaultInstance());

    for(int index = 0;index&lt;10;index++){
	    new Thread(new Runnable() {
			@Override
			public void run() {
				for(int i=0;i&lt;10;i++){
			        final Request request = new Request(5+i);
			        System.out.println(Thread.currentThread().getName()+"-client request at "+(5+i));
			        Future&lt;Response&gt; pingFuture = nc.sendRequest(request, new RequestResponseSerializer());
			
			        try
			        {	
			            final Response appendResp = pingFuture.get();
			            System.out.println(Thread.currentThread().getName()+"-client got ping resp: " + appendResp.total);
			        }
			        catch( InterruptedException e )
			        {
			            e.printStackTrace();
			        }
			        catch( ExecutionException e )
			        {
			            e.printStackTrace();
			        }
		        }

				
			}
		}).start();    
   }
}

 
private static void startServer(String serviceName,final int nodeId, String zkConnectStr)
    {
        NetworkServerConfig config = new NetworkServerConfig();
        config.setServiceName(serviceName);
        config.setZooKeeperConnectString(zkConnectStr);
        config.setZooKeeperSessionTimeoutMillis(30000);
        config.setRequestThreadCorePoolSize(5);
        config.setRequestThreadMaxPoolSize(10);
        config.setRequestThreadKeepAliveTimeSecs(300);
    NetworkServer ns = new NettyNetworkServer(config);

    ns.registerHandler(new RequestHandler&lt;Request, Response&gt;()
    {
        @Override
        public Response handleRequest(Request request) throws Exception
        {
        	System.out.println("server_"+nodeId+":run at "+request.num);
            return new Response(request.num+10);
        }
    }, new RequestResponseSerializer());


    ns.bind(nodeId);
}

private static void configCluster(String serviceName, String zkConnectStr)
{
    //ClusterClient cc = new InMemoryClusterClient("norbert");//, "localhost:2181", 30000);
    final ClusterClient cc = new ZooKeeperClusterClient(serviceName, zkConnectStr, 30000);
    cc.awaitConnectionUninterruptibly();

    cc.addListener(new ClusterListener()
    {
        @Override
        public void handleClusterConnected(Set&lt;Node&gt; nodes)
        {
            System.out.println("connected to cluster: " + nodes);
        }

        @Override
        public void handleClusterNodesChanged(Set&lt;Node&gt; nodes)
        {
            System.out.println("nodes changed: ");
            for( Node node : nodes )
            {
                System.out.println("node: " + node);
            }
        }

        @Override
        public void handleClusterDisconnected()
        {
            final Set&lt;Node&gt; nodes = cc.getNodes();
            System.out.println("dis-connected from cluster: " + nodes);
        }

        @Override
        public void handleClusterShutdown()
        {
            final Set&lt;Node&gt; nodes = cc.getNodes();
            System.out.println("cluster shutdown: " + nodes);
        }
    });

    cc.removeNode(1);
    cc.removeNode(2);
    cc.addNode(1, "localhost:5002");
    cc.addNode(2, "localhost:5003");

// cc.markNodeAvailable(1); // cc.shutdown(); }