上一篇博文主要讲解了RabbitMQ集群的搭建,本文主要介绍一下利用python程序完成mq的消息收发和实时监控
一、利用Python脚本完成RabbitMQ消息发送和接受:
原理和思想:
1、利用python语言强大的模块pika,来实现自动发送消息和接受消息;
2、MQ集群有两台内存节点,第一个内存节点用于发送消息的节点,第二个内存节点用于接受消息的节点;因为MQ集群的消息同步特性,发送消息到node1,接受消息到node2
3、此demo程序的用途除了验证消息的发送和接受之外,还有监控MQ集群是否正常的用途;如果MQ集群其中一个内存节点出现异常无法正常连接,python程序会抛出异常提示
MQ节点出现故障,同时触发另外一个ssh 自动登录执行重启mq服务的python脚本自动完成出现故障的MQ节点的服务自动重启;并结合zabbix监控实现及时告警;
4、此脚本需要放置在MQ集群的LVS主节点上面。因为python脚本里面调用了ipvsadm -d -t 192.168.1.254:56720 -r 192.168.1.4:56720的命令来实现当node1 MQ节点出现故障的时候通过
ipvsadm命令自动删除有问题的MQ节点。当MQ节点重启了MQ服务之后会自动加入到LVS的real server之中;
实施步骤:
1、完成RabbitMQ集群的安装;
2、创建一个测试的Vhost和测试的用户;
rabbitmqctl add_vhost gytest #创建一个vhost
rabbitmqctl add_user gytest_producer01 123456 #创建一个生产者的测试用户
rabbitmqctl add_user gytest_consumer01 123456 # 创建一个消费者的测试用户
rabbitmqctl set_user_tags gytest_producer01 management #将生产者的用户设置为management角色权限
rabbitmqctl set_user_tags gytest_consumer01 management #将消费者的用户设置为management角色权限
rabbitmqctl set_permissions -p gytest gytest_producer01 '.*' '.*' '.*' #将gytest的vhost授权给生产者用户gytest_producer01
rabbitmqctl set_permissions -p gytest gytest_consumer01 '.*' '.*' '.*' #将gytest的vhost授权给消费者用户gytest_consumer01
3、安装好python软件,下载所需要的python模块;
python -m ensurepip --default-pip
/usr/local/python2.7.12/bin/pip install pika 安装好pip,通过pip install安装所需要的模块
4、利用python语言开发一个发送消息的demo程序:
#encoding: utf-8
#author: gaoyang#date: 2018-01-31#summary: 发送方/生产者 import os, sys, timeimport pikafilename = '/tmp/rabbitmq_send2.log'
with open(filename,'w') as f: f.truncate() f.close()def Main(host) :
credentials = pika.PlainCredentials("gytest_producer01", "123456") parameters = pika.ConnectionParameters(host=host,port=56720, virtual_host='gytest', credentials=credentials) #定义了MQ集群的认证信息,包括IP、端口、vhost、用户名和密码等 connection = pika.BlockingConnection(parameters) channel = connection.channel() queue = channel.queue_declare(queue='queuetest') #定义队列,MQ集群管理员一般只会提前创建VHOST和登录用户,其他的MQ资源比如队列、交换机、绑定等由代码自行创建; channel.exchange_declare(exchange='exchangetest') #定义交换机 channel.queue_bind(queue='queuetest',routing_key='rkeytest',exchange='exchangetest') #定义队列和交换机之间通过路由KEY绑定。生产者发送消息到交换机,消息中带有路由KEY。交换机根据路由KEY,把消息路由到指定的队列while True:
message = time.strftime('%Y-%m-%d-%H:%M:%S', time.localtime()) #发送消息的内容体 channel.basic_publish(exchange='exchangetest', #消息发送的内容除了消息本身,还需要带上交换机和路由key routing_key='rkeytest', body=message ) filename = '/tmp/rabbitmq_send2.log' #程序的日志文件定义 with open(filename,'a') as f: f.write("send message: %s\n" % (message)) #程序的日志内容# print('send message: %s' % message) queue = channel.queue_declare(queue='queuetest', passive=True) messageCount = queue.method.message_count# print('messageCount: %d' % messageCount) with open(filename,'a') as f: f.write("messageCount: %d\n" % (messageCount)) time.sleep( 5 ) if __name__ == '__main__': try: hostname='192.168.1.4' Main(hostname) except: filename = '/tmp/rabbitmq_send2.log' with open(filename,'a') as f: f.write("192.168.1.4 is faild\n") f.close()# print hostname + " is faild" os.system("/sbin/ipvsadm -d -t 192.168.1.254:56720 -r 192.168.1.4:56720") os.system("python /.scripts/ssh_send.py") else: print hostname is ok
vim /.scripts/ssh_send.py
#这个python脚本主要是使用python的paramiko模块,实现自动登录到linux机器执行脚本或者命令
import paramiko
def sshclient_execmd(hostname, port, username, password, execmd): paramiko.util.log_to_file("paramiko.log") s = paramiko.SSHClient() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname=hostname, port=port, username=username, password=password) stdin, stdout, stderr = s.exec_command (execmd) stdin.write("Y") # Generally speaking, the first connection, need a simple interaction. print stdout.read() s.close() def main(): hostname = '192.168.1.5' port = 10022 username = 'root' password = '111111' execmd = "sudo /bin/bash /.scripts/killmq.sh" sshclient_execmd(hostname, port, username, password, execmd) if __name__ == "__main__": main()
启动起来之后的日志如下:
5、利用python语言开发一个接受消息的demo程序:
#encoding: utf-8
#author: gaoyang#date: 2018-01-31#summary: 接收方/消费者 import os, sys, timeimport pikafilename = '/tmp/rabbitmq_resv.log'
with open(filename,'w') as f: f.truncate() f.close() # 接收处理消息的回调函数def ConsumerCallback (channel, method, properties, body): filename = '/tmp/rabbitmq_resv.log' with open(filename,'a') as f: f.write("Received %s\n" % (body))# print("Received %s" % body) #ch.basic_ack(delivery_tag=method.delivery_tag) def Main(host): credentials = pika.PlainCredentials("gytest_consumer01", "123456") parameters = pika.ConnectionParameters(host=host,port=56720, virtual_host='gytest', credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() queue = channel.queue_declare(queue='queuetest') # no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面 # no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式 channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True) filename = '/tmp/rabbitmq_resv.log' with open(filename,'a') as f: f.write("Wait Message ...\n")# print('Wait Message ...') channel.start_consuming() if __name__ == '__main__': try: hostname='192.168.1.5' Main(hostname) except: filename = '/tmp/rabbitmq_resv.log' with open(filename,'a') as f: f.write("192.168.1.5 is faild\n") f.close()# print hostname + " is faild" os.system("/sbin/ipvsadm -d -t 192.168.1.254:56720 -r 192.168.1.5:56720") os.system("python /.scripts/ssh_resv.py") else: print hostname is ok
vim /.scripts/ssh_resv.py
import paramiko
def sshclient_execmd(hostname, port, username, password, execmd): paramiko.util.log_to_file("paramiko.log") s = paramiko.SSHClient() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname=hostname, port=port, username=username, password=password) stdin, stdout, stderr = s.exec_command (execmd) stdin.write("Y") # Generally speaking, the first connection, need a simple interaction. print stdout.read() s.close() def main(): hostname = '192.168.1.5' port = 10022 username = 'root' password = '111111' execmd = "sudo /bin/bash /.scripts/killmq.sh" sshclient_execmd(hostname, port, username, password, execmd) if __name__ == "__main__": main()
启动之后的日志如下:
二、利用zabbix来完成python程序的实时监控:
在LVS Master 主机上面创建两个监控项
设置触发器
因为这里用到了自定义脚本
需要在LVS Master服务器上面创建/etc/zabbix/zabbix_agentd.d/下面创建两个文件
[root@test~]# cat /etc/zabbix/zabbix_agentd.d/mq_monitor_send.conf
UserParameter=mq_monitor_send[*], /bin/grep $1 /tmp/rabbitmq_send2.log |wc -l [root@test ~]# cat /etc/zabbix/zabbix_agentd.d/mq_monitor_resv.conf UserParameter=mq_monitor_resv[*], /bin/grep $1 /tmp/rabbitmq_resv.log |wc -l service zabbix-agent restart
这里的$1就是zabbix里面配置的监控项的参数:
mq_monitor_send[faild]
mq_monitor_resv[faild]
最终的效果是:
1、通过python程序实时模拟JAVA或者PHP业务系统发送和接受MQ消息;
2、如果MQ节点出现异常(无论是消息堆积导致内存爆满故障,或者是MQ集群脑裂导致的故障都会检查处理)会第一时间自动触发zabbix告警。并且自动重启mq服务;