首页 > 编程语言 > 如何使用Java操作Zookeeper
2021
06-05

如何使用Java操作Zookeeper

简介

Java操作Zookeeper有很多种方式,如zookeeper、zkclient、curator等等,下面介绍下使用zkclient的方式操作Zookeeper。

Maven依赖:

1
2
3
4
5
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

使用zkclient操作Zookeeper

创建节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void testCreateNode() {
    //建立连接
    //zkServers: Zookeeper服务器IP地址和端口号,如果是集群情况下用逗号分割多个Zookeeper服务器地址
    //sessionTimeout: 会话超时时间
    //connectionTimeout: 连接超时时间
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
 
    //创建数据
    Student student = new Student();
    student.setName("张三");
    student.setAge(18);
    student.setPhone("1585454xxxx");
 
    //创建持久节点
    zkClient.createPersistent("/p_node", student);
    //创建持久顺序节点
    zkClient.createPersistentSequential("/ps_node", student);
    //创建临时节点
    zkClient.createEphemeral("/e_node", student);
    //创建临时顺序节点
    zkClient.createEphemeralSequential("/ps_node", student);
 
    //关闭客户端
    //关闭客户端的同时,前面创建的临时节点也会被删除
    zkClient.close();
}

读取节点数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testReadNodeData() {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
 
    Stat stat = new Stat();
    Student student = zkClient.readData("/p_node", stat);
    System.out.println("节点状态信息:" + JSON.toJSONString(student));
    System.out.println("节点数据:" + JSON.toJSONString(stat));
 
    zkClient.close();
}
//输出结果:
//节点状态信息:{"age":18,"name":"张三","phone":"1585454xxxx"}
//节点数据:{"aversion":0,"ctime":1619165355431,"cversion":0,"czxid":165,"dataLength":260,"ephemeralOwner":0,"mtime":1619165355431,"mzxid":165,"numChildren":0,"pzxid":165,"version":0}

删除节点:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testDeleteNode() {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
 
    //删除节点,不能删除带有子节点的节点
    zkClient.delete("/p_node");
    //删除节点,递归删除所有子孙节点
    zkClient.deleteRecursive("/p_node2");
 
    zkClient.close();
}

更新节点数据:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testWriteNodeData() {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
 
    //更新节点数据
    zkClient.writeData("/p_node", "myData1");
    //使用CAS更新节点数据
    //zkClient.writeData("/p_node", "myData2", 1);
 
    zkClient.close();
}

获取子节点列表:

1
2
3
4
5
6
7
8
9
10
@Test
public void testGetChildNodes() {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
 
    //获取子节点列表
    List<String> childList = zkClient.getChildren("/p_node");
    childList.stream().forEach(System.out::println);
 
    zkClient.close();
}

节点监听

Java节点监听都是永久的,触发一次后不会被删除。

监听节点的子节点变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testSubscribeChildChanges() throws IOException {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
    zkClient.subscribeChildChanges("/p_node", new IZkChildListener(){
 
        //子节点改变时调用
        @Override
        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            System.out.println(parentPath + "子节点发生改变");
            System.out.println("当前子节点列表:" + currentChilds);
        }
 
    });
 
    //阻塞客户端,便于测试
    System.in.read();
}

监听节点数据变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void testSubscribeDataChanges() throws IOException {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
    zkClient.subscribeDataChanges("/p_node", new IZkDataListener(){
 
        //节点数据改变时调用
        @Override
        public void handleDataChange(String dataPath, Object data) throws Exception {
            System.out.println(dataPath + "节点数据发生变化");
            System.out.println("修改后的数据为:" + data.toString());
        }
 
        //节点被删除时调用
        @Override
        public void handleDataDeleted(String dataPath) throws Exception {
            System.out.println(dataPath + "节点已被删除");
        }
    });
 
    //阻塞客户端,便于测试
    System.in.read();
}

监听Zookeeper连接状态变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void testSubscribeStateChanges() throws IOException {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
    zkClient.subscribeStateChanges(new IZkStateListener(){
 
        //当zookeeper连接状态改变时调用
        @Override
        public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
            System.out.println("当前状态" + state);
        }
 
        //在zookeeper会话过期并且创建了一个新的会话之后调用
        @Override
        public void handleNewSession() throws Exception {
            System.out.println("会话过期,已创建新的会话");
        }
 
        //当会话不能重新建立时调用
        @Override
        public void handleSessionEstablishmentError(Throwable error) throws Exception {
            error.printStackTrace();
        }
    });
 
    //阻塞客户端,便于测试
    System.in.read();
}

以上就是如何使用Java操作Zookeeper的详细内容,更多关于Java操作Zookeeper的资料请关注自学编程网其它相关文章!

编程技巧