A file synchronization server implemented in Python
2020-11-27 14:33:50 Editor: 小采


This article walks through a file synchronization server implemented in Python, shared here for your reference. The implementation works as follows:

The server side uses asyncore and saves the files it receives to local disk.

The client uses pyinotify to watch a directory for changes and sends any changed files to the server.

Key points:

1. The client uses struct to pack the file's metadata (path length, name length, path, name, size) into a fixed-size header; the server unpacks this header and uses it to receive the file body that follows (a sketch of the header format follows this list).

2. The client is multi-threaded: pyinotify watches for file changes and puts events on a queue, and a separate pool of threads takes events off the queue and sends the files.
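
As a quick illustration of key point 1, here is a minimal, standalone sketch of packing and unpacking the "!LL128s128sL" header used by the code below (the path, name, and size are made up for illustration):

import struct

head_fmt = "!LL128s128sL"   # path length, name length, path (128 bytes), name (128 bytes), file size
filepath, filename, filesize = "host1", "load.rrd", 4096
header = struct.pack(head_fmt, len(filepath), len(filename), filepath, filename, filesize)
assert len(header) == struct.calcsize(head_fmt)   # always 268 bytes

# Receiver side: unpack, then trim the null-padded 128-byte fields.
plen, nlen, path, name, size = struct.unpack(head_fmt, header)
print path[:plen], name[:nlen], size   # -> host1 load.rrd 4096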

The code:

Server:

#!/usr/bin/python
#coding: utf-8
# Receive files from clients and store them locally, using asyncore.
import asyncore
import socket
from socket import errno
import logging
import time
import sys
import struct
import os
import fcntl
import threading
from rrd_graph import MakeGraph

try:
    import rrdtool
except (ImportError, ImportWarning):
    print "Hope this information can help you:"
    print "Can not find rrdtool module in sys path, just run [apt-get install python-rrdtool] in ubuntu."
    sys.exit(1)

class RequestHandler(asyncore.dispatcher):

    def __init__(self, sock, map=None, chunk_size=1024):
        self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname())))
        self.chunk_size = chunk_size
        asyncore.dispatcher.__init__(self, sock, map)
        self.data_to_write = list()

    def readable(self):
        #self.logger.debug("readable() called.")
        return True

    def writable(self):
        response = (not self.connected) or len(self.data_to_write)
        #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write)))
        return response

    def handle_write(self):
        data = self.data_to_write.pop()
        #self.logger.debug("handle_write() -> %s size: %s", data.rstrip('\n'), len(data))
        sent = self.send(data[:self.chunk_size])
        if sent < len(data):
            remaining = data[sent:]
            self.data_to_write.append(remaining)

    def handle_read(self):
        self.writen_size = 0
        nagios_perfdata = '../perfdata'
        # Fixed-size header: path length, name length, path, name, file size.
        head_packet_format = "!LL128s128sL"
        head_packet_size = struct.calcsize(head_packet_format)
        data = self.recv(head_packet_size)
        if not data:
            return
        filepath_len, filename_len, filepath, filename, filesize = struct.unpack(head_packet_format, data)
        # Trim the null-padded 128-byte fields back to their real lengths.
        filepath = os.path.join(nagios_perfdata, filepath[:filepath_len])
        filename = filename[:filename_len]
        self.logger.debug("update file: %s" % (filepath + '/' + filename))
        try:
            if not os.path.exists(filepath):
                os.makedirs(filepath)
        except OSError:
            pass
        self.fd = open(os.path.join(filepath, filename), 'w')
        if filesize > self.chunk_size:
            times = filesize / self.chunk_size
            first_part_size = times * self.chunk_size
            second_part_size = filesize % self.chunk_size
            # Receive the chunk-aligned part of the file first.
            while 1:
                try:
                    data = self.recv(self.chunk_size)
                    #self.logger.debug("handle_read() -> %s size." % len(data))
                except socket.error, e:
                    if e.args[0] == errno.EWOULDBLOCK:
                        print "EWOULDBLOCK"
                        time.sleep(1)
                    else:
                        #self.logger.debug("Error happened while receiving data: %s" % e)
                        break
                else:
                    self.fd.write(data)
                    self.fd.flush()
                    self.writen_size += len(data)
                    if self.writen_size == first_part_size:
                        break
            # Then receive the final, partial packet.
            while 1:
                try:
                    data = self.recv(second_part_size)
                    #self.logger.debug("handle_read() -> %s size." % len(data))
                except socket.error, e:
                    if e.args[0] == errno.EWOULDBLOCK:
                        print "EWOULDBLOCK"
                        time.sleep(1)
                    else:
                        #self.logger.debug("Error happened while receiving data: %s" % e)
                        break
                else:
                    self.fd.write(data)
                    self.fd.flush()
                    self.writen_size += len(data)
                    if len(data) == second_part_size:
                        break
        elif filesize <= self.chunk_size:
            # The whole file fits into a single chunk.
            while 1:
                try:
                    data = self.recv(filesize)
                    #self.logger.debug("handle_read() -> %s size." % len(data))
                except socket.error, e:
                    if e.args[0] == errno.EWOULDBLOCK:
                        print "EWOULDBLOCK"
                        time.sleep(1)
                    else:
                        #self.logger.debug("Error happened while receiving data: %s" % e)
                        break
                else:
                    self.fd.write(data)
                    self.fd.flush()
                    self.writen_size += len(data)
                    if len(data) == filesize:
                        break
        self.logger.debug("File size: %s" % self.writen_size)

class SyncServer(asyncore.dispatcher):

    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.debug = True
        self.logger = logging.getLogger(self.__class__.__name__)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(2000)

    def handle_accept(self):
        client_socket = self.accept()
        if client_socket is None:
            pass
        else:
            sock, addr = client_socket
            #self.logger.debug("Incoming connection from %s" % repr(addr))
            handler = RequestHandler(sock=sock)

class RunServer(threading.Thread):

    def __init__(self):
        super(RunServer, self).__init__()
        self.daemon = False

    def run(self):
        server = SyncServer('', 9999)
        asyncore.loop(use_poll=True)

def StartServer():
    logging.basicConfig(level=logging.DEBUG,
                        format='%(name)s: %(message)s',
                        )
    RunServer().start()
    #MakeGraph().start()

if __name__ == '__main__':
    StartServer()
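
A side note on the three receive loops above: each one implements "read exactly N bytes, retrying on EWOULDBLOCK". That pattern can be factored into a single helper; a minimal sketch under the same Python 2 conventions (the name recv_exact is ours, not part of the original code):

import errno
import socket
import time

def recv_exact(sock, n, chunk_size=1024):
    # Read exactly n bytes from sock, sleeping briefly whenever the
    # non-blocking socket has no data available yet.
    buf = []
    remaining = n
    while remaining > 0:
        try:
            data = sock.recv(min(chunk_size, remaining))
        except socket.error, e:
            if e.args[0] == errno.EWOULDBLOCK:
                time.sleep(0.1)
                continue
            raise
        if not data:
            break   # peer closed the connection early
        buf.append(data)
        remaining -= len(data)
    return ''.join(buf)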

Client:

# Monitor a path with inotify (the pyinotify module) and send changed files to a remote server.
# Uses sendfile(2) instead of socket's send function if python-sendfile is installed.
import socket
import time
import os
import sys
import struct
import threading
import Queue

try:
    import pyinotify
except (ImportError, ImportWarning):
    print "Hope this information can help you:"
    print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu."
    sys.exit(1)

try:
    from sendfile import sendfile
except (ImportError, ImportWarning):
    pass

filetype_filter = [".rrd", ".xml"]

def check_filetype(pathname):
    # Only sync .rrd and .xml files; skip paths that end with a digit.
    for suffix_name in filetype_filter:
        if pathname[-4:] == suffix_name:
            return True
    try:
        end_string = pathname.rsplit('.')[-1:][0]
        end_int = int(end_string)
    except:
        pass
    else:
        # pathname ends with a digit
        return False

class sync_file(threading.Thread):

    def __init__(self, addr, events_queue):
        super(sync_file, self).__init__()
        self.daemon = False
        self.queue = events_queue
        self.addr = addr
        self.chunk_size = 1024

    def run(self):
        while 1:
            event = self.queue.get()
            if check_filetype(event.pathname):
                print time.asctime(), event.maskname, event.pathname
                filepath = event.path.split('/')[-1:][0]
                filename = event.name
                filesize = os.stat(os.path.join(event.path, filename)).st_size
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                filepath_len = len(filepath)
                filename_len = len(filename)
                sock.connect(self.addr)
                offset = 0
                # Pack the fixed-size header, then stream the file body.
                data = struct.pack("!LL128s128sL", filepath_len, filename_len, filepath, filename, filesize)
                fd = open(event.pathname, 'rb')
                sock.sendall(data)
                if "sendfile" in sys.modules:
                    # use sendfile(2)
                    while 1:
                        sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size)
                        if sent == 0:
                            break
                        offset += sent
                else:
                    # fall back to the plain send function
                    while 1:
                        data = fd.read(self.chunk_size)
                        if not data:
                            break
                        sock.send(data)
                sock.close()
                fd.close()

class EventHandler(pyinotify.ProcessEvent):

    def __init__(self, events_queue):
        super(EventHandler, self).__init__()
        self.events_queue = events_queue

    def my_init(self):
        pass

    def process_IN_CLOSE_WRITE(self, event):
        self.events_queue.put(event)

    def process_IN_MOVED_TO(self, event):
        self.events_queue.put(event)

def start_notify(path, mask, sync_server):
    events_queue = Queue.Queue()
    sync_thread_pool = list()
    for i in range(500):
        sync_thread_pool.append(sync_file(sync_server, events_queue))
    for i in sync_thread_pool:
        i.start()
    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, EventHandler(events_queue))
    wdd = wm.add_watch(path, mask, rec=True)
    notifier.loop()

def do_notify():
    perfdata_path = '/var/lib/pnp4nagios/perfdata'
    mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
    sync_server = ('127.0.0.1', 9999)
    start_notify(perfdata_path, mask, sync_server)

if __name__ == '__main__':
    do_notify()
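
Before letting pyinotify drive it, the wire protocol can be exercised by hand. Here is a minimal one-shot test client sketch, assuming the server above is listening on 127.0.0.1:9999 (the path, name, and payload are invented for illustration):

import socket
import struct

path, name = "host1", "test.rrd"
payload = "x" * 3000   # fake file content
header = struct.pack("!LL128s128sL", len(path), len(name), path, name, len(payload))

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 9999))
sock.sendall(header)    # fixed 268-byte header first
sock.sendall(payload)   # then the file body
sock.close()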

Monitoring a thread pool in Python

#!/usr/bin/python
import threading
import time

class Monitor(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(Monitor, self).__init__()
        self.daemon = False
        self.args = args
        self.kwargs = kwargs
        self.pool_list = []

    def run(self):
        print self.args
        print self.kwargs
        # Remember each pool under its keyword name.
        for name, value in self.kwargs.items():
            obj = value[0]
            temp = {}
            temp[name] = obj
            self.pool_list.append(temp)
        while 1:
            print self.pool_list
            for name, value in self.kwargs.items():
                obj = value[0]
                parameters = value[1:]
                died_threads = self.cal_died_thread(self.pool_list, name)
                print "died_threads", died_threads
                if died_threads > 0:
                    # Restart as many threads as have died.
                    for i in range(died_threads):
                        print "start %s thread..." % name
                        t = obj[0].__class__(*parameters)
                        t.start()
                        self.add_to_pool_list(t, name)
                else:
                    break
            time.sleep(0.5)

    def cal_died_thread(self, pool_list, name):
        i = 0
        for item in self.pool_list:
            for k, v in item.items():
                if name == k:
                    # Iterate over a copy, since dead threads are removed below.
                    for t in v[:]:
                        if not t.isAlive():
                            self.remove_from_pool_list(t)
                            i += 1
        return i

    def add_to_pool_list(self, obj, name):
        for item in self.pool_list:
            for k, v in item.items():
                if name == k:
                    v.append(obj)

    def remove_from_pool_list(self, obj):
        for item in self.pool_list:
            for k, v in item.items():
                try:
                    v.remove(obj)
                except:
                    pass
                else:
                    return

Usage:

import Queue

# MakeRrds and MakeGraph are the project's own worker thread classes.
rrds_queue = Queue.Queue()

make_rrds_pool = []
for i in range(5):
    make_rrds_pool.append(MakeRrds(rrds_queue))
for i in make_rrds_pool:
    i.start()

make_graph_pool = []
for i in range(5):
    make_graph_pool.append(MakeGraph(rrds_queue))
for i in make_graph_pool:
    i.start()

# Each value is a tuple: (thread pool, constructor arguments...).
monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue),
                  make_graph_pool=(make_graph_pool, rrds_queue))
monitor.start()

How it works:

1. Monitor accepts keyword arguments whose values are tuples: the first element is a thread pool (a list of thread objects) and the remaining elements are the constructor arguments for that thread class (a compatible worker sketch follows this list).
2. Every 0.5 seconds it checks each pool; if any threads have died, it counts them, removes them from the pool, and starts the same number of replacement threads.
3. If no threads have died, it does nothing.
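
One consequence of point 1: Monitor restarts a dead thread via thread.__class__(*parameters), so every worker class must be constructible from exactly the parameters stored after the pool in its tuple. A sketch of a compatible worker (purely illustrative; the real MakeRrds is defined elsewhere in the project):

import threading

class MakeRrds(threading.Thread):
    def __init__(self, queue):
        # The constructor signature must match the tuple tail passed
        # to Monitor, here just the shared queue.
        super(MakeRrds, self).__init__()
        self.daemon = False
        self.queue = queue

    def run(self):
        while 1:
            item = self.queue.get()   # block until work arrives
            print "processing", item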

Calling Django modules from an external script

import os 
import sys 
sys.path.insert(0,'/data/cloud_manage') 
from django.core.management import setup_environ 
import settings 
setup_environ(settings) 
from common.monitor import Monitor 
from django.db import connection, transaction

The prerequisite is that you first create a Django project; here we created one named cloud_manage.
With this setup you can call not only Django's own modules but also anything defined inside the project itself.
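
Note that setup_environ was removed in Django 1.6; on newer Django versions the equivalent standalone bootstrap is roughly the following (same project path as above):

import os
import sys

sys.path.insert(0, '/data/cloud_manage')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

import django
django.setup()   # replaces setup_environ(settings) on Django >= 1.7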

Hopefully this article is helpful for your Python programming.
