zookeeper watch机制与客户端实现原理

本文讨论如何用zookeeper做服务发现,zookeeper 的watch实现原理和机制,以及python使用kazoo客户端库连接zookeeper时如何在数据变化后更新数据,保证数据安全。
\

1. 服务发现

你有一个服务,它需要请求下游的服务,假设下游的服务器ip有192.168.0.1 和 192.168.0.2 。实践中面临的一个问题是,下游的服务器可能会增加机器,也可能减少机器,以实现动态扩容和收缩。一个可行的方案是在你的服务和下游服务之间架设一个nginx,由它代为转发请求,你的请求发送给nginx,nginx向下游服务器转发请求,这样下游服务的变化情况对于你的服务来说是不可见的。另外一个方案是想办法让你的服务动态的感知下游服务的变化,当下游服务增加机器时,你需要知道新增机器的ip是多少,当下游服务的机器减少时,你需要知道减少机器的ip,当你向下游发送请求时,你自行决定向哪台服务器发送请求。

上面这段所涉及的就是服务发现问题,nginx做服务发现有很多种方案,使用zookeeper就是其中一种,在不重启nginx或者更新nginx配置的前提下,使用 dyups 模块 修改共享内存中的 upstream 。如果你的服务也需要具备这种能力,那么利用zookeeper恐怕是你最好的选择。

约定一个节点,你的服务监控这个节点,这里的监控,是指watch节点的子节点变化情况,下游的服务在启动后,在节点下新增临时节点,并在临时节点里存储有关自己的信息。你的服务发现了子节点变化情况,就可以更新下游服务器的ip了,当下游服务某个机器出现问题导致服务不可用时,由于当初启动时创建的是临时节点,一旦服务由于网络不通或者机器出现问题而导致不可用,那么这个临时节点会自行消失,这种情况,你的服务也会感知到。

2. zookeeper是如何通知你的

以python代码做示例,你使用kazoo这个库与zookeeper建立连接,这个所谓的连接究竟是什么呢?和连接http服务器,mysql一样,在最底层,这个连接其实就是TCP连接。你可以通过TCP连接向zookeeper发送数据,同理,zookeeper也可以向你发送数据,当你所watch的节点发生变化时,它就会向你发送数据,提醒你数据有变化。

由此,带来了一个新的问题,我的程序使用kazoo与zookeeper建立了连接,对某个节点进行了watch监控,但这之后,我就去忙别的事情了,zookeeper突然通知我节点有变化,那么这个通知对我的程序会产生什么影响呢?会终止我正在运行的程序么?kazoo这个库是如何处理zookeeper发来的数据变化的通知呢?

这些问题之前一直困扰这我,直到最近需要在项目中使用zookeeper,于是决定认真研究一番,结果真相却是那么的简单,kazoo仅仅是启动了一个线程来接收zookeeper发来的消息,你程序里运行的其他的代码都是在另一个线程里,zookeeper的通知对程序的执行不会产生任何影响。

import time
from kazoo.client import KazooClient

host = '192.168.1.12:3182'
zk_client = KazooClient(host, auth_data=[('auth', 'auth')])
zk_client.start()

children_node = []

@zk_client.ChildrenWatch('/config/server')
def watch_child(children):
    global children_node
    children_node = children


while True:
    # 模拟对children_node 的使用
    print(children_node)
    time.sleep(5)

我代码里注册了一个watch,来监听节点/config/server 的子节点变化情况,这个监听是有另外一个线程来执行的,在我的代码里,你看不到这个线程。当子节点发生变化后,zookeeper发来消息,线程收到消息后调用watch_child函数更新数据, children会是所有子节点的列表。

3. 多线程下修改数据不安全啊

watch_child 函数是在另一个线程中被调用的,这就带来了一个隐患,在我的程序正常执行的过程中,某一刻,正在对children_node 进行修改,此时,节点发生变化,watch_child被调用,也要修改children_node, 这样不就发生了两个线程同时修改一个数据的情况了么?这不是多线程编程里极力避免的事情么?难道需要做线程互斥操作来保证数据的安全?

5年前,我就对zookeeper有过学习,但只是从概念上,使用方法进行学习,并没有在工作中实际使用过,对于zookeeper数据发生变化时的watch机制并不了解,在我了解了watch机制后,又对多线程修改同一份数据产生了如上的疑问。

以下,是我自己探索思考的结果,如果你有不同的看法,或者你对zookeeper的使用更加深入熟练,请指正我观点中错误的部分。

我认为除了所注册的watch函数,其他代码不应当对数据进行修改。道理很简单,一旦我们在程序里对数据进行了修改,那么我们所得到的信息就和zookeeper所保存的信息有了差异和不同,那么zookeeper还有什么存在的价值和意义呢?因此,前面所担心的多线程数据安全问题,原本就是一个伪命题,我们决不能擅自修改从zookeeper获取到的数据。

但用变化后的数据更新旧的数据仍然要注意数据安全,假设你的代码有这样一段

for i in range(len(children_node)):
    node = children_node[i]
    # do something

如果watch_child函数触发时,你的主线程里正在执行上面这段代码,就有可能发生索引异常。watch_child函数触发前,假设children_node的长度是5, i = 3时, watch_child 被触发,这时,修改了变量children_node, 列表长度变成了3, 那么就会产生索引异常。正确的写法应该是这样

for node in children_node:
    # do something

同样是for循环,为什么这种写法在多线程环境下就是安全的呢?原因在于for循环的本质是使用iter函数获得children_node的一个迭代器,然后使用next函数进行迭代并执行处理StopIteration 异常。在上面的这个for循环执行期间,watch_child函数被触发,修改了children_node的值,但却不会对for循环造成任何影响,因为watch_child函数中的赋值语句执行时,程序在内存中创建了一个新的对象,变量children_node指向了内存中的新对象,而在这之前children_node所指向,所引用的对象不会被销毁,因为for循环时通过iter函数获得了children_node的一个迭代器,这个迭代器指向了老的children_node,因此不会被销毁。上面这段内容对于掌握python不是很深入的人来说,很难理解,这涉及到python变量的本质,变量是内存中对象的引用,下面这段代码可以向你证明我的观点

lst = [9, 4]

print('lst 内存地址:',id(lst))
_iter = iter(lst)
print(next(_iter))
lst = [8, 7, 6]
print('lst 内存地址:',id(lst))
print(next(_iter))

程序输出结果

lst 内存地址: 2006395135624
9
lst 内存地址: 2006395135432
4

尽管中途修改了lst的值,但是遍历输出的仍然是那个最开始定义的列表 [9, 4]。在watch_child函数中,我建议直接对children_node 进行赋值,并在使用children_node 时避免使用索引,或许,你还有其他的想法,比如在watch_child函数中这样操作

def watch_child(children):
    for child in children:
        if not child in children_node:
            children_node.append(child)

如果你在主线程使用children_node 时避免使用索引,这种更新数据的方法同样可行,但有两个致命缺陷,第一个缺陷,如果子节点减少,这段代码不能删除掉children_node存储的子节点,当然,你可以用更多的代码来解决这个问题,可你仍然要面临一个严峻的问题,这种修改方式可能会带来意想不到的后果,下面这段代码将向你展示这种可能

lst = [9, 4]

_iter = iter(lst)
print(next(_iter))
lst.append(5)   # 假设这行代码发生在另一个线程,最终效果是一样的
print(next(_iter))
print(next(_iter))

与前面的代码不同,这一次,没有对lst进行赋值,而是使用append方法增加新的数据,这与上面的watch_child函数一样,都是修改了列表的内容,可这一次,第三个print(next(_iter)) 可以完美的输出刚才所添加的5,但你在程序里原本只想遍历9 和 4, 但另一个线程向lst增加了新的数据,最终,程序的最终表现与预期不符,我认为这是危险的事情。

经过前面的分析,总结为两点实践要求:

节点发生变化时,将新的数据赋值给变量,不要通过append或是其他方法对数据进行修改

使用for item in lst 的方式遍历列表,避免使用索引

我的示例是监控节点的子节点,watch函数传入的参数是列表,如果你监控的是节点数据,同样是直接赋值就可以了,不用考虑for循环遍历避免使用索引的问题。