中国电力建设集团网站,网站架构图一般包括什么,网站开发后端做什么,网站菜单栏代码ZooKeeper数据模型
ZooKeeper是一个树形目录服务#xff0c;其数据模型和Uiix的文件目录树很类似#xff0c;拥有一个层次化结构。这里面的每一个节点都被称为#xff1a;ZNode#xff0c;每个节点上都会保存自己的数据和节点信息。节点可以拥有子节点#xff0c;同时也允…ZooKeeper数据模型
ZooKeeper是一个树形目录服务其数据模型和Uiix的文件目录树很类似拥有一个层次化结构。这里面的每一个节点都被称为ZNode每个节点上都会保存自己的数据和节点信息。节点可以拥有子节点同时也允许少量1MB数据存储在该节点之下。节点可以分为四大类 PEFSISTENT持久化节点EPHEMERAL临时节点-ePERSISTENT_SEQUENTIAL持久化顺序节点-sEPHEMERAL_SEQUENTIAL临时顺序节点-es
ZooKeeper服务端常用命令
启动ZooKeeper服务./zkServer.sh start查看ZooKeeper服务./zkServer.sh status停止ZooKeeper服务./zkServer.sh stop重启ZooKeeper服务./zkServer.sh restart
ZooKeeper客户端命令
./zkCli.sh -server localhost:2181连接服务端如果是单机后面的可以省略不写。ls [/] 查看指定节点下子节点create [/app] [hrbu]创建一个名为/app1的子节点并存放数据。get [/app] 获取节点下的数据。set [/app] [hrbu]给指定节点设置数据delete [/app] 删除指定节点 ps此命令无法删除存在子节点的节点如果要删除带有子节点的节点可以是使用deleteall [/app] 命令。quit 断开连接help 查看命令帮助create -e [/app] 创建临时节点会话关闭就会删除create -s [/app] 创建顺序节点create -es [/app] 创建临时顺序节点ls -s [/app] 查看节点的详细信息
使用Curator API操作Zookeeper
建立连接
Test
public void testConnect() {//重试策略ExponentialBackoffRetry retry new ExponentialBackoffRetry(3000, 10);//第一种方式CuratorFramework client CuratorFrameworkFactory.newClient(192.168.130.120:2181, 60 * 1000, 15 * 1000, retry);//第二种方式CuratorFramework client1 CuratorFrameworkFactory.builder().connectString(192.168.130.120:2181).sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retry).namespace(hrbu).build();//开启连接client.start();
}参数解读
connectString – list of servers to connect to ZooKeeper的地址 sessionTimeoutMs – session timeout 会话超时时间 connectionTimeoutMs – connection timeout 连接超时时间 retryPolicy – retry policy to use 重试策略 会话超时时间和连接超时时间有默认值。
第二种链式编程的方式可以指定一个工作空间在此客户端下的所有操作都会将此工作空间作为根目录。
注意
如果使用的是云服务器需要将指定端口打开
firewall-cmd --zonepublic --add-port2181/tcp --permanent 开放端口
firewall-cmd --zonepublic --list-ports 查看已经开放的端口
systemctl restart firewalld 重启防火墙生效
最后别忘了在服务器的安全组里面添加端口将2181端口打开
添加节点
Test
public void testCreate1() throws Exception {//基本创建CreateBuilder createBuilder client.create();//创建时不指定数据会将当前客户端ip存到里面createBuilder.forPath(/app1);//指定数据createBuilder.forPath(/app2, hello.getBytes());
}Test
public void testCreate2() throws Exception {CreateBuilder createBuilder client.create();//设置节点类型默认的类型是持久化//CreateMode是枚举类型createBuilder.withMode(CreateMode.EPHEMERAL).forPath(/app3);
}Test
public void testCreate3() throws Exception {CreateBuilder createBuilder client.create();//创建多级节点如果父节点不存在则创建父节点。createBuilder.creatingParentContainersIfNeeded().forPath(/app4/app4_1);
}查询节点
Test
public void testGet() throws Exception {//查询数据byte[] bytes client.getData().forPath(/app1);System.out.println(new String(bytes));//查询子节点ListString strings client.getChildren().forPath(/app4);strings.forEach(System.out::println);//查询节点状态信息Stat stat new Stat();client.getData().storingStatIn(stat).forPath(/app1);System.out.println(stat);
}修改节点
Test
public void testSet() throws Exception {//修改数据client.setData().forPath(/app1,hrbu.getBytes());//根据版本修改int version 0;Stat stat new Stat();client.getData().storingStatIn(stat).forPath(/app1);version stat.getVersion();client.setData().withVersion(version).forPath(/app1, HRBU.getBytes());
}删除节点
Test
public void testDelete() throws Exception {//删除单个节点client.delete().forPath(/app4/app4_1);//删除带有子节点的节点client.delete().deletingChildrenIfNeeded().forPath(/app4);//强制删除client.delete().guaranteed().forPath(/app4);//回调client.delete().guaranteed().inBackground(new BackgroundCallback() {Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println(执行删除操作);}}).forPath(/app4);}Watch事件监听 Zookeeper允许用户在指定节点上注册一些Watcher并且在一些特定事件触发的时候ZooKeeper服务端会将事件通知到感兴趣的客户端上去该机制是ZooKeeper实现分布式协调服务的重要特性。 ZooKeeper中引入了Watcher机制来实现了发布/订阅功能能够让多个订阅者同时监听某一个对象当一个对象自身状态变化时会通知所有订阅者。 ZooKeeper原生支持通过注册Watcher来进行事件监听但是使用并不是特别方便需要开发人员自己反复注册Watcher比较繁琐。 Curator引入了Cache来时限对Zookeeper服务端事件的监听。 ZooKeeper提供了三种Watcher NodeCache只是监听某一个特定的节点。PathChildrenCache监控一个Node的子节点。TreeCache可以监控整个树上的所有节点类似于PathChildrenCache和NodeCache的组合。
NodeCache
Test
public void testNodeCache() throws Exception {//NodeCache:指定一个节点注册监听器//创建NodeCache对象final NodeCache nodeCache new NodeCache(client, /app1);//注册监听nodeCache.getListenable().addListener(new NodeCacheListener() {Overridepublic void nodeChanged() throws Exception {System.out.println(app1节点发生变化);//获取修改节点后的数据byte[] data nodeCache.getCurrentData().getData();System.out.println(变化后的节点new String(data));}});//开启监听如果为true则开启则开启监听加载缓冲数据nodeCache.start(true);
}PathChildrenCache
Test
public void testPathChildrenCache() throws Exception {//PathChildrenCache:监听某个节点的所有子节点//创建监听对象PathChildrenCache pathChildrenCache new PathChildrenCache(client, /hrbu, true);//绑定监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {System.out.println(子节点发生变化);System.out.println(pathChildrenCacheEvent);//监听子节点的数据变更并且得到变更后的数据//获取类型PathChildrenCacheEvent.Type type pathChildrenCacheEvent.getType();//判断类型if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {//获取数据byte[] data pathChildrenCacheEvent.getData().getData();System.out.println(new String(data));}}});//开启pathChildrenCache.start();
}TreeCache
Test
public void testTreeCache() throws Exception {//创建监听器TreeCache treeCache new TreeCache(client, /);//注册监听treeCache.getListenable().addListener(new TreeCacheListener() {Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {System.out.println(节点发生变化);System.out.println(treeCacheEvent);}});//开启treeCache.start();}分布式锁实现
概述
我们在进行单机应用开发涉及并发同步的时候我们往往采用synchronized或者lock的方式来解决多线程间的代码同步问题这时候多线程的运行都是在同一个JVM之下没有任何问题。但当我们的应用时分布式集群工作的情况下属于多JVM下的工作环境跨JVM之间已经无法通过多线程的锁解决同步问题。那么就需要一种更加高级的锁机制来处理跨机器进程之间的数据同步问题这就是分布式锁。
Zookeeper分布式锁原理
核心思想当客户端要获取锁则创建节点使用完锁则删除该节点。 1.客户端获取锁时在lock节点下创建临时顺序节点。2.然后获取lock下面的所有子节点客户端获取到所有的子节之后如果发现自己创建的子节点序号最小那么就认为该客户端获取到了锁。使用完锁后将该节点删除。3.如果发现自己创建的节点并非lock所有子节点中最小的说明自己还没有获取到锁此时客户端需要找到比自己小的那个节点同时对其注册事件监听器监听删除事件。4.如果发现比自己小的那个节点被删除则客户端的Watcher会收到相应通知此时再次判断自己创建的节点是否时lock子节点中序号最小的如果是则获取到了锁如果不是则重复以上步骤继续获取比自己小的一个节点并注册监听。
Curator实现分布式锁API 在Curator中有五种锁方案 InterProcessSemaphoreMutex分布式排它锁非可重入锁 InterProcessMutex分布式可重入排它锁 InterProcessReadWriteLock分布式读写锁 InterProcessMultiLock将多个锁作为单个实体管理的容器 InterProcessSemaphoreV2共享信号量
案例
package com.hrbu.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;public class Ticket12306 implements Runnable{private int tickets 10;//数据库的票数private InterProcessMutex lock ;public Ticket12306(){//重试策略RetryPolicy retryPolicy new ExponentialBackoffRetry(3000, 10);CuratorFramework client CuratorFrameworkFactory.builder().connectString(8.130.32.75:2181).sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).build();//开启连接client.start();lock new InterProcessMutex(client,/lock);}Overridepublic void run() {while(true){//获取锁try {lock.acquire(3, TimeUnit.SECONDS);if(tickets 0){System.out.println(Thread.currentThread():tickets);Thread.sleep(100);tickets--;}} catch (Exception e) {e.printStackTrace();}finally {//释放锁try {lock.release();} catch (Exception e) {e.printStackTrace();}}}}
}
package com.hrbu.curator;public class LockTest {public static void main(String[] args) {Ticket12306 ticket12306 new Ticket12306();//创建客户端Thread t1 new Thread(ticket12306,携程);Thread t2 new Thread(ticket12306,飞猪);t1.start();t2.start();}
}