视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
实例探究Python以并发方式编写高性能端口扫描器的方法
2020-11-27 14:29:21 责编:小采
文档


关于端口扫描器
端口扫描工具(Port Scanner)指用于探测服务器或主机开放端口情况的工具。常被计算机管理员用于确认安全策略,同时被攻击者用于识别目标主机上的可运作的网络服务。

端口扫描定义是客户端向一定范围的服务器端口发送对应请求,以此确认可使用的端口。虽然其本身并不是恶意的网络活动,但也是网络攻击者探测目标主机服务,以利用该服务的已知漏洞的重要手段。端口扫描的主要用途仍然只是确认远程机器某个服务的可用性。

扫描多个主机以获取特定的某个端口被称为端口清扫(Portsweep),以此获取特定的服务。例如,基于SQL服务的计算机蠕虫就会清扫大量主机的同一端口以在 1433 端口上建立TCP连接。

Python实现

端口扫描器原理很简单,无非就是操作socket,能connect就认定这个端口开放着。

import socket 
def scan(port): 
 s = socket.socket() 
 if s.connect_ex(('localhost', port)) == 0: 
 print port, 'open' 
 s.close() 
if __name__ == '__main__': 
 map(scan,range(1,65536)) 

这样一个最简单的端口扫描器出来了。
等等喂,半天都没反应,那是因为socket是阻塞的,每次连接要等很久才超时。
我们自己给它加上的超时。

s.settimeout(0.1)

再跑一遍,感觉快多了。

多线程版本

import socket 
import threading 
def scan(port): 
 s = socket.socket() 
 s.settimeout(0.1) 
 if s.connect_ex(('localhost', port)) == 0: 
 print port, 'open' 
 s.close() 
 
if __name__ == '__main__': 
 threads = [threading.Thread(target=scan, args=(i,)) for i in xrange(1,65536)] 
 map(lambda x:x.start(),threads) 

运行一下,哇,好快,快到抛出错误了。thread.error: can't start new thread。
想一下,这个进程开启了65535个线程,有两种可能,一种是超过最大线程数了,一种是超过最大socket句柄数了。在linux可以通过ulimit来修改。
如果不修改最大,怎么用多线程不报错呢?
加个queue,变成生产者-消费者模式,开固定线程。

多线程+队列版本

import socket 
import threading 
from Queue import Queue 
def scan(port): 
 s = socket.socket() 
 s.settimeout(0.1) 
 if s.connect_ex(('localhost', port)) == 0: 
 print port, 'open' 
 s.close() 
 
def worker(): 
 while not q.empty(): 
 port = q.get() 
 try: 
 scan(port) 
 finally: 
 q.task_done() 
 
if __name__ == '__main__': 
 q = Queue() 
 map(q.put,xrange(1,65535)) 
 threads = [threading.Thread(target=worker) for i in xrange(500)] 
 map(lambda x:x.start(),threads) 
 q.join() 

这里开500个线程,不停的从队列取任务来做。

multiprocessing+队列版本
总不能开65535个进程吧?还是用生产者消费者模式

import multiprocessing 
def scan(port): 
 s = socket.socket() 
 s.settimeout(0.1) 
 if s.connect_ex(('localhost', port)) == 0: 
 print port, 'open' 
 s.close() 
 
def worker(q): 
 while not q.empty(): 
 port = q.get() 
 try: 
 scan(port) 
 finally: 
 q.task_done() 
 
if __name__ == '__main__': 
 q = multiprocessing.JoinableQueue() 
 map(q.put,xrange(1,65535)) 
 jobs = [multiprocessing.Process(target=worker, args=(q,)) for i in xrange(100)] 
 map(lambda x:x.start(),jobs) 

注意这里把队列作为一个参数传入到worker中去,因为是process safe的queue,不然会报错。
还有用的是JoinableQueue(),顾名思义就是可以join()的。

gevent的spawn版本

from gevent import monkey; monkey.patch_all(); 
import gevent 
import socket 
... 
if __name__ == '__main__': 
 threads = [gevent.spawn(scan, i) for i in xrange(1,65536)] 
 gevent.joinall(threads) 

注意monkey patch必须在被patch的东西之前import,不然会Exception KeyError.比如不能先import threading,再monkey patch.

gevent的Pool版本

from gevent import monkey; monkey.patch_all(); 
import socket 
from gevent.pool import Pool 
... 
if __name__ == '__main__': 
 pool = Pool(500) 
 pool.map(scan,xrange(1,65536)) 
 pool.join() 

concurrent.futures版本

import socket 
from Queue import Queue 
from concurrent.futures import ThreadPoolExecutor 
... 
if __name__ == '__main__': 
 q = Queue() 
 map(q.put,xrange(1,65536)) 
 with ThreadPoolExecutor(max_workers=500) as executor: 
 for i in range(500): 
 executor.submit(worker,q) 

下载本文
显示全文
专题