1. Watch事件监听1.1 一次性监听方式Watcher利用 Watcher 来对节点进行监听操作可以典型业务场景需要使用可考虑但一般情况不推荐使用。public class CuratorWatchTest { ​ private CuratorFramework client; ​ /** * 建立连接 */ Before public void testConnect(){ ​ /** * String connectString, 连接字符串 zk地址 端口 192.168.58.100:2181 * int sessionTimeoutMs, 会话超时时间 * int connectionTimeoutMs, 连接超时时间 * RetryPolicy retryPolicy 重试策略 */ //1. 第一种方式 RetryPolicy retryPolicy new ExponentialBackoffRetry(3000,10); ​ //2. 第二种方式 client CuratorFrameworkFactory.builder() .connectString(192.168.58.100:2181) .sessionTimeoutMs(60*1000) .connectionTimeoutMs(15*1000) .retryPolicy(retryPolicy) .namespace(mashibing) //当前程序创建目录的根目录 .build(); ​ client.start(); } ​ /** * 演示一次性监听 */ Test public void testOneListener() throws Exception { ​ byte[] data client.getData().usingWatcher(new Watcher() { Override public void process(WatchedEvent watchedEvent) { System.out.println(监听器 watchedEvent: watchedEvent); } }).forPath(/test); ​ System.out.println(监听节点内容 new String(data)); ​ while(true){ ​ } } After public void close(){ client.close(); } }上面这段代码对/test节点注册了一个 Watcher 监听事件并且返回当前节点的内容。后面进行两次数据变更实际上第二次变更时监听已经失效无法再次获得节点变动事件了。测试中控制台输出的信息如下1.2 Curator事件监听机制ZooKeeper 原生支持通过注册Watcher来进行事件监听但是其使用并不是特别方便需要开发人员自己反复注册Watcher比较繁琐。Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。ZooKeeper提供了三种WatcherNodeCache : 只是监听某一个特定的节点PathChildrenCache : 监控一个ZNode的子节点.TreeCache : 可以监控整个树上的所有节点类似于PathChildrenCache和NodeCache的组合1watch监听 NodeCache监听数据节点本身的变化。NodeCacheListener 来完成后续处理。public class CuratorWatchTest { /** * 演示 NodeCache : 给指定一个节点注册监听 */ Test public void testNodeCache() throws Exception { ​ //1. 创建NodeCache对象 NodeCache nodeCache new NodeCache(client, /app1); //监听的是 /mashibing和其子目录app1 ​ //2. 注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { Override public void nodeChanged() throws Exception { System.out.println(节点变化了。。。。。。); ​ //获取修改节点后的数据 byte[] data nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); ​ //3. 设置为true开启监听 nodeCache.start(true); ​ while(true){ ​ } } }NodeCache不仅可以监听节点内容变化还可以监听指定节点是否存在。如果原本节点不存在那么Cache就会在节点被创建时触发监听事件如果该节点被删除就无法再触发监听事件。2watch监听 PathChildrenCache/** * 演示 PathChildrenCache: 监听某个节点的所有子节点 */ Test public void testPathChildrenCache() throws Exception { ​ //1.创建监听器对象 (第三个参数表示缓存每次节点更新后的数据) PathChildrenCache pathChildrenCache new PathChildrenCache(client, /app2, true); ​ //2.绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println(子节点发生变化了。。。。。。); System.out.println(pathChildrenCacheEvent); ​ if(PathChildrenCacheEvent.Type.CHILD_UPDATED pathChildrenCacheEvent.getType()){ //更新子节点 System.out.println(子节点更新了); //在一个getData中有很多数据我们只拿data部分 byte[] data pathChildrenCacheEvent.getData().getData(); System.out.println(更新后的值为 new String(data)); ​ }else if(PathChildrenCacheEvent.Type.CHILD_ADDED pathChildrenCacheEvent.getType()){ //添加子节点 System.out.println(添加子节点); String path pathChildrenCacheEvent.getData().getPath(); System.out.println(子节点路径为 path); ​ }else if(PathChildrenCacheEvent.Type.CHILD_REMOVED pathChildrenCacheEvent.getType()){ //删除子节点 System.out.println(删除了子节点); String path pathChildrenCacheEvent.getData().getPath(); System.out.println(子节点路径为 path); } } }); ​ //3. 开启 pathChildrenCache.start(); ​ while(true){ ​ } }事件对象信息分析PathChildrenCacheEvent{ typeCHILD_UPDATED, dataChildData { path/app2/m1, stat164,166,1670114647087,1670114698259,1,0,0,0,3,0,164, data[49, 50, 51] } }3watch监听 TreeCacheTreeCache相当于NodeCache只监听当前结点 PathChildrenCache只监听子结点的结合版即监听当前和子结点。/** * 演示 TreeCache: 监听某个节点的所有子节点 */ Test public void testCache() throws Exception { ​ //1.创建监听器对象 TreeCache treeCache new TreeCache(client, /app2); ​ //2.绑定监听器 treeCache.getListenable().addListener(new TreeCacheListener() { Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println(节点变化了); System.out.println(treeCacheEvent); ​ if(TreeCacheEvent.Type.NODE_UPDATED treeCacheEvent.getType()){ //更新节点 System.out.println(节点更新了); //在一个getData中有很多数据我们只拿data部分 byte[] data treeCacheEvent.getData().getData(); System.out.println(更新后的值为 new String(data)); ​ }else if(TreeCacheEvent.Type.NODE_ADDED treeCacheEvent.getType()){ //添加子节点 System.out.println(添加节点); String path treeCacheEvent.getData().getPath(); System.out.println(子节点路径为 path); ​ }else if(TreeCacheEvent.Type.NODE_REMOVED treeCacheEvent.getType()){ //删除子节点 System.out.println(删除节点); String path treeCacheEvent.getData().getPath(); System.out.println(删除节点路径为 path); } } }); ​ //3. 开启 treeCache.start(); ​ while(true){ ​ } }2. 事务异步操作演示2.1 事务演示CuratorFramework 的实例包含 inTransaction( ) 接口方法调用此方法开启一个 ZooKeeper 事务。可以复合create、 setData、 check、and/or delete 等操作然后调用 commit() 作为一个原子操作提交。/** * 事务操作 */ Test public void TestTransaction() throws Exception { ​ //1. 创建Curator对象用于定义事务操作 CuratorOp createOp client.transactionOp().create().forPath(/app3, app1-data.getBytes()); CuratorOp setDataOp client.transactionOp().setData().forPath(/app2, app2-data.getBytes()); CuratorOp deleteOp client.transactionOp().delete().forPath(/app2); ​ //2. 添加事务操 CollectionCuratorTransactionResult results client.transaction().forOperations(createOp, setDataOp, deleteOp); ​ //3. 遍历事务操作结果 for (CuratorTransactionResult result : results) { System.out.println(result.getForPath() - result.getType()); } }2.2 异步操作前面提到的增删改查都是同步的但是 Curator 也提供了异步接口引入了 BackgroundCallback 接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback 接口中一个重要的回调值为 CuratorEvent里面包含事件类型、响应码和节点的详细信息。​ // 异步操作 Test public void TestAsync() throws Exception { ​ while(true){ ​ // 异步获取子节点列表 GetChildrenBuilder builder client.getChildren(); builder.inBackground(new BackgroundCallback() { Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(子节点列表 curatorEvent.getChildren()); } }).forPath(/); ​ TimeUnit.SECONDS.sleep(5); } ​ }3. Zookeeper权限控制3.1 zk权限控制介绍Zookeeper作为一个分布式协调框架内部存储了一些分布式系统运行时的状态的数据比如master选举、比如分布式锁。对这些数据的操作会直接影响到分布式系统的运行状态。因此为了保证zookeeper中的数据的安全性避免误操作带来的影响。Zookeeper提供了一套ACL权限控制机制来保证数据的安全。ACL权限控制使用scheme:id:perm来标识。Scheme权限模式标识授权策略ID授权对象Permission授予的权限ZooKeeper的权限控制是基于每个znode节点的需要对每个节点设置权限每个znode支持设置多种权限控制方案和多个权限子节点不会继承父节点的权限客户端无权访问某节点但可能可以访问它的子节点。3.2 Scheme 权限模式Zookeeper提供以下权限模式所谓权限模式就是使用什么样的方式来进行授权。world:默认方式相当于全部都能访问。auth代表已经认证通过的用户cli中可以通过addauth digest user:pwd来添加当前上下文中的授权用户digest即用户名:密码这种方式认证这也是业务系统中最常用的。用username:password字符串来产生一个MD5串然后该串被用来作为ACL ID。认证是通过明文发送username:password来进行的当用在ACL时表达式为username:base64base64是password的SHA1摘要的编码。ip通过ip地址来做权限控制比如 ip:192.168.1.1 表示权限控制都是针对这个ip地址的。也可以针对网段 ip:192.168.1.1/24此时addr中的有效位与客户端addr中的有效位进行比对。3.3 ID 授权对象指权限赋予的用户或一个指定的实体不同的权限模式下授权对象不同。Id ipId new Id(ip, 192.168.58.100); Id ANYONE_ID_UNSAFE new Id(world, anyone);3.4 Permission权限类型指通过权限检查后可以被允许的操作create /delete /read/write/adminCreate 允许对子节点Create 操作Read 允许对本节点GetChildren 和GetData 操作Write 允许对本节点SetData 操作Delete 允许对子节点Delete 操作Admin 允许对本节点setAcl 操作权限模式(Schema)和授权对象主要用来确认权限验证过程中使用的验证策略比如ip地址、digest:username:password匹配到验证策略并验证成功后再根据权限操作类型来决定当前客户端的访问权限。3.5 在控制台实现操作在Zookeeper中提供了ACL相关的命令getAcl getAcl path 读取ACL权限 setAcl setAcl path acl 设置ACL权限 addauth addauth scheme auth 添加认证用户1word方式创建一个节点后默认就是world模式[zk: localhost:2181(CONNECTED) 6] create /auth Created /auth [zk: localhost:2181(CONNECTED) 7] getAcl /auth world,anyone : cdrwa [zk: localhost:2181(CONNECTED) 8] create /auth2 Created /auth2 [zk: localhost:2181(CONNECTED) 9] getAcl /auth2 world,anyone : cdrwa [zk: localhost:2181(CONNECTED) 10]其中 cdrwa分别对应 create . delete read write admin2IP方式在ip模式中首先连接到zkServer的命令需要使用如下方式zkCli.sh -server 127.0.0.1:2181接着按照IP的方式操作如下[zk: 127.0.0.1:2181(CONNECTED) 0] create /ip-model Created /ip-model [zk: 127.0.0.1:2181(CONNECTED) 1] setAcl /ip-model ip:127.0.0.1:cdrwa [zk: 127.0.0.1:2181(CONNECTED) 3] getAcl /ip-model ip,127.0.0.1 : cdrwa3 Auth模式auth模式的操作如下。[zk: 127.0.0.1:2181(CONNECTED) 5] create /spike Created /spike [zk: 127.0.0.1:2181(CONNECTED) 6] addauth digest spike:123456 [zk: 127.0.0.1:2181(CONNECTED) 9] setAcl /spike auth:spike:cdrwa [zk: 127.0.0.1:2181(CONNECTED) 10] getAcl /spike digest,spike:pPeKgz2N9Xc8Um6wwnzFUMteLxk : cdrwa当我们退出当前的会话后再次连接执行如下操作会提示没有权限[zk: localhost:2181(CONNECTED) 0] get /spike Insufficient permission : /spike这时候我们需要重新授权。[zk: localhost:2181(CONNECTED) 1] addauth digest spike:123456 [zk: localhost:2181(CONNECTED) 2] get /spike null4 Digest模式使用语法会发现使用方式和Auth模式相同setAcl /digest digest:用户名:密码:权限但是有一个不一样的点密码需要用加密后的否则无法被识别。密码 用户名和密码加密后的字符串。使用下面程序生成密码public class TestAcl { Test public void createPw() throws NoSuchAlgorithmException { String up msb:msb; byte[] digest MessageDigest.getInstance(SHA1).digest(up.getBytes()); String encodeStr Base64.getEncoder().encodeToString(digest); System.out.println(encodeStr); } }得到5FAC7McRhLdx0QUWsfEbK8pqwxc再回到client上进行如下操作[zk: localhost:2181(CONNECTED) 14] create /digest Created /digest [zk: localhost:2181(CONNECTED) 15] setAcl /digest digest:msb:5FAC7McRhLdx0QUWsfEbK8pqwxc:cdrwa [zk: localhost:2181(CONNECTED) 16] getAcl /digest digest,msb:5FAC7McRhLdx0QUWsfEbK8pqwxc : cdrwa当退出当前会话后需要再次授权才能访问/digest节点[zk: localhost:2181(CONNECTED) 0] get /digest Insufficient permission : /digest [zk: localhost:2181(CONNECTED) 1] addauth digest msb:msb [zk: localhost:2181(CONNECTED) 2] get /digest null3.6 Curator演示ACL的使用接下来我们使用Curator简单演示一下ACL权限的访问操作。public class TestAcl { private CuratorFramework client; Test public void createPw() throws NoSuchAlgorithmException { String up msb:msb; byte[] digest MessageDigest.getInstance(SHA1).digest(up.getBytes()); String encodeStr Base64.getEncoder().encodeToString(digest); System.out.println(encodeStr); } //1.创建连接 Before public void createConnect(){ client CuratorFrameworkFactory.builder() .connectString(192.168.58.100:2181) .sessionTimeoutMs(5000).connectionTimeoutMs(20000) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .namespace(msbAcl).build(); client.start(); } Test public void testCuratorAcl() throws Exception { //创建ID以Digest方式认证用户名和密码为 msb:msb Id id new Id(digest, DigestAuthenticationProvider.generateDigest(msb:msb)); //为ID对象指定权限 ListACL acls new ArrayList(); acls.add(new ACL(ZooDefs.Perms.ALL,id)); //创建节点 auth,设置节点数据并设置ACL权限 String node client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) // 设置节点类型是持久节点 .withACL(acls,false) //设置节点的ACL权限 .forPath(/auth,hello.getBytes()); //设置节点的路径和数据 System.out.println(成功创建带权限的节点 node); //获取刚刚创建的节点的数据 byte[] bytes client.getData().forPath(node); System.out.println(获取数据结果 new String(bytes)); } }上述代码执行后会报错先删除节点[zk: localhost:2181(CONNECTED) 6] deleteall /msbAcl修改代码 连接时增加授权4. Zookeeper集群搭建4.1 搭建要求真实的集群是需要部署在不同的服务器上的但是在我们测试时同时启动很多个虚拟机内存会吃不消所以我们通常会搭建伪集群也就是把所有的服务都搭建在一台虚拟机上用端口进行区分。我们这里要求搭建一个三个节点的Zookeeper集群伪集群。4.2 Zookeeper集群角色zookeeper集群中的节点有三种角色Leader处理集群的所有事务请求增删改集群中只有一个Leader。Follower只能处理读请求参与Leader选举。Observer只能处理读请求提升集群读的性能但不能参与Leader选举。4.2 准备工作重新部署一台虚拟机作为我们搭建集群的测试服务器。1安装JDK 【此步骤省略】。2Zookeeper压缩包上传到服务器 3将Zookeeper解压 建立/usr/local/zookeeper-cluster目录将解压后的Zookeeper复制到以下三个目录。[rootlocalhost ~]# mkdir /usr/local/zookeeper-cluster [rootlocalhost software]# cp -r apache-zookeeper-3.7.1-bin /usr/local/zookeeper-cluster/zookeeper-1 [rootlocalhost software]# cp -r apache-zookeeper-3.7.1-bin /usr/local/zookeeper-cluster/zookeeper-2 [rootlocalhost software]# cp -r apache-zookeeper-3.7.1-bin /usr/local/zookeeper-cluster/zookeeper-34创建data目录 并且将 conf下zoo_sample.cfg 文件改名为 zoo.cfgmkdir /usr/local/zookeeper-cluster/zookeeper-1/data mkdir /usr/local/zookeeper-cluster/zookeeper-2/data mkdir /usr/local/zookeeper-cluster/zookeeper-3/data mv /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg mv /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg mv /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg5 配置每一个Zookeeper 的dataDir 和 clientPort 分别为2181 2182 2183修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfgvim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg clientPort2181 dataDir/usr/local/zookeeper-cluster/zookeeper-1/data修改/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfgvim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg clientPort2182 dataDir/usr/local/zookeeper-cluster/zookeeper-2/data修改/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfgvim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg clientPort2183 dataDir/usr/local/zookeeper-cluster/zookeeper-3/data4.3 配置集群1在每个zookeeper的 data 目录下创建一个 myid 文件内容分别是1、2、3 。这个文件就是记录每个服务器的ID[rootlocalhost software]# echo 1 /usr/local/zookeeper-cluster/zookeeper-1/data/myid [rootlocalhost software]# echo 2 /usr/local/zookeeper-cluster/zookeeper-2/data/myid [rootlocalhost software]# echo 3 /usr/local/zookeeper-cluster/zookeeper-3/data/myid2在每一个zookeeper 的 zoo.cfg配置客户端访问端口clientPort和集群服务器IP列表。集群服务器IP列表如下vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg server.1192.168.58.200:2881:3881 server.2192.168.58.200:2882:3882 server.3192.168.58.200:2883:3883解释server.服务器ID服务器IP地址服务器之间通信端口服务器之间投票选举端口4.4 启动集群启动集群就是分别启动每个实例。/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start启动后我们查询一下每个实例的运行状态/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status查询第一个服务Mode为follower表示是跟随者从再查询第二个服务Mode 为leader表示是领导者主查询第三个为跟随者从5.Zookeeper集群操作5.1 客户端操作zk集群1) 第一步启动集群,启动后查看Zookeeper进程。jps命令 作用是显示当前所有java 进程的pid 的命令QuorumPeerMain是zookeeper集群的启动入口类2) 客户端连接连接集群所有客户端[rootlocalhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183连接集群单个客户端# 连接2181 [rootlocalhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2181 # 连接2182 [rootlocalhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2182 # 在2181中创建节点 [zk: 192.168.58.200:2181(CONNECTED) 0] create /test2 # 在2182中查询发现数据已同步 [zk: 192.168.58.200:2182(CONNECTED) 0] ls / [test1, test2, zookeeper]以上两种方式的区别在于如果只连接单个客户端如果当前连接的服务器挂掉当前客户端连接也会挂掉连接失败。如果是连接所有客户端的形式则允许集群中半数以下的服务挂掉当半数以上服务挂掉才会停止服务可用性更高一点3集群节点信息查看集群中的节点信息被存放在每一个节点/zookeeper/config/目录下5.2 模拟集群异常操作1首先我们先测试如果是从服务器挂掉会怎么样把3号服务器停掉观察1号和2号发现状态并没有变化/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop /usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status由此得出结论3个节点的集群从服务器挂掉集群正常2我们再把1号服务器从服务器也停掉查看2号主服务器的状态发现已经停止运行了。/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status由此得出结论3个节点的集群2个从服务器都挂掉主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数。3我们再次把1号服务器启动起来发现2号服务器又开始正常工作了。而且依然是领导者。4我们把3号服务器也启动起来把2号服务器停掉,停掉后观察1号和3号的状态。/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop /usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status发现新的leader产生了~由此我们得出结论当集群中的主服务器挂了集群中的其他服务器会自动进行选举状态然后产生新得leader 。5我们再次测试当我们把2号服务器重新启动起来启动后会发生什么2号服务器会再次成为新的领导吗我们看结果/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status我们会发现2号服务器启动后依然是跟随者从服务器3号服务器依然是领导者主服务器没有撼动3号服务器的领导地位。由此我们得出结论当领导者产生后再次有新服务器加入集群不会影响到现任领导者。5.3 curate客户端连接zookeeper集群public class CuratorCluster { //zookeeper连接 private final static String CLUSTER_CONNECT 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183; //session超时时间 private static final int sessionTimeoutMs 60 * 1000; //连接超时时间 private static final int connectionTimeoutMs 5000; private static CuratorFramework client; public static String getClusterConnect() { return CLUSTER_CONNECT; } Before public void init(){ // 重试策略 RetryPolicy retryPolicy new ExponentialBackoffRetry(3000,10); // zookeeper连接 client CuratorFrameworkFactory.builder() .connectString(getClusterConnect()) .sessionTimeoutMs(60*1000) .connectionTimeoutMs(15*1000) .retryPolicy(retryPolicy) .namespace(mashibing) //当前程序创建目录的根目录 .build(); // 添加监听器 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { System.out.println(连接成功); } }); client.start(); } //创建节点 public void createIfNeed(String path) throws Exception { Stat stat client.checkExists().forPath(path); if(stat null){ String s client.create().forPath(path); System.out.println(创建节点 s); } } //从集群中获取数据 Test public void testCluster() throws Exception { createIfNeed(/test); //每隔一段时间 获取一次数据 while(true){ byte[] data client.getData().forPath(/test); System.out.println(new String(data)); TimeUnit.SECONDS.sleep(5); } } }在集群中的任意服务器节点为test设置数据[zk: 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183(CONNECTED) 2] set /mashibing/test 12345