日度归档:2012年2月6日

beanstalkd 消息队列的第一手资料

beanstalk 消息队列 小结
协议说明和各状态转换情况

基本知识点:
1. 对于beanstalk 消息队列中每条数据都为 job
2. beanstalk service端 ,会维护 tubes[多个管道]
3. client端可以监听,使用多 tube
4. client端可以指定 use 管道[ client生成一个新的job时会把此job提交到 指定管道]
5. client端可以指定 watch 管道 [ client接收处理job时会到 指定管道得到待处理的job]

官方示意图:
put reserve delete
—–> [READY] ———> [RESERVED] ——–> *poof*

一般情况:
1. 任务提交到service端,job 管理放入内存空间并为其标记状态 [READY]
2. client通过轮训竞争得到次状态, job 改为 [RESERVED]
2.1 当在默认时间 120 秒内没处理完 , job.stats.timeouts 就会大于 0
同时其他 轮训竞争client会拿到这个job【 注意了 每次timeouts时,在轮训的客户端就会得到次job,状态都为 ready,timeouts>0 】
3. 随便其中一台client处理完 job.delete , 其他 client 中的此job 都会 *poof*

deom – python beanstalkc 中 job.stats 参考:
使用 easy_install beanstalkc
API 参考 : http://github.com/earl/beanstalkc/blob/master/TUTORIAL
刚生成的 beanstalk
{‘buries’: 0, ‘releases’: 0, ‘tube’: ‘default’, ‘timeouts’: 0, ‘ttr’: 120,
‘age’: 6, ‘pri’: 2147483648L, ‘delay’: 0, ‘state’: ‘reserved’, ‘time-left’: 114,
‘kicks’: 0, ‘id’: 2}

以timeout了的 beanstalk,并且在其他client轮训到 job
{‘buries’: 0, ‘releases’: 0, ‘tube’: ‘default’, ‘timeouts’: 1, ‘ttr’: 120,
‘age’: 417, ‘pri’: 2147483648L, ‘delay’: 0, ‘state’: ‘reserved’, ‘time-left’: 110,
‘kicks’: 0, ‘id’: 2}
{‘buries’: 0, ‘releases’: 0, ‘tube’: ‘default’, ‘timeouts’: 1, ‘ttr’: 120, ‘age’: 415,
‘pri’: 2147483648L, ‘delay’: 0, ‘state’: ‘reserved’, ‘time-left’: 4294967163L,
‘kicks’: 0, ‘id’: 2}

当没所有client 的 job 都到期 了 状态
{‘buries’: 0, ‘releases’: 0, ‘tube’: ‘default’, ‘timeouts’: 2, ‘ttr’: 120,
‘age’: 417, ‘pri’: 2147483648L, ‘delay’: 0, ‘state’: ‘ready’, ‘time-left’: 4294967161L,
‘kicks’: 0, ‘id’: 2}
{‘buries’: 0, ‘releases’: 0, ‘tube’: ‘default’, ‘timeouts’: 2, ‘ttr’: 120, ‘age’: 415,
‘pri’: 2147483648L, ‘delay’: 0, ‘state’: ‘ready’, ‘time-left’: 4294967163L,
‘kicks’: 0, ‘id’: 2}

其中 client1 job.delete
client1 job.stats *poof*
client2 job.stats *poof*

比较全的状态说明 – [官方文档]
http://github.com/kr/beanstalkd/blob/v1.1/doc/protocol.txt?raw=true

官方示意图:

先简单说明下(完全自己理解的,欢迎拍砖。本人E人太差~看官档费劲,谅解下):
job.stats状态 = [READY] 待处理, [RESERVED] 正处理, [DELAYED]延迟状态 , [BURIED] 隐藏状态

1. 延迟提交
py.client1.put>>> beanstalk.put(‘yes!’, delay=10)
py.client3.reserve>>> job = beanstalk.reserve()
# 等待 10 秒

2. 管道测试
put-job到service端 可以指定 put的tube管道
如:

py.client1.put>>> beanstalk.use(‘foo’)
py.client1.put>>> beanstalk.put(‘hey!’)

py.client2.reserve>>> job = beanstalk.reserve()
# 一直拥塞,应为 他 watch 管道 ‘default’

py.client3.reserve>>> beanstalk.watch(‘foo’)
# beanstalk.ignore(‘bar’) 放弃监听 bar
py.client3.reserve>>> job = beanstalk.reserve()
py.client3.reserve>>> job.body #输出 ‘hey!’

3. 隐藏状态 现在吧 client 1/2/3 的 use watch 的管道都调回 default
py.client2.reserve>>> job = beanstalk.reserve()
py.client3.reserve>>> job = beanstalk.reserve()
py.client1.put>>> beanstalk.put(‘隐藏状态!’)
py.client2.reserve>>> job.bury() #2 轮训得到 并且 修改 job 为隐藏状态
# 120 秒后 client3 没有轮训得到 此job
py.client2.reserve>>> job.stats()
{‘buries’: 1, ‘releases’: 0, ‘tube’: ‘default’, ‘timeouts’: 0, ‘ttr’: 120,
‘age’: 188, ‘pri’: 2147483648L, ‘delay’: 0, ‘state’: ‘buried’,
‘time-left’: 4294967228L, ‘kicks’: 0, ‘id’: 11}
py.client2.reserve>>> beanstalk.kick( job.stats()[‘id’] ) #修改状态为 reserved
# 立刻 client3 得到 job
py.client3.reserve>>> job.stats()
{‘buries’: 1, ‘releases’: 0, ‘tube’: ‘default’, ‘timeouts’: 0, ‘ttr’: 120, ‘age’: 313,
‘pri’: 2147483648L, ‘delay’: 0, ‘state’: ‘reserved’,
‘time-left’: 110, ‘kicks’: 1, ‘id’: 11}
# 这时候 client2 / 3 同时 有 job 11 状态 ‘buries’: 1,’timeouts’: 0,’state’: ‘reserved’

4. peek 窥见
可以得到 一个 stats – read 的 job ,其他 client 可以 job = beanstalk.reserve()
后马上 job.stats 会变成 [RESERVED]
py.client2.reserve>>> job = beanstalk.peek_ready()
取得 job 并看 本 client 能 处理能
>>> job = beanstalk.peek(3)
>>> job.body
‘yes!’
>>> job.stats()[‘state’]
‘ready’
这种形式西 job 不能 bury 等修改状态,但 可以 delete

peek 系类
peek_buried
peek_ready