Press "Enter" to skip to content

分布式协调服务框架ZooKeeper

ZooKeeper 简介

ZooKeeper 是 Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。 ZooKeeper 曾经是 Hadoop 的一个子项目,但现在是一个顶级独立的开源项目。

所谓分布式协调主要是来解决分布式系统中多个进程之间的同步限制,防止出现脏读,例如我们常说的分布式锁。

数据存储

ZooKeeper中的数据是存储在内存当中的,因此它的效率十分高效。它内部的存储方式十分类似于文件存储结构,采用了分层存储结构。但是它和文件存储结构的区别是,它的各个节点中是允许存储数据的,需要注意的是zk的每个节点存储数据不能超过1M。

我们可以通过不同的路径访问到不同的节点,因为它是分层结构,我们也可以通过某一个父节点,获取到该节点下的所有子节点信息。

节点类型

永久性节点 – PERSISTENT

永久性节点即创建以后,在不执行delete命令的前提下,该节点是永久存在的。

永久性顺序节点 – PERSISTENT_SEQUENTIAL

这类节点的基本特性和上面的节点类 型是一致的。额外的特性是,在 ZooKeeper 中,每个父节点会为他的第一级子节点维护一份时序, 会记录每个子节点创建的先后顺序。

临时性节点 – EPHEMERAL

临时节点与session有关,每个客户端与zk建立链接的时候会生成一个session,这个session不会因为ZooKeeper服务器节点的变化而变化,只有当客户端断开连接以后,该session才会消失,而临时节点会随着session的消失而消失。

临时自动编号节点 – EPHEMERAL_SEQUENTIAL

此节点是属于临时节点,不过带有顺序,客户端会话结束节点就消失。

监听机制

ZooKeeper拥有watch机制,也就是监视机制,可以支持响应式编程模式,它可以对某个路径的终节点及其子节点的变更进行监视,当其发生变更以后,会调用注册的callback方法,然后进行具体的业务逻辑。例如监测路径为/A/A1,那么它会加测A1节点,以及附属于A1的所有子节点,这个子不单单只一层子节点,是指所有层的子节点。

重要特性

顺序一致性

来自客户端的相关指令会按照顺序执行,不会出现乱序的情况,客户端发送到服务的指令1->2->3->4,那个这些指令就会按照顺序执行;

原子性

更新只有成功和失败,没有中间状态;

可靠性

也可以称之为持久性,节点更新以后,在下次更新之前,它的数据不会发生变更;

准实时性

也可以称之为最终一致性,在ZooKeeper集群中,一个客户端修改了其中的一个节点,一定时间以后,所有可用的服务对应的节点都会变成更新以后的值,如果需要最新数据,应该在读数据之前调用 sync接口。

常用API

create

创建一个新节点,通过指定路径的方式创建节点,例如创建路径为/A/A1/demo,则会在A1节点下创建一个demo节点;

delete

删除节点,通过路径的方式删除节点,如果删除路径为/A/A1/demo,则会删除A1节点下的demo节点;

exists

判断指定路径下的节点是否存在,例如判断路径为/A/A1/demo,则会判断A1节点下的demo节点是否存在;

get

获取指定路径下某个节点的值是什么,例如获取路径为/A/A1/demo,则会获取A1节点下的demo节点的值什么;

set

为指定路径的节点进行赋值操作,例如修改路径为/A/A1/demo,则会修改A1节点下的demo节点的值;

getChildren

获取指定路径节点下的子节点信息,例如获取路径为/A,则会获取A节点下的A1和A2节点;

sync

获取到同步数据,这个涉及到了zk的原理,zk集群属于最终一致性,调用该方法,可以获取到最终的结果值,如果不使用该方法,在查询的时候可能获取到的值是中间值。

统一配置管理

为什么要用统一配置?

我们做项目时用到的配置比如数据库配置等…我们都是写死在项目里面,如果需要更改,那么也是的修改配置文件然后再投产上去,那么问题来了,如果做集群的呢,有100台机器,这时候做修改那就太不切实际了,那么就需要用到统一配置管理啦。

解决思路

  1. 把公共配置抽取出来
  2. 对公共配置进行维护
  3. 修改公共配置后应用不需要重新部署

采用方案

  1. 公共配置抽取存放于zookeeper中并落地数据库
  2. 对公共配置修改后发布到zookeeper中并落地数据库
  3. 对应用开启配置实时监听,zookeeper配置文件一旦被修改,应用可实时监听到并获取

下面基于zookeeper粗略实现了一个统一配置管理

配置文件Config

public class Config implements Serializable{

private static final long serialVersionUID = 1L;
private String userNm;
private String userPw;

public Config() {
}
public Config(String userNm, String userPw) {
this.userNm = userNm;
this.userPw = userPw;
}
public String getUserNm() {
return userNm;
}
public void setUserNm(String userNm) {
this.userNm = userNm;
}
public String getUserPw() {
return userPw;
}
public void setUserPw(String userPw) {
this.userPw = userPw;
}
@Override
public String toString() {
return “Config [userNm=” + userNm + “, userPw=” + userPw + “]”;
}

}

配置管理中心

public class ZkConfigMag {

private Config config;
/**
* 从数据库加载配置
*/

public Config downLoadConfigFromDB(){
//getDB
config = new Config(“nm”, “pw”);
return config;
}

/**
* 配置文件上传到数据库
*/

public void upLoadConfigToDB(String nm, String pw){
if(config==null)config = new Config();
config.setUserNm(nm);
config.setUserPw(pw);
//updateDB
}

/**
* 配置文件同步到zookeeper
*/

public void syncConfigToZk(){
ZkClient zk = new ZkClient(“localhost:2181”);
if(!zk.exists(“/zkConfig”)){
zk.createPersistent(“/zkConfig”,true);
}
zk.writeData(“/zkConfig”, config);
zk.close();
}
}

应用监听实现

public class ZkGetConfigClient {

private Config config;

public Config getConfig() {
ZkClient zk = new ZkClient(“localhost:2181”);
config = (Config)zk.readData(“/zkConfig”);
System.out.println(“加载到配置:”+config.toString());

//监听配置文件修改
zk.subscribeDataChanges(“/zkConfig”, new IZkDataListener(){
@Override
public void handleDataChange(String arg0, Object arg1)
throws Exception {
config = (Config) arg1;
System.out.println(“监听到配置文件被修改:”+config.toString());
}

@Override
public void handleDataDeleted(String arg0) throws Exception {
config = null;
System.out.println(“监听到配置文件被删除”);
}

});
return config;
}
      // 开启监听
public static void main(String[] args) {
ZkGetConfigClient client = new ZkGetConfigClient();
client.getConfig();
System.out.println(client.config.toString());
for(int i = 0;i<10;i++){
System.out.println(client.config.toString());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}


}

测试,启动配置管理中心

public class ZkConfigTest {

public static void main(String[] args) {
ZkConfigMag mag = new ZkConfigMag();
Config config = mag.downLoadConfigFromDB();
System.out.println(“….加载数据库配置….”+config.toString());
mag.syncConfigToZk();
System.out.println(“….同步配置文件到zookeeper….”);

//歇会,这样看比较清晰
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

mag.upLoadConfigToDB(“cwhcc”, “passwordcc”);
System.out.println(“….修改配置文件….”+config.toString());
mag.syncConfigToZk();
System.out.println(“….同步配置文件到zookeeper….”);


}

}

至此一个简单的统一配置管理示例结束。

统一命名服务

统一命名服务的理解其实跟域名一样,是我们为这某一部分的资源给它取一个名字,别人通过这个名字就可以拿到对应的资源。

比如说,现在我有一个域名francissoung.com,但我这个域名下有多台机器:

192.168.1.1

192.168.1.2

192.168.1.3

192.168.1.4

别人访问francissoung.com即可访问到我的机器,而不是通过IP去访问。

分布式锁

优势

用ZooKeeper作为分布式协调服务,它的一个很大的作用就是用来实现分布式锁。用ZooKeeper节点存在临时节点,它的生命周期与session有关,它会随着session的消失而消失,这就比较完美的解决了使用redis作为分布式锁时可能出现的死锁问题。

实现方式

我们可以使用ZooKeeper来实现分布式锁,那是怎么做的呢??下面来看看:

系统A、B、C都去访问/locks节点

访问的时候会创建带顺序号的临时/短暂(EPHEMERAL_SEQUENTIAL)节点,比如,系统A创建了id_000000节点,系统B创建了id_000002节点,系统C创建了id_000001节点。

接着,拿到/locks节点下的所有子节点(id_000000,id_000001,id_000002),判断自己创建的是不是最小的那个节点

如果是,则拿到锁。

释放锁:执行完操作后,把创建的节点给删掉

如果不是,则监听比自己要小1的节点变化

举个例子:

系统A拿到/locks节点下的所有子节点,经过比较,发现自己(id_000000),是所有子节点最小的。所以得到锁

系统B拿到/locks节点下的所有子节点,经过比较,发现自己(id_000002),不是所有子节点最小的。所以监听比自己小1的节点id_000001的状态

系统C拿到/locks节点下的所有子节点,经过比较,发现自己(id_000001),不是所有子节点最小的。所以监听比自己小1的节点id_000000的状态

……

等到系统A执行完操作以后,将自己创建的节点删除(id_000000)。通过监听,系统C发现id_000000节点已经删除了,发现自己已经是最小的节点了,于是顺利拿到锁

….系统B如上

使用curator-recipes组件模拟获取锁的过程

创建连接

private static CuratorFramework getZkClient() {
        String zkServerAddress = “127.0.0.1:2181”;
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkServerAddress)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        zkClient.start();
        return zkClient;
    }

模拟获取锁、释放锁

static class TestThread implements Runnable {
        private Integer threadFlag;
        private InterProcessMutex lock;

        public TestThread(Integer threadFlag, InterProcessMutex lock) {
            this.threadFlag = threadFlag;
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                // 获取锁
                lock.acquire();
                System.out.println(“第”+threadFlag+”线程获取到了锁”);
                //等到1秒后释放锁
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                try {
                    // 释放锁
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

模拟50个线程抢锁

public static void main(String[] args)  {
        CuratorFramework zkClient = getZkClient();
        String lockPath = “/lock”;
        InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
        //模拟50个线程抢锁
        for (int i = 0; i < 50; i++) {
            new Thread(new TestThread(i, lock)).start();
        }
    }

集群管理

在分布式的集群中,经常会由于各种原因,比如硬件故障,软件故障,网络问题,有些节点会进进出出。有新的节点加入进来,也有老的节点退出集群。这个时候集群中其他机器需要感知到这种变化,然后根据这种变化做出对应的决策。比如一 个分布式的 SOA 架构中,服务是一个集群提供的,当消费者访问某个服务时,就需要采用 某种机制发现现在有哪些节点可以提供该服务,也就是服务发现。

还是以我们三个系统A、B、C为例,在ZooKeeper中创建临时节点即可:

只要系统A挂了,那/groupMember/A这个节点就会删除,通过监听groupMember下的子节点,系统B和C就能够感知到系统A已经挂了。(新增也是同理)

除了能够感知节点的上下线变化,ZooKeeper还可以实现动态选举Master的功能。(如果集群是主从架构模式下)

原理也很简单,如果想要实现动态选举Master的功能,Znode节点的类型是带顺序号的临时节点(EPHEMERAL_SEQUENTIAL)就好了。

Zookeeper会每次选举最小编号的作为Master,如果Master挂了,自然对应的Znode节点就会删除。然后让新的最小编号作为Master,这样就可以实现动态选举的功能了。

补充:ZooKeeper服务中,2888端口用于follower调用leader进行写操作,3888端口为选主使用端口,2181端口为客户端连接ZooKeeper服务节点端口。

附录

锁相关

https://blog.51cto.com/u_15368284/5115965?b=totalstatistic

curator-recipes组件

https://curator.apache.org/curator-recipes/index.html

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注