ZooKeeper小总结

源码:点击查看
下面会分三部分进行讲解
1、
==> zookeeper简介
==> zookeeper数据模型
==> zookeeper单机安装
==> zookeeper常用shell命令
==> zookeeper的Acl权限控制
==> zookeeper的javaApi
2、
==> zookeeper 事件监听机制
==> zookeeper 集群搭建
==> 一致性协议:zab协议
==> zookeeper的leader选举
==> observer角色及其配置
==> zookeeperAPI连接集群
3、
==> zookeeper 开源客户端curator介绍
==> zookeeper四字监控命令
==> zookeeper图形化的客户端工具(ZooInspector)
==> taokeeper监控工具的使用

zookeeper简介

zookeeper是什么

zookeeper官网:https://zookeeper.apache.org/
在这里插入图片描述

zooKeeper由雅虎研究院开发,是Google Chubby的开源实现,后来托管到 Apache,于2010年11月正式成为Apache的顶级项目。

大数据生态系统里的很多组件的命名都是某种动物或者昆虫,比如hadoop就是 🐘大象,hive就是🐝蜜蜂。
zookeeper即动物园管理者,顾名思义就是管理大数据生态系统各组件 的管理员,如下图所示:
在这里插入图片描述

zookeeper应用场景

zooKeeper是一个经典的分布式数据一致性解决方案,致力于为分布式应用提供一 个高性能、高可用,且具有严格顺序访问控制能力的分布式协调存储服务。

  • 维护配置信息
  • 分布式锁服务
  • 集群管理
  • 生成分布式唯一ID

1、维护配置信息
java编程经常会遇到配置项,比如数据库的url、schema、user和password 等。通常这些配置项我们会放置在配置文件中,再将配置文件放置在服务器上当需要更 改配置项时,需要去服务器上修改对应的配置文件。但是随着分布式系统的兴起,由于 许多服务都需要使用到该配置文件,因此有必须保证该配置服务的高可用性(high availability)和各台服务器上配置数据的一致性。通常会将配置文件部署在一个集群上, 然而一个集群动辄上千台服务器,此时如果再一台台服务器逐个修改配置文件那将是非 常繁琐且危险的的操作,因此就需要一种服务,能够高效快速且可靠地完成配置项的更 改等操作,并能够保证各配置项在每台服务器上的数据一致性。

zookeeper就可以提供这样一种服务,其使用Zab这种一致性协议来保证一致
性。现在有很多开源项目使用zookeeper来维护配置,比如在hbase中,客户端就是连接 一个zookeeper,获得必要的hbase集群的配置信息,然后才可以进一步操作。还有在开 源的消息队列kafka中,也使用zookeeper来维护broker的信息。在alibaba开源的soa框 架dubbo中也广泛的使用zookeeper管理一些配置来实现服务治理。
在这里插入图片描述

2、分布式锁服务
一个集群是一个分布式系统,由多台服务器组成。为了提高并发度和可靠性, 多台服务器上运行着同一种服务。当多个服务在运行时就需要协调各服务的进度,有时 候需要保证当某个服务在进行某个操作时,其他的服务都不能进行该操作,即对该操作 进行加锁,如果当前机器挂掉后,释放锁并fail over 到其他的机器继续执行该服务。
在这里插入图片描述

3、集群管理
一个集群有时会因为各种软硬件故障或者网络故障,出现某些服务器挂掉而被移除 集群,而某些服务器加入到集群中的情况,zookeeper会将这些服务器加入/移出的情况 通知给集群中的其他正常工作的服务器,以及时调整存储和计算等任务的分配和执行 等。此外zookeeper还会对故障的服务器做出诊断并尝试修复。
在这里插入图片描述

4、生成分布式唯一ID
在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment 属性来自动为每条记录生成一个唯一的ID。但是分库分表后,就无法在依靠数据库的 auto_increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环 境下生成全局唯一ID。做法如下:每次要生成一个新Id时,创建一个持久顺序节点,创建 操作返回的节点序号,即为新Id,然后把比自己节点小的删除即可

zookeeper的设计目标

zooKeeper致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访 问控制能力的分布式协调服务

  1. 高性能:zooKeeper将全量数据存储在内存中,并直接服务于客户端的所有非事务请求,尤其适用于以读为主的应用场景
  2. 高可用:zooKeeper一般以集群的方式对外提供服务,一般3 ~ 5台机器就可以组成一个可用 的Zookeeper集群了,每台机器都会在内存中维护当前的服务器状态,并且每台机器之间都相 互保持着通信。只要集群中超过一半的机器都能够正常工作,那么整个集群就能够正常对外服 务
  3. 严格顺序访问:对于来自客户端的每个更新请求,ZooKeeper都会分配一个全局唯一的递增编号, 这个编号反映了所有事务操作的先后顺序

zookeeper的数据模型

zookeeper的数据节点可以视为树状结构(或者目录),树中的各节点被称为 znode(即zookeeper node),一个znode可以有多个子节点。zookeeper节点在结构 上表现为树状;使用路径path来定位某个znode,比如/ns-1/taopanfeng/mysql/schema1/table1,此处ns-1、taopanfeng、mysql、schema1、table1分别是 根节点、2级节点、3级节点以及4级节点;其中ns-1是taopanfeng的父节点,taopanfeng是ns-1的子 节点,taopanfeng是mysql的父节点,mysql是taopanfeng的子节点,以此类推。
在这里插入图片描述

那么如何描述一个znode呢?一个znode大体上分为3各部分:

  • 节点的数据:即znode data(节点path, 节点data)的关系就像是java map中(key, value)的关系
  • 节点的子节点children
  • 节点的状态stat:用来描述当前节点的创建、修改记录,包括cZxid、ctime等

节点状态stat的属性
在zookeeper shell中使用get命令查看指定路径节点的data、stat信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: localhost:2181(CONNECTED) 7] get /ns-1/tenant
cZxid = 0x6a0000000a # 数据节点创建时的事务 ID
ctime = Wed Mar 27 09:56:44 CST 2019 # 数据节点创建时的时间
mZxid = 0x6a0000000a # 数据节点最后一次更新时的事务 ID
mtime = Wed Mar 27 09:56:44 CST 2019 # 数据节点最后一次更新时的时间
pZxid = 0x6a0000000e # 数据节点的子节点最后一次被修改时的事务 ID
cversion = 2 # 子节点的更改次数
dataVersion = 0 # 节点数据的更改次数
aclVersion = 0 # 节点的 ACL 的更改次数
ephemeralOwner = 0x0 # 如果节点是临时节点,则表示创建该节点的会话的
# SessionID # 如果节点是持久节点,则该属性值为 0
dataLength = 0 # 数据内容的长度
numChildren = 2 # 数据节点当前的子节点个数

节点类型
zookeeper中的节点有两种,分别为临时节点和永久节点。节点的类型在创建时即 被确定,并且不能改变。

  • 临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话(Session)结束,临 时节点将被自动删除,当然可以也可以手动删除。虽然每个临时的Znode都会绑定到 一个客户端会话,但他们对所有的客户端还是可见的。另外,ZooKeeper的临时节 点不允许拥有子节点。
  • 持久化节点:该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作 的时候,他们才能被删除

zookeeper单机安装

Docker 安装 ZooKeeper

当前测试系统环境centos7.3
jdk:jdk-8u131-linux-x64.tar.gz
zookeeper:zookeeper-3.4.10.tar.gz

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 1. 在centos中使用root用户创建zookeeper用户,用户名:zookeeper 密 码:zookeeper
useradd zookeeper
passwd zookeeper

# 2. zookeeper底层依赖于jdk,zookeeper用户登录后,根目录下先进行jdk的安装,jdk 使用jdk-8u131-linux-x64.tar.gz版本,上传并解压jdk
tar -xzvf jdk-8u131-linux-x64.tar.gz # 解压jdk

# 3. 配置jdk环境变量
// vim打开 .bash_profile文件
vi .bash_profile

// 文件中加入如下内容
JAVA_HOME=/home/zookeeper/jdk1.8.0_131
export JAVA_HOME

PATH=$JAVA_HOME/bin:$PATH
export PATH

// 使环境变量生效
source .bash_profile

# 4. 检测jdk安装
// 敲如下命令,系统如图反馈说明安装成功
java -version

# 5. zookeeper使用zookeeper-3.4.10.tar.gz,上传并解压
tar -xzvf zookeeper-3.4.10.tar.gz # 解压zookeeper

# 6. 为zookeeper准备配置文件
// 进入conf目录
cd /home/zookeeper/zookeeper-3.4.10/conf

// 复制配置文件
cp zoo_sample.cfg zoo.cfg
// zookeeper根目录下新建data目录
mkdir data

// vi 修改配置文件中的dataDir
// 此路径用于存储zookeeper中数据的内存快照、及事物日志文件
dataDir=/home/zookeeper/zookeeper-3.4.10/data

# 7. 启动zookeeper
// 进入zookeeper的bin目录
cd /home/zookeeper/zookeeper-3.4.10/bin

// 启动zookeeper
./zkServer.sh start

zkServer.sh start # 启动
zkServer.sh stop # 停止
zkServer.sh status # 查看状态

zkClient 常用命令

新增节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 语法
create [-s] [-e] path data #其中-s 为有序节点,-e 临时节点

# 创建持久化节点并写入数据
create /hadoop "123456"

# 创建持久化有序节点,此时创建的节点名为指定节点名 + 自增序号
[zk: localhost:2181(CONNECTED) 2] create -s /a "aaa"
Created /a0000000000
[zk: localhost:2181(CONNECTED) 3] create -s /b "bbb"
Created /b0000000001
[zk: localhost:2181(CONNECTED) 4] create -s /c "ccc"
Created /c0000000002

# 创建临时节点,临时节点会在会话过期后被删除:
[zk: localhost:2181(CONNECTED) 5] create -e /tmp "tmp"
Created /tmp

# 创建临时有序节点,临时节点会在会话过期后被删除:
[zk: localhost:2181(CONNECTED) 6] create -s -e /aa 'aaa'
Created /aa0000000004
[zk: localhost:2181(CONNECTED) 7] create -s -e /bb 'bbb'
Created /bb0000000005
[zk: localhost:2181(CONNECTED) 8] create -s -e /cc 'ccc'
Created /cc0000000006

更新节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 更新节点的命令是 set ,可以直接进行修改,如下:
[zk: localhost:2181(CONNECTED) 3] set /hadoop "345"
cZxid = 0x4
ctime = Thu Dec 12 14:55:53 CST 2019
mZxid = 0x5
mtime = Thu Dec 12 15:01:59 CST 2019
pZxid = 0x4
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0


# 也可以基于版本号进行更改,此时类似于乐观锁机制.
# 当你传入的数据版本号 (dataVersion) 和当前节点的数据版本号不符合时,zookeeper 会拒绝本次修改:
[zk: localhost:2181(CONNECTED) 10] set /hadoop "3456" 1
version No is not valid : /hadoop

删除节点

1
2
3
4
5
6
7
8
9
# 语法
delete path [version]

# 和更新节点数据一样,也可以传入版本号.
# 当你传入的数据版本号 (dataVersion) 和当前节点的数据版本号不符合时,zookeeper 不会执行删除操作。
[zk: localhost:2181(CONNECTED) 36] delete /hadoop 0
version No is not valid : /hadoop #无效的版本号
[zk: localhost:2181(CONNECTED) 37] delete /hadoop 1
[zk: localhost:2181(CONNECTED) 38]

查看节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 语法
get path


[zk: localhost:2181(CONNECTED) 1] get /hadoop
123456
cZxid = 0x4
ctime = Thu Dec 12 14:55:53 CST 2019
mZxid = 0x4
mtime = Thu Dec 12 14:55:53 CST 2019
pZxid = 0x4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

节点各个属性如下表。
其中一个重要的概念是 Zxid(ZooKeeper Transaction Id),ZooKeeper 节点的每一次更改都具有唯一的 Zxid。
如果 Zxid1 小于 Zxid2,则 Zxid1 的更改发生在 Zxid2 更改之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
状态属性         说明
---------------------------------------------
cZxid 数据节点创建时的事务 ID
ctime 数据节点创建时的时间
mZxid 数据节点最后一次更新时的事务 ID
mtime 数据节点最后一次更新时的时间
pZxid 数据节点的子节点最后一次被修改时的事务 ID
cversion 子节点的更改次数
dataVersion 节点数据的更改次数
aclVersion 节点的 ACL 的更改次数
ephemeralOwner 如果节点是临时节点,则表示创建该节点的会话的
SessionID 如果节点是持久节点,则该属性值为 0
dataLength 数据内容的长度
numChildren 数据节点当前的子节点个数

查看节点状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 可以使用 stat 命令查看节点状态
# 它的返回值和 get 命令类似,但不会返回 节点数据
[zk: localhost:2181(CONNECTED) 2] stat /hadoop
cZxid = 0x4
ctime = Thu Dec 12 14:55:53 CST 2019
mZxid = 0x4
mtime = Thu Dec 12 14:55:53 CST 2019
pZxid = 0x4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

查看节点列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 查看节点列表有 ls path 和 ls2 path 两个命令
# 后者是前者的增强,不仅可以查看指定路径下的所有节点,还可以查看当前节点的信息
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, storm, zookeeper, admin, ...]
[zk: localhost:2181(CONNECTED) 1] ls2 /
[cluster, controller_epoch, brokers, storm, zookeeper, admin, ....]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x130
cversion = 19
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 11

监听器get path [watch]

注意:watch 是一次性用品

1
2
3
4
5
6
# 使用 get path [watch] 注册的监听器能够在节点内容发生改变的时候,向客 户端发出通知。
# 需要注意的是 zookeeper 的触发器是一次性的 (One-time trigger),即 触发一次后就会立即失效。
[zk: localhost:2181(CONNECTED) 4] get /hadoop watch
[zk: localhost:2181(CONNECTED) 5] set /hadoop 45678
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/hadoop #节点 值改变

监听器stat path [watch]

1
2
3
4
5
# 使用 stat path [watch] 注册的监听器能够在节点状态发生改变的时候,向客 户端发出通知
[zk: localhost:2181(CONNECTED) 7] stat /hadoop watch
[zk: localhost:2181(CONNECTED) 8] set /hadoop 112233
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/hadoop #节点 值改变

监听器ls\ls2 path [watch]

1
2
3
4
5
6
7
# 使用 ls path [watch]
# 或 ls2 path [watch] 注册的监听器能够监听该节点下 所有子节点的增加和删除操作。
[zk: localhost:2181(CONNECTED) 9] ls /hadoop watch
[]
[zk: localhost:2181(CONNECTED) 10] create /hadoop/yarn "aaa"
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/hadoop

ACL 权限控制

概述

ACL:全称 access control list 访问控制列表

zookeeper 类似文件系统,client 可以创建节点、更新节点、删除节点,那么如何做到节点的权限的控制呢?
zookeeper的access control list 访问控制列表可以做到 这一点。

acl 权限控制,使用scheme:id:permission 来标识,主要涵盖 3 个方面:

  • 权限模式(scheme):授权的策略
  • 授权对象(id):授权的对象
  • 权限(permission):授予的权限

其特性如下:

  • zooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
  • 每个znode支持设置多种权限控制方案和多个权限
  • 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点

例如:

1
2
# 将节点权限设置为Ip:192.168.60.130 的客户端可以对节点进行增、删、改、查、管理权限
setAcl /test2 ip:192.168.60.130:crwda

权限模式

采用何种方式授权

1
2
3
4
5
6
方案    描述
--------------------------------------------
world 只有一个用户:anyone,代表登录zokeeper所有人(默认)
ip 对客户端使用IP地址认证
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证

授权的对象

给谁授予权限

授权对象ID是指,权限赋予的实体,例如:IP 地址或用户。

授予的权限

授予什么权限?
create、delete、read、writer、admin也就是 增、删、改、查、管理权限, 这5种权限简写为cdrwa。
注意:这5种权限中,delete是指对子节点的删除权限,其它4种 权限指对自身节点的操作权限

1
2
3
4
5
6
7
权限    ACL简写   描述
---------------------------------------
create c 可以创建子节点
delete d 可以删除子节点(仅下一级节点)
read r 可以读取节点数据及显示子节点列表
write w 可以设置节点数据
admin a 可以设置节点访问控制列表权限

授权的相关命令

1
2
3
4
5
命令      使用方式    描述
-----------------------------------
getAcl getAcl 读取ACL权限
setAcl setAcl 设置ACL权限
addauth addauth 添加认证用户

案例

world授权模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 命令
setAcl <path> world:anyone:<acl>

# 案例
[zk: localhost:2181(CONNECTED) 1] create /node1 "node1"
Created /node1

[zk: localhost:2181(CONNECTED) 2] getAcl /node1
'world,'anyone #world方式对所有用户进行授权
: cdrwa #增、删、改、查、管理

[zk: localhost:2181(CONNECTED) 3] setAcl /node1 world:anyone:cdrwa
cZxid = 0x2
ctime = Fri Dec 13 22:25:24 CST 2019
mZxid = 0x2
mtime = Fri Dec 13 22:25:24 CST 2019
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

ip授权模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 命令
setAcl <path> ip:<ip>:<acl>

# 案例
# 注意:远程登录zookeeper命令:./zkCli.sh -server ip
[zk: localhost:2181(CONNECTED) 18] create /node2 "node2"
Created /node2

[zk: localhost:2181(CONNECTED) 23] setAcl /node2
ip:192.168.60.129:cdrwa
cZxid = 0xe
ctime = Fri Dec 13 22:30:29 CST 2019
mZxid = 0x10
mtime = Fri Dec 13 22:33:36 CST 2019
pZxid = 0xe
cversion = 0
dataVersion = 2
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 20
numChildren = 0

[zk: localhost:2181(CONNECTED) 25] getAcl /node2
'ip,'192.168.60.129
: cdrwa #使用IP非 192.168.60.129 的机器

[zk: localhost:2181(CONNECTED) 0] get /node2
Authentication is not valid : /node2 #没有权限

auth授权模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 命令
addauth digest <user>:<password> #添加认证用户
setAcl <path> auth:<user>:<acl>

# 案例
[zk: localhost:2181(CONNECTED) 2] create /node3 "node3"
Created /node3

# 添加认证用户
[zk: localhost:2181(CONNECTED) 4] addauth digest taopanfeng:123456

[zk: localhost:2181(CONNECTED) 1] setAcl /node3 auth:taopanfeng:cdrwa
cZxid = 0x15
ctime = Fri Dec 13 22:41:04 CST 2019
mZxid = 0x15
mtime = Fri Dec 13 22:41:04 CST 2019
pZxid = 0x15
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

[zk: localhost:2181(CONNECTED) 0] getAcl /node3
'digest,'taopanfeng:673OfZhUE8JEFMcu0l64qI8e5ek=
: cdrw

#添加认证用户后可以访问
[zk: localhost:2181(CONNECTED) 3] get /node3
node3
cZxid = 0x15
ctime = Fri Dec 13 22:41:04 CST 2019
mZxid = 0x15
mtime = Fri Dec 13 22:41:04 CST 2019
pZxid = 0x15
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

digest授权模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 命令
setAcl <path> digest:<user>:<password>:<acl>

# 这里的密码是经过SHA1及BASE64处理的密文,在SHELL中可以通过以下命令计算:
echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64
qlzQzCLKhBROghkooLvb+Mlwv4A=

# 先来计算一个密文
echo -n taopanfeng:123456 | openssl dgst -binary -sha1 | openssl base64

# 案例:
[zk: localhost:2181(CONNECTED) 4] create /node4 "node4"
Created /node4

#使用是上面算好的密文密码添加权限:
[zk: localhost:2181(CONNECTED) 5] setAcl /node4
digest:taopanfeng:qlzQzCLKhBROghkooLvb+Mlwv4A=:cdrwa
cZxid = 0x1c
ctime = Fri Dec 13 22:52:21 CST 2019
mZxid = 0x1c
mtime = Fri Dec 13 22:52:21 CST 2019
pZxid = 0x1c
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

[zk: localhost:2181(CONNECTED) 6] getAcl /node4
'digest,'taopanfeng:qlzQzCLKhBROghkooLvb+Mlwv4A=
: cdrwa

[zk: localhost:2181(CONNECTED) 3] get /node4
Authentication is not valid : /node4 #没有权限

[zk: localhost:2181(CONNECTED) 4] addauth digest taopanfeng:123456 #添加 认证用户

[zk: localhost:2181(CONNECTED) 5] get /node4
1 #成功读取数据
cZxid = 0x1c
ctime = Fri Dec 13 22:52:21 CST 2019
mZxid = 0x1c
mtime = Fri Dec 13 22:52:21 CST 2019
pZxid = 0x1c
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 5 numChildren = 0

多种模式授权

1
2
3
4
5
6
# 同一个节点可以同时使用多种模式授权
[zk: localhost:2181(CONNECTED) 0] create /node5 "node5"
Created /node5
[zk: localhost:2181(CONNECTED) 1] addauth digest taopanfeng:123456 #添加认 证用户
[zk: localhost:2181(CONNECTED) 2] setAcl /node5
ip:192.168.60.129:cdra,auth:taopanfeng:cdrwa,digest:taopanfeng:qlzQzCLKhBROgh kooLvb+Mlwv4A=:cdrwa

ACL 超级管理员

zookeeper的权限管理模式有一种叫做super,该模式提供一个超管可以方便的访问 任何权限的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 假设这个超管是:super:admin,需要先为超管生成密码的密文
echo -n super:admin | openssl dgst -binary -sha1 | openssl base64
xQJmxLMiHGwaqBvst5y6rkB6HQs=

# 那么打开zookeeper目录下的/bin/zkServer.sh服务器脚本文件,找到如下一行:
nohup $JAVA "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "- Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"

# 这就是脚本中启动zookeeper的命令,默认只有以上两个配置项,我们需要加一个 超管的配置项
"-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="

# 那么修改以后这条完整命令变成了
nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs="\
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &

# 之后启动zookeeper,输入如下命令添加权限
addauth digest super:admin #添加认证用户

zookeeper java api

znode是zooKeeper集合的核心组件,zookeeper API提供了一小组方法使用 zookeeper集合来操纵znode的所有细节。

客户端应该遵循以步骤,与zookeeper服务器进行清晰和干净的交互。

  • 连接到zookeeper服务器。zookeeper服务器为客户端分配会话ID。
  • 定期向服务器发送心跳。否则,zookeeper服务器将过期会话ID,客户端需要重新连接。
  • 只要会话ID处于活动状态,就可以获取/设置znode。
  • 所有任务完成后,断开与zookeeper服务器的连接。如果客户端长时间不活动,则 zookeeper服务器将自动断开客户端。

连接到ZK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
// connectionString - zookeeper主机
// sessionTimeout - 会话超时(以毫秒为单位)
// watcher - 实现“监视器”对象。zookeeper集合通过监视器对象返回连接状态。



// 案例:
package com.taopanfeng.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;

public class ZookeeperConnection {
public static void main(String[] args) {
try {
// 计数器对象
CountDownLatch countDownLatch=new CountDownLatch(1);
// arg1:服务器的ip和端口
// arg2:客户端与服务器之间的会话超时时间 以毫秒为单位的
// arg3:监视器对象
ZooKeeper zooKeeper=new ZooKeeper("192.168.60.130:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
// 会话编号
System.out.println(zooKeeper.getSessionId());
zooKeeper.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

新增节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// 同步方式
create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
// 异步方式
create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback callBack,Object ctx)
// path - znode路径。例如,/node1 /node1/node11
// data - 要存储在指定znode路径中的数据
// acl - 要创建的节点的访问控制列表。
// zookeeper API提供了一个静态接口 ZooDefs.Ids 来获取一些基本的acl列表。
// 例如,ZooDefs.Ids.OPEN_ACL_UNSAFE 返回打开znode的acl列表。
// createMode - 节点的类型,这是一个枚举。
// callBack-异步回调接口
// ctx-传递上下文参数



//案例:
package com.taopanfeng.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZKCreate {

String IP="192.168.60.130:2181";
ZooKeeper zooKeeper;

@Before
public void before()throws Exception{
// 计数器对象
CountDownLatch countDownLatch=new CountDownLatch(1);
// arg1:服务器的ip和端口
// arg2:客户端与服务器之间的会话超时时间 以毫秒为单位的
// arg3:监视器对象
zooKeeper=new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
}

@After
public void after()throws Exception{
zooKeeper.close();
}

@Test
public void create1()throws Exception{
// arg1:节点的路径
// arg2:节点的数据
// arg3:权限列表 world:anyone:cdrwa
// arg4:节点类型 持久化节点
zooKeeper.create("/create/node1","node1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

@Test
public void create2() throws Exception {
// Ids.READ_ACL_UNSAFE world:anyone:r
zooKeeper.create("/create/node2", "node2".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
}

@Test
public void create3() throws Exception {
// world授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("world", "anyone");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.READ, id));
acls.add(new ACL(ZooDefs.Perms.WRITE, id));
zooKeeper.create("/create/node3", "node3".getBytes(), acls, CreateMode.PERSISTENT);
}

@Test
public void create4() throws Exception {
// ip授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("ip", "192.168.60.130");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zooKeeper.create("/create/node4", "node4".getBytes(), acls, CreateMode.PERSISTENT);
}

@Test
public void create5() throws Exception {
// auth授权模式
// 添加授权用户
zooKeeper.addAuthInfo("digest", "taopanfeng:123456".getBytes());
zooKeeper.create("/create/node5", "node5".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
}

@Test
public void create6() throws Exception {
// auth授权模式
// 添加授权用户
zooKeeper.addAuthInfo("digest", "taopanfeng:123456".getBytes());
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("auth", "taopanfeng");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.READ, id));
zooKeeper.create("/create/node6", "node6".getBytes(), acls, CreateMode.PERSISTENT);
}

@Test
public void create7() throws Exception {
// digest授权模式
// 权限列表
List<ACL> acls = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("digest", "taopanfeng:qlzQzCLKhBROghkooLvb+Mlwv4A=");
// 权限设置
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zooKeeper.create("/create/node7", "node7".getBytes(), acls, CreateMode.PERSISTENT);
}

@Test
public void create8() throws Exception {
// 持久化顺序节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zooKeeper.create("/create/node8", "node8".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(result);
}

@Test
public void create9() throws Exception {
// 临时节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zooKeeper.create("/create/node9", "node9".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(result);
}

@Test
public void create10() throws Exception {
// 临时顺序节点
// Ids.OPEN_ACL_UNSAFE world:anyone:cdrwa
String result = zooKeeper.create("/create/node10", "node10".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(result);
}

@Test
public void create11() throws Exception {
// 异步方式创建节点
zooKeeper.create("/create/node11", "node11".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
// 0 代表创建成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 节点的路径
System.out.println(name);
// 上下文参数
System.out.println(ctx);

}
}, "I am context");
Thread.sleep(10000);
System.out.println("结束");
}
}

更新节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// 同步方式
setData(String path, byte[] data, int version)
// 异步方式
setData(String path, byte[] data, int version,AsyncCallback.StatCallback callBack, Object ctx)
// path- znode路径
// data - 要存储在指定znode路径中的数据。
// version- znode的当前版本。每当数据更改时,ZooKeeper会更新znode的版本号。
// callBack-异步回调接口
// ctx-传递上下文参数



# 案例:
package com.taopanfeng.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class ZKSet {

String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;

@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}

@After
public void after() throws Exception {
zooKeeper.close();
}

@Test
public void set1() throws Exception {
// arg1:节点的路径
// arg2:修改的数据
// arg3:数据版本号 -1代表版本号不参与更新
Stat stat = zooKeeper.setData("/set/node1", "node13".getBytes(), -1);
// 当前节点的版本号
System.out.println(stat.getVersion());

}

@Test
public void set2() throws Exception {
zooKeeper.setData("/set/node1", "node14".getBytes(), -1, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 0代表修改成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数对象
System.out.println(ctx);
// 属性描述对象
System.out.println(stat.getVersion());
}
}, "I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}

删除节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 同步方式
delete(String path, int version)
// 异步方式
delete(String path, int version, AsyncCallback.VoidCallback callBack, Object ctx)
// path - znode路径。
// version - znode的当前版本
// callBack-异步回调接口
// ctx-传递上下文参数



// 案例:
package com.taopanfeng.zookeeper;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;

public class ZKDelete {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;

@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}

@After
public void after() throws Exception {
zooKeeper.close();
}

@Test
public void delete1() throws Exception {
// arg1:删除节点的节点路径
// arg2:数据版本信息 -1代表删除节点时不考虑版本信息
zooKeeper.delete("/delete/node1",-1);
}

@Test
public void delete2() throws Exception {
// 异步使用方式
zooKeeper.delete("/delete/node2", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
// 0代表删除成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数对象
System.out.println(ctx);
}
},"I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}

查看节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// 同步方式
getData(String path, boolean b, Stat stat)
// 异步方式
getData(String path, boolean b,AsyncCallback.DataCallback callBack, Object ctx)
// path - znode路径。
// b- 是否使用连接对象中注册的监视器。
// stat - 返回znode的元数据。
// callBack-异步回调接口
// ctx-传递上下文参数



// 案例:
package com.taopanfeng.zookeeper;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class ZKGet {

String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;

@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}

@After
public void after() throws Exception {
zooKeeper.close();
}

@Test
public void get1() throws Exception {
// arg1:节点的路径
// arg3:读取节点属性的对象
Stat stat=new Stat();
byte [] bys=zooKeeper.getData("/get/node1",false,stat);
// 打印数据
System.out.println(new String(bys));
// 版本信息
System.out.println(stat.getVersion());
}

@Test
public void get2() throws Exception {
//异步方式
zooKeeper.getData("/get/node1", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
// 0代表读取成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数对象
System.out.println(ctx);
// 数据
System.out.println(new String(data));
// 属性对象
System.out.println(stat.getVersion());
}
},"I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}

查看子节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 同步方式
getChildren(String path, boolean b)
// 异步方式
getChildren(String path, boolean b,AsyncCallback.ChildrenCallback callBack,Object ctx)
// path - Znode路径。
// b- 是否使用连接对象中注册的监视器。
// callBack - 异步回调接口。
// ctx-传递上下文参数



// 案例:
package com.taopanfeng.zookeeper;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZKGetChid {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;

@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}

@After
public void after() throws Exception {
zooKeeper.close();
}

@Test
public void get1() throws Exception {
// arg1:节点的路径
List<String> list = zooKeeper.getChildren("/get", false);
for (String str : list) {
System.out.println(str);
}
}

@Test
public void get2() throws Exception {
// 异步用法
zooKeeper.getChildren("/get", false, new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
// 0代表读取成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数对象
System.out.println(ctx);
// 子节点信息
for (String str : children) {
System.out.println(str);
}
}
},"I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}

检查节点是否存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// 同步方法
exists(String path, boolean b)
// 异步方法
exists(String path, boolean b,AsyncCallback.StatCallback callBack,Object ctx)
// path- znode路径。
// b- 是否使用连接对象中注册的监视器。
// callBack - 异步回调接口。
// ctx-传递上下文参数



// 案例:
package com.taopanfeng.zookeeper;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class ZKExists {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper;

@Before
public void before() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
// arg1:zookeeper服务器的ip地址和端口号
// arg2:连接的超时时间 以毫秒为单位
// arg3:监听器对象
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 使主线程阻塞等待
countDownLatch.await();
}

@After
public void after() throws Exception {
zooKeeper.close();
}

@Test
public void exists1() throws Exception {
// arg1:节点的路径
Stat stat=zooKeeper.exists("/exists1",false);
// 节点的版本信息
System.out.println(stat.getVersion());
}

@Test
public void exists2() throws Exception {
// 异步方式
zooKeeper.exists("/exists1", false, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
// 0 代表方式执行成功
System.out.println(rc);
// 节点的路径
System.out.println(path);
// 上下文参数
System.out.println(ctx);
// 节点的版本信息
System.out.println(stat.getVersion());
}
},"I am Context");
Thread.sleep(10000);
System.out.println("结束");
}
}

zookeeper 事件监听机制

2021-05-22 21:51:29

watcher概念

zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者

zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对 象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻 了客户端压力。

watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场 景下的实现方式。

watcher架构

Watcher实现由三个部分组成:

  • Zookeeper服务端
  • Zookeeper客户端
  • 客户端的ZKWatchManager对象

客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管 理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端, 接着客户端的Watch管理器会触发相关Watcher来回调相应处理逻辑,从而完成整体的数 据发布/订阅流程。

在这里插入图片描述

watcher特性

  1. 一次性:watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册
  2. 客户端顺序回调:watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行
  3. 轻量级:WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容;
  4. 时效性:watcher只有在当前session彻底失效时才会无效,若在session有效期内 快速重连成功,则watcher依然存在,仍可接收到通知;

watcher接口设计

Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。

Watcher内部包含了两个枚举类:KeeperState、EventType

在这里插入图片描述
在这里插入图片描述

  • Watcher通知状态(KeeperState)
    KeeperState是客户端与服务端连接状态发生变化时对应的通知类型。
    路径为 org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,其枚举属性 如下:
    1
    2
    3
    4
    5
    6
    枚举属性          说明
    ---------------------------------------
    SyncConnected 客户端与服务器正常连接时
    Disconnected 客户端与服务器断开连接时
    Expired 会话session失效时
    AuthFailed 身份认证失败时
  • Watcher事件类型(EventType)
    EventType是数据节点(znode)发生变化时对应的通知类型。
    EventType变化时 KeeperState永远处于SyncConnected通知状态下;
    当KeeperState发生变化时, EventType永远为None。
    其路径为org.apache.zookeeper.Watcher.Event.EventType, 是一个枚举类,枚举属性如下:
    1
    2
    3
    4
    5
    6
    7
    枚举属性              说明
    ------------------------------------------------------
    None 无
    NodeCreated Watcher监听的数据节点被创建时
    NodeDeleted Watcher监听的数据节点被删除时
    NodeDataChanged Watcher监听的数据节点内容发生变更时(无论内容数据是否变化)
    NodeChildrenChanged Watcher监听的数据节点的子节点列表发生变更时
    注:客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取;

捕获相应的事件

上面讲到zookeeper客户端连接的状态和zookeeper对znode节点监听的事件类 型,下面我们来讲解如何建立zookeeper的watcher监听。
在zookeeper中采用下面的方式为某个znode注册监听。

  • zk.getChildren(path, watch)
  • zk.exists(path, watch)
  • zk.getData(path, watcher, stat)

下表以node-x节点为例,说明调用的注册方法和可监听事件间的关系:

1
2
3
4
5
注册方式                         Created    ChildrenChanged      Changed       Deleted
------------------------------------------------------------------------------------------
zk.exists(“/node- x”,watcher) 可监控 可监控 可监控
zk.getData(“/node- x”,watcher) 可监控 可监控
zk.getChildren(“/node- x”,watcher) 可监控 可监控

注册watcher的方法

连接状态

客服端与服务器的连接状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// KeeperState 通知状态
// SyncConnected:客户端与服务器正常连接时
// Disconnected:客户端与服务器断开连接时
// Expired:会话session失效时
// AuthFailed:身份认证失败时
//
// 事件类型为:None



// 案例
package com.taopanfeng.watcher;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZKConnectionWatcher implements Watcher {

// 计数器对象
static CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接对象
static ZooKeeper zooKeeper;

@Override
public void process(WatchedEvent event) {
try {
// 事件类型
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("断开连接!");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("会话超时!");
zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("认证失败!");
}
}
} catch (Exception ex) {
ex.printStackTrace();
}

}

public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
// 阻塞线程等待连接的创建
countDownLatch.await();
// 会话id
System.out.println(zooKeeper.getSessionId());
// 添加授权用户
zooKeeper.addAuthInfo("digest","taopanfeng:123456".getBytes());
byte [] bs=zooKeeper.getData("/node1",false,null);
System.out.println(new String(bs));
Thread.sleep(50000);
zooKeeper.close();
System.out.println("结束");
} catch (Exception ex) {
ex.printStackTrace();
}
}


}

检查节点是否存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// 使用连接对象的监视器
exists(String path, boolean b)
// 自定义监视器
exists(String path, Watcher w)

// NodeCreated:节点创建
// NodeDeleted:节点删除
// NodeDataChanged:节点内容发生变化

// path- znode路径。
// b- 是否使用连接对象中注册的监视器。
// w-监视器对象。



// 案例:
package com.taopanfeng.watcher;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZKWatcherExists {

String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper = null;

@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
countDownLatch.await();
}

@After
public void after() throws InterruptedException {
zooKeeper.close();
}

@Test
public void watcherExists1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用连接对象中的watcher
zooKeeper.exists("/watcher1", true);
Thread.sleep(50000);
System.out.println("结束");
}


@Test
public void watcherExists2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher对象
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
Thread.sleep(50000);
System.out.println("结束");
}

@Test
public void watcherExists3() throws KeeperException, InterruptedException {
// watcher一次性
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
zooKeeper.exists("/watcher1", this);
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
zooKeeper.exists("/watcher1", watcher);
Thread.sleep(80000);
System.out.println("结束");
}


@Test
public void watcherExists4() throws KeeperException, InterruptedException {
// 注册多个监听器对象
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("1");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
zooKeeper.exists("/watcher1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("2");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
Thread.sleep(80000);
System.out.println("结束");
}
}

查看节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// 使用连接对象的监视器
getData(String path, boolean b, Stat stat)
// 自定义监视器
getData(String path, Watcher w, Stat stat)

// NodeDeleted:节点删除
// NodeDataChanged:节点内容发生变化

// path- znode路径。
// b- 是否使用连接对象中注册的监视器。
// w-监视器对象。
// stat- 返回znode的元数据。



// 案例:
package com.taopanfeng.watcher;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZKWatcherGetData {

String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper = null;

@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
countDownLatch.await();
}

@After
public void after() throws InterruptedException {
zooKeeper.close();
}

@Test
public void watcherGetData1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用连接对象中的watcher
zooKeeper.getData("/watcher2", true, null);
Thread.sleep(50000);
System.out.println("结束");
}

@Test
public void watcherGetData2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher对象
zooKeeper.getData("/watcher2", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
}, null);
Thread.sleep(50000);
System.out.println("结束");
}

@Test
public void watcherGetData3() throws KeeperException, InterruptedException {
// 一次性
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if(event.getType()==Event.EventType.NodeDataChanged) {
zooKeeper.getData("/watcher2", this, null);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
zooKeeper.getData("/watcher2", watcher, null);
Thread.sleep(50000);
System.out.println("结束");
}

@Test
public void watcherGetData4() throws KeeperException, InterruptedException {
// 注册多个监听器对象
zooKeeper.getData("/watcher2", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("1");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if(event.getType()==Event.EventType.NodeDataChanged) {
zooKeeper.getData("/watcher2", this, null);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
},null);
zooKeeper.getData("/watcher2", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("2");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if(event.getType()==Event.EventType.NodeDataChanged) {
zooKeeper.getData("/watcher2", this, null);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
},null);
Thread.sleep(50000);
System.out.println("结束");
}
}

查看子节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// 使用连接对象的监视器
getChildren(String path, boolean b)
// 自定义监视器
getChildren(String path, Watcher w)

// NodeChildrenChanged:子节点发生变化
// NodeDeleted:节点删除

// path- znode路径。
// b- 是否使用连接对象中注册的监视器。
// w-监视器对象。



// 案例:
package com.taopanfeng.watcher;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZKWatcherGetChild {
String IP = "192.168.60.130:2181";
ZooKeeper zooKeeper = null;

@Before
public void before() throws IOException, InterruptedException {
CountDownLatch connectedSemaphore = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSemaphore.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
connectedSemaphore.await();
}

@After
public void after() throws InterruptedException {
zooKeeper.close();
}

@Test
public void watcherGetChild1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用连接对象中的watcher
zooKeeper.getChildren("/watcher3", true);
Thread.sleep(50000);
System.out.println("结束");
}


@Test
public void watcherGetChild2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher
zooKeeper.getChildren("/watcher3", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}
});
Thread.sleep(50000);
System.out.println("结束");
}

@Test
public void watcherGetChild3() throws KeeperException, InterruptedException {
// 一次性
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
zooKeeper.getChildren("/watcher3", this);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
zooKeeper.getChildren("/watcher3", watcher);
Thread.sleep(50000);
System.out.println("结束");
}

@Test
public void watcherGetChild4() throws KeeperException, InterruptedException {
// 多个监视器对象
zooKeeper.getChildren("/watcher3", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("1");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
zooKeeper.getChildren("/watcher3", this);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
});

zooKeeper.getChildren("/watcher3", new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("2");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if (event.getType() == Event.EventType.NodeChildrenChanged) {
zooKeeper.getChildren("/watcher3", this);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
});
Thread.sleep(50000);
System.out.println("结束");
}
}

案例:配置中心

工作中有这样的一个场景: 数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存。

若数据库的用户名和密码改变时候,还需要重新加载缓存,比较麻烦,通过 ZooKeeper可以轻松完成,当数据库发生变化时自动完成缓存同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// 设计思路:
// 1. 连接zookeeper服务器
// 2. 读取zookeeper中的配置信息,注册watcher监听器,存入本地变量
// 3. 当zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件
// 4. 重新获取配置信息

// 案例:
package com.taopanfeng.example;

import java.util.concurrent.CountDownLatch;

import com.taopanfeng.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

public class MyConfigCenter implements Watcher {

// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接对象
static ZooKeeper zooKeeper;

// 用于本地化存储配置信息
private String url;
private String username;
private String password;

@Override
public void process(WatchedEvent event) {
try {
// 捕获事件状态
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("连接断开!");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("连接超时!");
// 超时后服务器端已经将连接释放,需要重新连接服务器端
zooKeeper = new ZooKeeper("192.168.60.130:2181", 6000,
new ZKConnectionWatcher());
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("验证失败!");
}
// 当配置信息发生变化时
} else if (event.getType() == EventType.NodeDataChanged) {
initValue();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}

// 构造方法
public MyConfigCenter() {
initValue();
}


// 连接zookeeper服务器,读取配置信息
public void initValue() {
try {
// 创建连接对象
zooKeeper = new ZooKeeper(IP, 5000, this);
// 阻塞线程,等待连接的创建成功
countDownLatch.await();
// 读取配置信息
this.url = new String(zooKeeper.getData("/config/url", true, null));
this.username = new String(zooKeeper.getData("/config/username", true, null));
this.password = new String(zooKeeper.getData("/config/password", true, null));
} catch (Exception ex) {
ex.printStackTrace();
}
}


public static void main(String[] args) {
try {
MyConfigCenter myConfigCenter = new MyConfigCenter();
for (int i = 1; i <= 20; i++) {
Thread.sleep(5000);
System.out.println("url:"+myConfigCenter.getUrl());
System.out.println("username:"+myConfigCenter.getUsername());
System.out.println("password:"+myConfigCenter.getPassword());
System.out.println("########################################");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}


public String getUrl() { return url; }
public void setUrl(String url) { this.url = url; }

public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }

public String getPassword() { return password; }
public void setPassword(String password) { this.password = password; }

}

案例:生成分布式唯一ID

在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment 属性来自动为每条记录生成一个唯一的ID。
但是分库分表后,就无法在依靠数据库的 auto_increment属性来唯一标识一条记录了。
此时我们就可以用zookeeper在分布式环 境下生成全局唯一ID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// 设计思路:
// 1.连接zookeeper服务器
// 2.指定路径生成临时有序节点
// 3.取序列号及为分布式环境下的唯一ID



// 案例:
package com.taopanfeng.example;

import java.util.concurrent.CountDownLatch;

import com.taopanfeng.watcher.ZKConnectionWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class GloballyUniqueId implements Watcher {
// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
// 用户生成序号的节点
String defaultPath = "/uniqueId";
// 连接对象
ZooKeeper zooKeeper;

@Override
public void process(WatchedEvent event) {
try {
// 捕获事件状态
if (event.getType() == Watcher.Event.EventType.None) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
System.out.println("连接断开!");
} else if (event.getState() == Watcher.Event.KeeperState.Expired) {
System.out.println("连接超时!");
// 超时后服务器端已经将连接释放,需要重新连接服务器端
zooKeeper = new ZooKeeper(IP, 6000,
new ZKConnectionWatcher());
} else if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
System.out.println("验证失败!");
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}

// 构造方法
public GloballyUniqueId() {
try {
//打开连接
zooKeeper = new ZooKeeper(IP, 5000, this);
// 阻塞线程,等待连接的创建成功
countDownLatch.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}

// 生成id的方法
public String getUniqueId() {
String path = "";
try {
//创建临时有序节点
path = zooKeeper.create(defaultPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception ex) {
ex.printStackTrace();
}
// /uniqueId0000000001
return path.substring(9);
}

public static void main(String[] args) {
GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
for (int i = 1; i <= 5; i++) {
String id = globallyUniqueId.getUniqueId();
System.out.println(id);
}
}

}

案例:分布式锁

分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同 工具ZooKeeper,当然也有着标准的实现方式。

下面介绍在zookeeper中如何实现排他锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// 设计思路:
// 1.每个客户端往/Locks下创建临时有序节点/Locks/Lock 000000001
// 2.客户端取得/Locks下子节点,并进行排序,判断排在最前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功
// 3.如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点 Lock 000000001
// 4.当前一位锁节点(Lock 000000002)的逻辑
// 5.监听客户端重新执行第2步逻辑,判断自己是否获得了锁



// 案例:
package com.taopanfeng.example;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class MyLock {
// zk的连接串
String IP = "192.168.60.130:2181";
// 计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//ZooKeeper配置信息
ZooKeeper zooKeeper;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;

// 打开zookeeper连接
public MyLock() {
try {
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功!");
countDownLatch.countDown();
}
}
}
});
countDownLatch.await();
} catch (Exception ex) {
ex.printStackTrace();
}
}

//获取锁
public void acquireLock() throws Exception {
//创建锁节点
createLock();
//尝试获取锁
attemptLock();
}

//创建锁节点
private void createLock() throws Exception {
//判断Locks是否存在,不存在创建
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建临时有序节点
lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("节点创建成功:" + lockPath);
}

//监视器对象,监视上一个节点是否被删除
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (this) {
notifyAll();
}
}
}
};

//尝试获取锁
private void attemptLock() throws Exception {
// 获取Locks节点下的所有子节点
List<String> list = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
// 对子节点进行排序
Collections.sort(list);
// /Locks/Lock_000000001
int index = list.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
if (index == 0) {
System.out.println("获取锁成功!");
return;
} else {
// 上一个节点的路径
String path = list.get(index - 1);
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
if (stat == null) {
attemptLock();
} else {
synchronized (watcher) {
watcher.wait();
}
attemptLock();
}
}

}

//释放锁
public void releaseLock() throws Exception {
//删除临时有序节点
zooKeeper.delete(this.lockPath,-1);
zooKeeper.close();
System.out.println("锁已经释放:"+this.lockPath);
}

public static void main(String[] args) {
try {
MyLock myLock = new MyLock();
myLock.createLock();
} catch (Exception ex) {
ex.printStackTrace();
}

}
}









package com.taopanfeng.example;

public class TicketSeller {

private void sell(){
System.out.println("售票开始");
// 线程随机休眠数毫秒,模拟现实中的费时操作
int sleepMillis = 5000;
try {
//代表复杂逻辑执行了一段时间
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("售票结束");
}

public void sellTicketWithLock() throws Exception {
MyLock lock = new MyLock();
// 获取锁
lock.acquireLock();
sell();
//释放锁
lock.releaseLock();
}
public static void main(String[] args) throws Exception {
TicketSeller ticketSeller = new TicketSeller();
for(int i=0;i<10;i++){
ticketSeller.sellTicketWithLock();
}
}
}

zookeeper 集群搭建

单机环境下,jdk、zookeeper 安装完毕,基于一台虚拟机,进行zookeeper伪集群搭建,zookeeper集群中包含3个节点,节点对外提供服务端口号分别为2181、2182、2183

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 1. 基于zookeeper-3.4.10复制三份zookeeper安装好的服务器文件,
# 目录名称分别为 zookeeper2181、zookeeper2182、zookeeper2183
cp ‐r zookeeper‐3.4.10 zookeeper2181
cp ‐r zookeeper‐3.4.10 zookeeper2182
cp ‐r zookeeper‐3.4.10 zookeeper2183


# 2. 修改zookeeper2181服务器对应配置文件。
#服务器对应端口号
clientPort=2181
#数据快照文件所在路径
dataDir=/home/zookeeper/zookeeper2181/data
#集群配置信息
#server.A=B:C:D
#A:是一个数字,表示这个是服务器的编号
#B:是这个服务器的ip地址
#C:Zookeeper服务器之间的通信端口
#D:Leader选举的端口
server.1=192.168.60.130:2287:3387
server.2=192.168.60.130:2288:3388
server.3=192.168.60.130:2289:3389


# 3. 在 上一步 dataDir 指定的目录下,创建 myid 文件,然后在该文件添加上一步 server 配置的对应 A 数字。
#zookeeper2181对应的数字为1
#/home/zookeeper/zookeeper2181/data目录下执行命令
echo "1" > myid

# 4. zookeeper2182、zookeeper2183参照步骤2、3进行相应配置

# 5. 分别启动三台服务器,检验集群状态 登录命令:
./zkCli.sh ‐server 192.168.60.130:2181
./zkCli.sh ‐server 192.168.60.130:2182
./zkCli.sh ‐server 192.168.60.130:2183

一致性协议:zab协议

zab协议 的全称是 Zookeeper Atomic Broadcast (zookeeper原子广播)。

zookeeper 是通过 zab协议来保证分布式事务的最终一致性

基于zab协议,zookeeper集群中的角色主要有以下三类,如下表所示:

zab广播模式工作原理,通过类似两阶段提交协议的方式解决数据一致性:

1
2
3
4
5
6
1. leader从客户端收到一个写请求
2. leader生成一个新的事务并为这个事务生成一个唯一的ZXID
3. leader将这个事务提议(propose)发送给所有的follows节点
4. follower节点将收到的事务请求加入到历史队列(history queue)中,并发送ack给 leader
5. 当leader收到大多数follower(半数以上节点)的ack消息,leader会发送commit请 求
6. 当follower收到commit请求时,从历史队列中将事务请求commit

zookeeper的leader选举

服务器状态

  • looking:寻找leader状态。当服务器处于该状态时,它会认为当前集群中没有 leader,因此需要进入leader选举状态。
  • leading: 领导者状态。表明当前服务器角色是leader。
  • following: 跟随者状态。表明当前服务器角色是follower。
  • observing:观察者状态。表明当前服务器角色是observer。

启动时Leader选举

服务器启动时期的leader选举

在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成 leader选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都 试图找到leader,于是进入leader选举过程。

选举过程如下:

  1. 每个server发出一个投票。由于是初始情况,server1和server2都会将自己作为 leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用 (myid, zxid)来表示,此时server1的投票为(1, 0),server2的投票为(2, 0),然后各自 将这个投票发给集群中其他机器。
  2. 集群中的每台服务器接收来自集群中各个服务器的投票。
  3. 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行pk,pk。规则如下:
    、优先检查zxid。zxid比较大的服务器优先作为leader。
    、如果zxid相同,那么就比较myid。myid较大的服务器作为leader服务器。 对于Server1而言,它的投票是(1, 0),接收Server2的投票为(2, 0),首先会比较两者的zxid,均为0,再比较myid,此时server2的myid最大,于是更新自己的投票 为(2, 0),然后重新投票,对于server2而言,其无须更新自己的投票,只是再次向集 群中所有机器发出上一次投票信息即可。
  4. 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到 相同的投票信息,对于server1、server2而言,都统计出集群中已经有两台机器接受 了(2, 0)的投票信息,此时便认为已经选出了leader
  5. 改变服务器状态。一旦确定了leader,每个服务器就会更新自己的状态,如果是 follower,那么就变更为following,如果是leader,就变更为leading。

运行时Leader选举

服务器运行时期的Leader选举

在zookeeper运行期间,leader与非leader服务器各司其职,即便当有非leader 服务器宕机或新加入,此时也不会影响leader。
但是一旦leader服务器挂了,那么整个集群将暂停对外服务,进入新一轮leader选举,其过程和启动时期的Leader选举过程基本 一致。

假设正在运行的有server1、server2、server3三台服务器,当前leader是 server2,若某一时刻leader挂了,此时便开始Leader选举。

选举过程如下:

  1. 变更状态。leader挂后,余下的服务器都会将自己的服务器状态变更为looking,然后开始进入leader选举过程。
  2. 每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定 server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3 都会投自己,产生投票(1, 122),(3, 122),然后各自将投票发送给集群中所有机器。
  3. 接收来自各个服务器的投票。与启动时过程相同
  4. 处理投票。与启动时过程相同,此时,server3将会成为leader。
  5. 统计投票。与启动时过程相同。
  6. 改变服务器的状态。与启动时过程相同。

observer角色及其配置

observer角色特点:

  1. 不参与集群的leader选举
  2. 不参与集群中写数据时的ack反馈
1
2
3
4
5
6
# 为了使用observer角色,在任何想变成observer角色的配置文件中加入如下配 置:
peerType=observer

# 并在所有server的配置文件中,配置成observer模式的server的那行配置追 加:observer
# 例如:
server.3=192.168.60.130:2289:3389:observer

zookeeperAPI连接集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher)
// connectionString - zooKeeper集合主机。
// sessionTimeout - 会话超时(以毫秒为单位)。
// watcher - 实现“监视器”界面的对象。ZooKeeper集合通过监视器对象返回连接状态。



// 案例:
package com.taopanfeng.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;

public class ZookeeperConnection {
public static void main(String[] args) {
try {
// 计数器对象
CountDownLatch countDownLatch=new CountDownLatch(1);
// arg1:服务器的ip和端口
// arg2:客户端与服务器之间的会话超时时间 以毫秒为单位的
// arg3:监视器对象
ZooKeeper zooKeeper=new ZooKeeper("192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState()==Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
}
}
});
// 主线程阻塞等待连接对象的创建成功
countDownLatch.await();
// 会话编号
System.out.println(zooKeeper.getSessionId());
zooKeeper.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

curator java api

2021-05-22 22:46:36
zookeeper 开源客户端curator介绍

curator简介

curator是Netflix公司开源的一个zookeeper客户端,后捐献给apache, curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。

提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、 共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最 好用,最流行的zookeeper的客户端。

1
2
3
4
5
6
7
8
9
10
11
12
# 原生zookeeperAPI的不足:
+ 连接对象异步创建,需要开发人员自行编码等待
+ 连接没有自动重连超时机制
+ watcher一次注册生效一次
+ 不支持递归创建树形节点

# curator特点:
+ 解决session会话超时重连
+ watcher反复注册
+ 简化开发api
+ 遵循Fluent风格的API
+ 提供了分布式锁服务、共享计数器、缓存机制等机制

maven依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.6.0</version>
<type>jar</type>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.6.0</version>
<type>jar</type>
</dependency>

连接到ZK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;

public class CuratorConnection {
public static void main(String[] args) {
// session重连策略
/*
3秒后重连一次,只重连1次
RetryPolicy retryPolicy = new RetryOneTime(3000);
*/
/*
每3秒重连一次,重连3次
RetryPolicy retryPolicy = new RetryNTimes(3,3000);
*/
/*
每3秒重连一次,总等待时间超过10秒后停止重连
RetryPolicy retryPolicy=new RetryUntilElapsed(10000,3000);
*/
// baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

// 创建连接对象
CuratorFramework client= CuratorFrameworkFactory.builder()
// IP地址端口号
.connectString("192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183")
// 会话超时时间
.sessionTimeoutMs(5000)
// 重连机制
.retryPolicy(retryPolicy)
// 命名空间
.namespace("create")
// 构建连接对象
.build();
// 打开连接
client.start();
System.out.println(client.isStarted());
// 关闭连接
client.close();
}
}

新增节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;


public class CuratorCreate {

String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(IP)
.sessionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("create")
.build();
client.start();
}

@After
public void after() {
client.close();
}

@Test
public void create1() throws Exception {
// 新增节点
client.create()
// 节点的类型
.withMode(CreateMode.PERSISTENT)
// 节点的权限列表 world:anyone:cdrwa
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
// arg1:节点的路径
// arg2:节点的数据
.forPath("/node1", "node1".getBytes());
System.out.println("结束");
}


@Test
public void create2() throws Exception {
// 自定义权限列表
// 权限列表
List<ACL> list = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("ip", "192.168.60.130");
list.add(new ACL(ZooDefs.Perms.ALL, id));
client.create().withMode(CreateMode.PERSISTENT).withACL(list).forPath("/node2", "node2".getBytes());
System.out.println("结束");
}

@Test
public void create3() throws Exception {
// 递归创建节点树
client.create()
// 递归节点的创建
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/node3/node31", "node31".getBytes());
System.out.println("结束");
}


@Test
public void create4() throws Exception {
// 异步方式创建节点
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
// 异步回调接口
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 时间类型
System.out.println(curatorEvent.getType());
}
})
.forPath("/node4","node4".getBytes());
Thread.sleep(5000);
System.out.println("结束");
}
}

更新节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorSet {

String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(IP)
.sessionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("set").build();
client.start();
}

@After
public void after() {
client.close();
}

@Test
public void set1() throws Exception {
// 更新节点
client.setData()
// arg1:节点的路径
// arg2:节点的数据
.forPath("/node1", "node11".getBytes());
System.out.println("结束");
}

@Test
public void set2() throws Exception {
client.setData()
// 指定版本号
.withVersion(2)
.forPath("/node1", "node1111".getBytes());
System.out.println("结束");
}

@Test
public void set3() throws Exception {
// 异步方式修改节点数据
client.setData()
.withVersion(-1).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 事件的类型
System.out.println(curatorEvent.getType());
}
}).forPath("/node1", "node1".getBytes());
Thread.sleep(5000);
System.out.println("结束");
}
}

删除节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;

public class CuratorDelete {

String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(IP)
.sessionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.namespace("delete").build();
client.start();
}

@After
public void after() {
client.close();
}

@Test
public void delete1() throws Exception {
// 删除节点
client.delete()
// 节点的路径
.forPath("/node1");
System.out.println("结束");
}


@Test
public void delete2() throws Exception {
client.delete()
// 版本号
.withVersion(0)
.forPath("/node1");
System.out.println("结束");
}

@Test
public void delete3() throws Exception {
//删除包含字节点的节点
client.delete()
.deletingChildrenIfNeeded()
.withVersion(-1)
.forPath("/node1");
System.out.println("结束");
}

@Test
public void delete4() throws Exception {
// 异步方式删除节点
client.delete()
.deletingChildrenIfNeeded()
.withVersion(-1)
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
}
})
.forPath("/node1");
Thread.sleep(5000);
System.out.println("结束");
}
}

查看节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorGet {

String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(IP)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("get").build();
client.start();
}

@After
public void after() {
client.close();
}

@Test
public void get1() throws Exception {
// 读取节点数据
byte [] bys=client.getData()
// 节点的路径
.forPath("/node1");
System.out.println(new String(bys));
}

@Test
public void get2() throws Exception {
// 读取数据时读取节点的属性
Stat stat=new Stat();
byte [] bys=client.getData()
// 读取属性
.storingStatIn(stat)
.forPath("/node1");
System.out.println(new String(bys));
System.out.println(stat.getVersion());
}

@Test
public void get3() throws Exception {
// 异步方式读取节点的数据
client.getData()
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
// 数据
System.out.println(new String(curatorEvent.getData()));
}
})
.forPath("/node1");
Thread.sleep(5000);
System.out.println("结束");
}
}

查看子节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class CuratorGetChild {


String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(IP)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.build();
client.start();
}

@After
public void after() {
client.close();
}

@Test
public void getChild1() throws Exception {
// 读取子节点数据
List<String> list = client.getChildren()
// 节点路径
.forPath("/get");
for (String str : list) {
System.out.println(str);
}
}

@Test
public void getChild2() throws Exception {
// 异步方式读取子节点数据
client.getChildren()
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
// 读取子节点数据
List<String> list=curatorEvent.getChildren();
for (String str : list) {
System.out.println(str);
}
}
})
.forPath("/get");
Thread.sleep(5000);
System.out.println("结束");
}
}

检查节点是否存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorExists {

String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(IP)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("get").build();
client.start();
}

@After
public void after() {
client.close();
}

@Test
public void exists1() throws Exception {
// 判断节点是否存在
Stat stat= client.checkExists()
// 节点路径
.forPath("/node2");
System.out.println(stat.getVersion());
}

@Test
public void exists2() throws Exception {
// 异步方式判断节点是否存在
client.checkExists()
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
System.out.println(curatorEvent.getStat().getVersion());
}
})
.forPath("/node2");
Thread.sleep(5000);
System.out.println("结束");
}
}

watcherAPI

curator提供了两种Watcher(Cache)来监听结点的变化

  1. Node Cache : 只是监听某一个特定的节点,监听节点的新增和修改
  2. PathChildren Cache : 监控一个ZNode的子节点。当一个子节点增加, 更新、删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorWatcher {

String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory
.builder()
.connectString(IP)
.sessionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.build();
client.start();
}

@After
public void after() {
client.close();
}


@Test
public void watcher1() throws Exception {
// 监视某个节点的数据变化
// arg1:连接对象
// arg2:监视的节点路径
final NodeCache nodeCache=new NodeCache(client,"/watcher1");
// 启动监视器对象
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
// 节点变化时回调的方法
public void nodeChanged() throws Exception {
System.out.println(nodeCache.getCurrentData().getPath());
System.out.println(new String(nodeCache.getCurrentData().getData()));
}
});
Thread.sleep(100000);
System.out.println("结束");
//关闭监视器对象
nodeCache.close();
}

@Test
public void watcher2() throws Exception {
// 监视子节点的变化
// arg1:连接对象
// arg2:监视的节点路径
// arg3:事件中是否可以获取节点的数据
PathChildrenCache pathChildrenCache=new PathChildrenCache(client,"/watcher1",true);
// 启动监听
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
// 当子节点方法变化时回调的方法
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
// 节点的事件类型
System.out.println(pathChildrenCacheEvent.getType());
// 节点的路径
System.out.println(pathChildrenCacheEvent.getData().getPath());
// 节点数据
System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
}
});
Thread.sleep(100000);
System.out.println("结束");
// 关闭监听
pathChildrenCache.close();

}
}

事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorTransaction {

String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(IP)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("create").build();
client.start();
}

@After
public void after() {
client.close();
}

@Test
public void tra1() throws Exception {
// 开启事务
client.inTransaction()
.create().forPath("/node1","node1".getBytes())
.and()
.create().forPath("/node2","node2".getBytes())
.and()
//事务提交
.commit();
}
}

分布式锁

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.taopanfeng.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class CuratorLock {

String IP = "192.168.60.130:2181,192.168.60.130:2182,192.168.60.130:2183";
CuratorFramework client;

@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory
.builder()
.connectString(IP)
.sessionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.build();
client.start();
}

@After
public void after() {
client.close();
}


@Test
public void lock1() throws Exception {
// 排他锁
// arg1:连接对象
// arg2:节点路径
InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1");
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
Thread.sleep(3000);
System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");
}

@Test
public void lock2() throws Exception {
// 读写锁
InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");
// 获取读锁对象
InterProcessLock interProcessLock=interProcessReadWriteLock.readLock();
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
Thread.sleep(3000);
System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");
}

@Test
public void lock3() throws Exception {
// 读写锁
InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");
// 获取写锁对象
InterProcessLock interProcessLock=interProcessReadWriteLock.writeLock();
System.out.println("等待获取锁对象!");
// 获取锁
interProcessLock.acquire();
for (int i = 1; i <= 10; i++) {
Thread.sleep(3000);
System.out.println(i);
}
// 释放锁
interProcessLock.release();
System.out.println("等待释放锁!");
}

}

四字监控命令

zookeeper四字监控命令

zooKeeper支持某些特定的四字命令与其的交互。它们大多是查询命令,用来 获取 zooKeeper服务的当前状态及相关信息。

用户在客户端可以通过 telnet 或 nc 向 zooKeeper提交相应的命令。

zooKeeper常用四字命令见下表 所示:

  1. conf 输出相关服务配置的详细信息。比如端口、zk数据及日志配置路径、最大 连接数,session超时时间、serverId等
  2. cons 列出所有连接到这台服务器的客户端连接/会话的详细信息。包括“接受/发 送”的包数量、session id 、操作延迟、最后的操作执行等信息
  3. crst 重置当前这台服务器所有连接/会话的统计信息
  4. dump 列出未经处理的会话和临时节点
  5. envi 输出关于服务器的环境详细信息
  6. ruok 测试服务是否处于正确运行状态。如果正常返回”imok”,否则返回空
  7. stat 输出服务器的详细信息:接收/发送包数量、连接数、模式 (leader/follower)、节点总数、延迟。 所有客户端的列表
  8. srst 重置server状态
  9. wchs 列出服务器watches的简洁信息:连接总数、watching节点总数和 watches总数
  10. wchc 通过session分组,列出watch的所有节点,它的输出是一个与 watch 相关 的会话的节点列表
  11. mntr 列出集群的健康状态。包括“接受/发送”的包数量、操作延迟、当前服务模 式(leader/follower)、节点总数、watch总数、临时节点总数
1
2
3
4
5
# nc命令工具安装
yum -y install nc

# 使用方式,在shell终端输入
echo mntr | nc localhost 2181

conf

conf:输出相关服务配置的详细信息

shell终端输入:echo conf | nc localhost 2181

属性 含义

  1. clientPort 客户端端口号
  2. dataDir 数据快照文件目录 默认情况下10W次事务操作生成一次 快照
  3. dataLogDir 事物日志文件目录,生产环境中放在独立的磁盘上
  4. tickTime 服务器之间或客户端与服务器之间维持心跳的时间间隔(以 毫秒为单位)
  5. maxClientCnxns 最大连接数
  6. minSessionTimeout 最小session超时 minSessionTimeout=tickTime*2
  7. maxSessionTimeout 最大session超时 maxSessionTimeout=tickTime*20
  8. serverId 服务器编号
  9. initLimit 集群中的follower服务器(F)与leader服务器(L)之间初始连接 时能容忍的最多心跳数
  10. syncLimit 集群中的follower服务器(F)与leader服务器(L)之间 请求和 应答之间能容忍的最多心跳数
  11. electionAlg 0:基于UDP的LeaderElection 1:基于UDP的 FastLeaderElection 2:基于UDP和认证的 FastLeaderElection 3:基于TCP的FastLeaderElection 在 1. 3.4.10版本中,默认值为3另外三种算法已经被弃用,并且 有计划在之后的版本中将它们彻底删除而不再支持
  12. electionPort 选举端口
  13. quorumPort 数据通信端口
  14. peerType 是否为观察者 1为观察者

cons

cons:列出所有连接到这台服务器的客户端连接/会话的详细信息

shell终端输入:echo cons | nc localhost 2181

  1. ip ip地址
  2. port 端口号
  3. queued 等待被处理的请求数,请求缓存在队列中
  4. received 收到的包数
  5. sent 发送的包数
  6. sid 会话id
  7. lop 最后的操作 GETD-读取数据 DELE-删除数据 CREA-创建数据
  8. est 连接时间戳
  9. to 超时时间
  10. lcxid 当前会话的操作id
  11. lzxid 最大事务id
  12. lresp 最后响应时间戳
  13. llat 最后/最新 延时
  14. minlat 最小延时
  15. maxlat 最大延时
  16. avglat 平均延时

crst

crst:重置当前这台服务器所有连接/会话的统计信息

shell终端输入:echo crst| nc localhost 2181

dump

dump:列出未经处理的会话和临时节点

shell终端输入:echo dump| nc localhost 2181

session id:znode path(1对多 , 处于队列中排队的session和临时节点)

envi

envi:输出关于服务器的环境配置信息

shell终端输入:echo envi | nc localhost 2181

  1. zookeeper.version 版本
  2. host.name host信息
  3. java.version java版本
  4. java.vendor 供应商
  5. java.home 运行环境所在目录
  6. java.class.path classpath
  7. java.library.path 第三方库指定非java类包的位置(如:dll,so)
  8. java.io.tmpdir 默认的临时文件路径 java.compiler JIT 编译器的名称
  9. os.name Linux os.arch amd64
  10. os.version 3.10.0-514.el7.x86_64
  11. user.name zookeeper
  12. user.home /home/zookeeper
  13. user.dir /home/zookeeper/zookeeper2181/bin

ruok

ruok:测试服务是否处于正确运行状态

shell终端输入:echo ruok | nc localhost 2181

stat

stat:输出服务器的详细信息与srvr相似,但是多了每个连接的会话信息

shell终端输入:echo stat | nc localhost 2181

  • Zookeeper version 版本
  • Latency min/avg/max 延时
  • Received 收包
  • Sent 发包
  • Connections 连接数
  • Outstanding 堆积数
  • Zxid 最大事物id
  • Mode 服务器角色
  • Node count 节点数

srst

srst:重置server状态

shell终端输入:echo srst | nc localhost 2181

wchs

wchs:列出服务器watches的简洁信息

shell终端输入:echo wchs | nc localhost 2181

  1. connectsions 连接数
  2. watch-paths watch节点数
  3. watchers watcher数量

wchc

wchc:通过session分组,列出watch的所有节点,它的输出的是一个与 watch 相关 的会话的节点列表

shell终端输入:echo wchc | nc localhost 2181

  • 问题:
    wchc is not executed because it is not in the whitelist.
    没有执行WCHC,因为它不在白名单中。

  • 解决方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 修改启动指令 zkServer.sh

    # 注意找到这个信息
    else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi


    # 下面添加如下信息
    ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

wchp

wchp:通过路径分组,列出所有的 watch 的session id信息

shell终端输入:echo wchp | nc localhost 2181

  • 问题:
    wchp is not executed because it is not in the whitelist.
    没有执行WCHP,因为它不在白名单中。

  • 解决方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # 修改启动指令 zkServer.sh

    # 注意找到这个信息
    else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi

    # 下面添加如下信息
    ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

mntr

mntr:列出服务器的健康状态

shell终端输入:echo mntr | nc localhost 2181

  1. zk_version 版本
  2. zk_avg_latency 平均延时
  3. zk_max_latency 最大延时
  4. zk_min_latency 最小延时
  5. zk_packets_received 收包数
  6. zk_packets_sent 发包数
  7. zk_num_alive_connections 连接数
  8. zk_outstanding_requests 堆积请求数
  9. zk_server_state leader/follower 状态
  10. zk_znode_count znode数量
  11. zk_watch_count watch数量
  12. zk_ephemerals_count 临时节点(znode)
  13. zk_approximate_data_size 数据大小
  14. zk_open_file_descriptor_count 打开的文件描述符数量
  15. zk_max_file_descriptor_count 最大文件描述符数量

ZooInspector图形化客户端

ZooInspector下载地址:https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

解压后进入目录 ZooInspector\build,运行:java -jar zookeeper-dev-ZooInspector.jar # 执行命令如下
在这里插入图片描述

点击左上角连接按钮,输入zk服务地址:ip或者主机名:2181
在这里插入图片描述

点击OK,即可查看ZK节点信息
在这里插入图片描述

taokeeper监控工具

基于zookeeper的监控管理工具taokeeper,由淘宝团队开源的zk管理中间件, 安装前要求服务前先配置nc 和 sshd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 1.下载数据库脚本
wget https://github.com/downloads/alibaba/taokeeper/taokeeper.sql

# 2.下载主程序
wget https://github.com/downloads/alibaba/taokeeper/taokeeper- monitor.tar.gz

# 3.下载配置文件
wget https://github.com/downloads/alibaba/taokeeper/taokeeper-monitor- config.properties

# 4.配置 taokeeper-monitor-config.properties
#Daily
systemInfo.envName=DAILY
#DBCP
dbcp.driverClassName=com.mysql.jdbc.Driver
#mysql连接的ip地址端口号
dbcp.dbJDBCUrl=jdbc:mysql://192.168.60.130:3306/taokeeper
dbcp.characterEncoding=GBK
#用户名
dbcp.username=root
#密码
dbcp.password=root
dbcp.maxActive=30
dbcp.maxIdle=10
dbcp.maxWait=10000
#SystemConstant
#用户存储内部数据的文件夹
#创建/home/zookeeper/taokeeperdata/ZooKeeperClientThroughputStat
SystemConstent.dataStoreBasePath=/home/zookeeper/taokeeperdata
#ssh用户
SystemConstant.userNameOfSSH=zookeeper
#ssh密码
SystemConstant.passwordOfSSH=zookeeper
#Optional
SystemConstant.portOfSSH=22

# 5.安装配置 tomcat,修改catalina.sh
#指向配置文件所在的位置
JAVA_OPTS=-DconfigFilePath="/home/zookeeper/taokeeper-monitor-tomcat/webapps/ROOT/conf/taokeeper-monitor-config.properties"

# 6.部署工程启动

在这里插入图片描述