Hadoop学习之Zookeeper相关内容

我爱海鲸 2023-01-25 22:00:31 暂无标签

简介docker-compose安装zookeeper、Zookeeper相关的使用

一、引言

在分布式环境下,如果舍弃SpringCloud,使用其他的分布式框架,那么注册心中,配置集中管理,集群管理,分布式锁,分布式任务,队列的管理想单独实现怎么办。

二、Zookeeper介绍

Zookeeper本身是Hadoop生态园的中的一个组件,Zookeeper强大的功能,在Java分布式架构中,也会频繁的使用到Zookeeper。

Zookeeper 作者 Doug:

Zookeeper就是一个文件系统+监听通知机制

三、Zookeeper安装

docker-compose学习 请参考 docker学习之compose

安装:

version: "3.1"
services:
  zk:
    image: zookeeper
    restart: always
    container_name: zk
    ports:
      - 2181:2181

四、Zookeeper架构【重点

4.1 Zookeeper的架构图

每一个节点都没称为znode

每一个znode中都可以存储数据

节点名称是不允许重复的

Zookeeper的架构图

4.2 znode 类型

四种 Znode

持久节点:永久的保存在你的Zookeeper

持久有序节点:永久的保存在你的Zookeeper,他会给节点添加一个有序的序号。/xx ->/xx0000001

临时节点:当存储的客户端和Zookeeper服务断开连接时,这个临时节点自动删除

临时有序节点:当存储的客户端和zookeeper服务断开连接时,这个临时节点自动删除,他会给节点添加一个有序的序号。/xx->/xx0000001

4.3 Zookeeper的监听通知机制

客户端可以去监听Zookeeper中的Znode节点。

Znode改变时,会通知监听当前Znode的客户端。

监听通知机制

五、Zookeeper常用命令

Zookeeper针对增删改查的常用命令

#查询当前节点下的全部子节点

ls  节点名称

#例子ls /

 

#查询当前节点下的数据

get  节点名称

#例子get /zookeeper

 

#创建节点

create [-s][-e] znode名称 znode数据

# -s: sequence,有序节点

#-e: ephemeral,临时节点

 

#修改节点值

set  znode名称  新数据

 

#删除节点

delete znode名称  #没有子节点的znode

rmr znode名称  #删除当前节点和全部的子节点

 

六、Zookeeper集群【重点

6.1 Zookeeper集群架构图

集群架构图

6.2 Zookeeper集群中节点的角色

Leader: Master主节点

Follower (默认的从节点):从节点,参与选举全新的Leader

Observer:从节点,不参与投票

Looking:正在找Leader节点

6.3 Zookeeper投票策略

每一个Zookeeper服务都会被分配一个全局唯一的myid,myid是一个数字。

Zookeeper在执行写数据时,每一个节点都有一个自己的FIFO的队列。

保证写每一个数据的时候,顺序是不会乱的,Zookeeper还会给每一个数据分配一个全局唯一的zxid,数据越新zxid就越大。

选举Leader:

选举出zxid最大的节点作为Leader。

在zxid相同的节点中,选举出一个myid最大的节点,作为Leader。

6.4搭建Zookeeper集群

yml文件

version: "3.1"
services:
  zk1;
    image: zookeeper
    restart: always
    container_name: zk1
    ports:
      - 2181:2181
   environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3 :2888:3888;2181
 zk2:
    image: zookeeper
    restart: always
    container_name: zk2
    ports:
        - 2182:2181
    environment:
        ZOO_MY_ID: 2
        ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
  zk3:
      image: zookeeper
      restart: always
      container_name: zk3
      ports:
         - 2183:2181
      environment:
        ZOO_MY_ID: 3
        ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3 :2888:3888;2181

七、Java操作iZookeeper

7.1 Java连接Zookeeper

创建Maven工程

导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache .zookeeper</groupId>
		<artifactId>zookeeper</artifactId>
        <version>3.6.0</version>
    </dependency>

	<dependency>
        <groupId>org.apache.curator</groupId>
		<artifactId>curator-recipes</artifactId>
		<version>4.0.1</version>
    </ dependency>

	<dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
		<version>4.12</version>
    </dependency>
</dependencies>

编写连接Zookeeper集群的工具类

public class ZkUtil {
    
	public static CuratorFramework cf(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);
		curatorFramework cf = CuratorFrameworkFactory.builder()
            .connectString("192.168.199.109:2181,192.168.199.109:2182,192.168.199.109:2183")
			.retryPolicy(retryPolicy)
            .build();
        
		cf.start();
        
		return cf;
    }
}

测试类

7.2 Java操作Znode节点

查询

public class Demo2{
    curatorFramework cf = ZkUtil.cf();
    
	//获取子节点
    @Test
    public void getChildren() throws Exception {
        List<String strings = cf.getChildren().forPath("/");
 
        for (String string : strings){
            System.out.print1n(string);
        }
    }
    
	//获取节点数据
	@Test
    public void getData() throws Exception {
        byte[] bytes = cf.getData().forPath("/zk");
		System.out.println(new String( bytes, "UTF-8" ));
    }
}

添加

@Test
public void create() throws Exception {
    cf.create( ) .withMode(CreateMode .PERSISTENT). forPath(" /zk2" , " uuuu".getBytes()
);

 

修改

@Test
public void update() throws Exception {
    cf.setData().forPath( "/zk2" , "oooo".getBytes());
)

 

删除

@Test
public void delete() throws Exception {
    cf.delete().deletingChildrenIfNeeded().forPath("/zk2");
}

查看znode的状态

@Test
public void stat() throws Exception {
    Stat stat =cf.checkExists().forPath("/zk"');
	System.out.print1n(stat);
}

7.3 监听通知机制

实现方式

public class Demo3 {

	CuratorFramework cf = ZkkUtil.cf();
	
	@Test
	public void listen() throws Exception {
		//1.创建NodeCache对象,指定要监听的znode
		NodeCache nodeCache = new NodeCache(cf, " /zk");
		nodeCache.start();
		
		//2.添加一个监听器
		nodeCache.getListenable().addListener(new NodeCacheListener() {
			@override
			public void nodeChanged() throws Exception {
				byte[] data = nodeCache. getCurrentData().getData();
				Stat stat = nodeCache. getCurrentData().getStat();
				String path = nodeCache. getCurrentData().getPath();
				System.out.print1n("监听的节点是:” +path);
				System.out.println("节点现在的数据是:” +new String(data, "UTF-8"));
				system.out.println("节点状态是:" + stat);
			}
		});
		System.out. println(”开始监听!!");
		//3. System.in.read();
		system.in .read();
	}
}

 

你好:我的2025