视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
基于RabbitMQrpc实现的主机管理
2020-11-27 14:13:56 责编:小采
文档
题目:基于RabbitMQ rpc实现的主机管理,下面就来具体介绍一下。

需求:

可以对指定机器异步的执行多个命令
例子:

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4 
task id: 45334
>>: check_task 45334 
>>:

注意,每执行一条命令,即立刻生成一个任务ID,不需等待结果返回,通过命令check_task TASK_ID来得到任务结果

README

 1 基于RabbitMQ rpc实现的主机管理 2 可以对指定机器异步的执行多个命令 3 例子: 4 >>:run "df -h" --hosts 192.168.3.55 10.4.3.4 5 task id: 45334 6 >>: check_task 45334 #查看任务信息 7 8 程序结构: 9 RabbitMQ_PRC/#综合目录10 |- - -PRC_CLIENT/#client程序主目录11 | |- - -__init__.py12 | |- - -bin/#执行程目录13 | | |- - -__init__.py14 | | |- - -clien_start.py #客户端执行文件15 | |16 | |17 | |- - -core #主逻辑程序目录18 | | |- - -__init__.py19 | | |- - -clien_class.py#客户端执行主要逻辑 类20 | |21 | |22 |23 |24 |- - -PRC_SERVER/#服务端程序目录25 | |- - -__init__.py26 | |- - -bin/#执行目录27 | | |- - -__init__.py28 | | |- - -server_start.py#服务端程序执行文件29 | |30 | |31 | |- - -core/##主逻辑程序目录32 | | |- - -server_class.py#主逻辑 相关类33 | |34 |35 |- - -README

程序结构:
RabbitMQ_PRC/#综合目录
|- - -PRC_CLIENT/#client程序主目录
| |- - -__init__.py
| |- - -bin/#执行程目录
| | |- - -__init__.py
| | |- - -clien_start.py #客户端执行文件
1 import os ,sys2 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量3 sys.path.append(BASE_DIR)#增加环境变量4 5 from core.client_class import Threa6 7 if __name__ == '__main__':8 RPCS=Threa()9 response=RPCS.th_start()
View Code
| |- - -core #主逻辑程序目录
| | |- - -__init__.py
| | |- - -clien_class.py#客户端执行主要逻辑 类
 1 import pika 2 import uuid 3 import threading 4 import random 5 6 class FibonacciRpcClient(object): 7 def __init__(self): 8 #self.credentials=pika.PlainCredentials("test","test") 9 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))#生成连接的服务端 ip 10 #self.connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.11.51",15672,'/',self.credentials))#生成连接的服务端 ip 11 self.channel = self.connection.channel()#创建一个管道 12 13 def get_respon(self,cal_queue,cal_id):#取任务信息 14 self.response=None 15 self.callback_id=cal_id#队列名 16 self.channel.basic_consume(self.on_response,queue=cal_queue)# 使用回调函数 17 while self.response is None: 18 self.connection.process_data_events()#非阻塞模式接收消息 19 return self.response#返回 20 21 def on_response(self, ch, method, props, body):#回调函数 22 if self.callback_id == props.correlation_id:#判断服务端返回的队列名是否与当前所生成的队列名一致 23 self.response = body# 将服务端的结果赋于返回来的结果变量 24 ch.basic_ack(delivery_tag = method.delivery_tag)##确保消息被 接收 25 26 def call(self, queues,n):#发送消息的函数 27 result = self.channel.queue_declare(exclusive=False)#随机生成一个队列,收消息后不删除 28 self.callback_queue = result.method.queue#赋于管道 变量 29 self.corr_id = str(uuid.uuid4())#生成一个服务端返回消息的队列名 30 self.channel.basic_publish(exchange='', 31 routing_key=queues,#队列名 32 properties=pika.BasicProperties( 33 reply_to = self.callback_queue,#发送的管道队列名 34 correlation_id = self.corr_id,#发送给服务端,用于返回消息的队列名 35 ), 36 body=str(n))#发送的内容数据 37 return self.callback_queue,self.corr_id#返回管道名 队列id号 38 39 class Threa(object):#线程 类 40 def __init__(self): 41 self.info={}#生成一个字典 42 self.help_info=''' 指令示例33[36;1m 43 run "df -h" --hosts 192.168.3.55 10.4.3.4 44 --- ------- ------- ------------ -------- 45 运行 指令 主机 ip 1# ip 2# 46 check_task_all #查看任务列表 47 check_task 25413 #查看具体id任务信息,过后删除 48 helps #查看指令帮助 49 33[0m''' 50 51 def check_task_all(self,cmd):#查看所有任务信息 52 53 for i in self.info: 54 print("任务id:%s,服务端:%s,命令:%s"%(i,self.info[i][0],self.info[i][1])) 55 def check_task(self,take_id):#查看任务 56 try: 57 id=int(take_id.split()[1])#取任务ID 58 #print(id,'任务ID') 59 cal_queue=self.info[id][2]#管道名 60 #print(cal_queue,'队列') 61 cal_id=self.info[id][3]#消息队列位置 62 #print(cal_id,'消息位置') 63 clinets=FibonacciRpcClient()#调用类  rest=clinets.get_respon(cal_queue,cal_id)#取任务信息 65 print('任务执行结果:',rest.decode())#打印 66 del self.info[id]#从字典中删除对应任务 67 except Exception as e: 68 print(e) 69 return 70 71 def run(self,str_l):#run函数 72 addr_l=self.attr_l(str_l)#获取IP 73 oreds=self.oreds_(str_l)#获取 命令 74 #print(oreds,'上传命令') 75 for i in addr_l:#取出IP 76 tak_id=random.randint(10000,99999)#任务ID生成 77 #print(tak_id,'任务ID') 78 obj=FibonacciRpcClient()#生成连接类 79 r=obj.call(i,oreds)#ip做队列名 命令 80 self.info[tak_id]=[i,oreds,r[0],r[1]]#写入字典 tak_id{ ip 命令 管道名 队列名} 81 return self.info 82 83 def retf(self,str_l):#反射命令 84 sl=str_l.split()[0]#取命令开头 85 if sl=='helps': 86 self.helps() 87 if len(str_l.split())==1 and sl!='check_task_all' : 88 return  if hasattr(self,sl):#是否存在 90 func=getattr(self,sl)#调用 91 rer=func(str_l)#执行 92 #print(rer) 93 if rer is not None: 94 for i in rer: 95 print("任务id:%s"%i) 96 97 def attr_l(self,n):#命令分解函数 98 attr=n.split("--")##用--分割 99 addr=attr[1].split()[1:]#获取IP列表100 return addr#返回IP列表101 102 def oreds_(self,n):#获取 命令103 oreds=n.split(""")[1]##用"分割取命令104 return oreds#返回 命令105 106 def helps(self):#查看指令帮助107 print(self.help_info)108 109 def th_start(self):#开始110 self.helps()111 while True:112 str_l=input(">>:").strip()113 if not str_l:continue#如果为空重新输入114 t1=threading.Thread(target=self.retf,args=(str_l,))#创建新线程 调用反射函数115 t1.start()#开始线程
View Code
|- - -PRC_SERVER/#服务端程序目录
| |- - -__init__.py
| |- - -bin/#执行目录
| | |- - -__init__.py
| | |- - -server_start.py#服务端程序执行文件
1 import os ,sys2 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量3 sys.path.append(BASE_DIR)#增加环境变量4 5 from core.client_class import Threa6 7 if __name__ == '__main__':8 RPCS=Threa()9 response=RPCS.th_start()
View Code
| |- - -core/##主逻辑程序目录
| | |- - -server_class.py#主逻辑 相关类
 1 import pika,os 2 3 class RabbitMQ_PRC(object): 4 def __init__(self,myaddr): 5 self.queues=myaddr#用本机IP做队列名 6 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#生成消息对队 7 self.channel = self.connection.channel()#生成管道 8 self.channel.queue_declare(queue=self.queues)#消息收接队列 9 10 def str_run(self,body):#处理 run的函数11 msg = os.popen(body.decode()).read()#执行系统命令12 if not msg:13 msg = '系统命令不存在'14 return msg15 16 def on_request(self,ch, method, props, body):#回调函数17 resp=self.str_run(body)18 print('执行完成')19 #print(resp)20 ch.basic_publish(exchange='',21 routing_key=props.reply_to,#收消息的队列22 properties=pika.BasicProperties(correlation_id =props.correlation_id),#返回消息的队列23 body=str(resp))#返回结果数据24 ch.basic_ack(delivery_tag = method.delivery_tag)##确保消息被 客户端接收25 26 def run_(self):27 self.channel.basic_qos(prefetch_count=1)#同时只处理一个消息28 self.channel.basic_consume(self.on_request, queue=self.queues)#接收消息,自动调用回调函数29 30 print("开始接收数据!")31 self.channel.start_consuming()#开始接收

下载本文
显示全文
专题