视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
python实现封装得到virustotal扫描结果
2020-11-27 14:40:47 责编:小采
文档


本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下:

import simplejson 
import urllib 
import urllib2 
import os, sys 
import logging 
 
try: 
 import sqlite3 
except ImportError: 
 sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. " 
 "Please verify your installation. Exiting...
") 
 sys.exit(-1) 
 
MD5 = "5248f774d2ee0a10936d0b1dc107f1" 
MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6" #do not have report on virustotal.com 
 
 
APIKEY = "xxxxxxxxxxxxxxxxxx"用自己的 

class VirusTotalDatabase: 
 """ 
 Database abstraction layer. 
 """ 
 def __init__(self, db_file): 
 log = logging.getLogger("Database.Init") 
 self.__dbfile = db_file 
 self._conn = None 
 self._cursor = None 
 
 # Check if SQLite database already exists. If it doesn't exist I invoke 
 # the generation procedure. 
 if not os.path.exists(self.__dbfile): 
 if self._generate(): 
 print("Generated database "%s" which didn't" 
 " exist before." % self.__dbfile) 
 else: 
 print("Unable to generate database") 
 
 # Once the database is generated of it already has been, I can 
 # initialize the connection. 
 try: 
 self._conn = sqlite3.connect(self.__dbfile) 
 self._cursor = self._conn.cursor() 
 except Exception, why: 
 print("Unable to connect to database "%s": %s." 
 % (self.__dbfile, why)) 
 
 log.debug("Connected to SQLite database "%s"." % self.__dbfile) 
 
 def _generate(self): 
 """ 
 Creates database structure in a SQLite file. 
 """ 
 if os.path.exists(self.__dbfile): 
 return False 
 
 db_dir = os.path.dirname(self.__dbfile) 
 if not os.path.exists(db_dir): 
 try: 
 os.makedirs(db_dir) 
 except (IOError, os.error), why: 
 print("Something went wrong while creating database " 
 "directory "%s": %s" % (db_dir, why)) 
 return False 
 
 conn = sqlite3.connect(self.__dbfile) 
 cursor = conn.cursor() 
 
 cursor.execute("CREATE TABLE virustotal (
" 
 " id INTEGER PRIMARY KEY,
" 
 " md5 TEXT NOT NULL,
" 
 " Kaspersky TEXT DEFAULT NULL,
" 
 " McAfee TEXT DEFAULT NULL,
" 
 " Symantec TEXT DEFAULT NULL,
" 
 " Norman TEXT DEFAULT NULL,
" 
 " Avast TEXT DEFAULT NULL,
" 
 " NOD32 TEXT DEFAULT NULL,
" 
 " BitDefender TEXT DEFAULT NULL,
" 
 " Microsoft TEXT DEFAULT NULL,
" 
 " Rising TEXT DEFAULT NULL,
" 
 " Panda TEXT DEFAULT NULL
" 
 ");") 
 print "create db:%s sucess" % self.__dbfile 
 
 return True 
 
 def _get_task_dict(self, row): 
 try: 
 task = {} 
 task["id"] = row[0] 
 task["md5"] = row[1] 
 task["Kaspersky"] = row[2] 
 task["McAfee"] = row[3] 
 task["Symantec"] = row[4] 
 task["Norman"] = row[5] 
 task["Avast"] = row[6] 
 task["NOD32"] = row[7] 
 task["BitDefender"] = row[8] 
 task["Microsoft"] = row[9] 
 task["Rising"] = row[10] 
 task["Panda"] = row[11] 
 return task 
 except Exception, why: 
 return None 
 
 def add_sample(self, md5, virus_dict): 
 """ 
 
 """ 
 task_id = None 
 
 if not self._cursor: 
 return None 
 if not md5 or md5 == "": 
 return None 
 
 Kaspersky = virus_dict.get("Kaspersky", None) 
 McAfee = virus_dict.get("McAfee", None) 
 Symantec = virus_dict.get("Symantec", None) 
 Norman = virus_dict.get("Norman", None) 
 Avast = virus_dict.get("Avast", None) 
 NOD32 = virus_dict.get("NOD32", None) 
 BitDefender = virus_dict.get("BitDefender", None) 
 Microsoft = virus_dict.get("Microsoft", None) 
 Rising = virus_dict.get("Rising", None) 
 Panda = virus_dict.get("Panda", None) 
 
 self._conn.text_factory = str 
 try: 
 self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ?;", 
 (md5,)) 
 sample_row = self._cursor.fetchone() 
 except sqlite3.OperationalError, why: 
 print "sqlite3 error:%s
" % str(why) 
 return False 
 
 if sample_row: 
 try: 
 sample_row = sample_row[0] 
 self._cursor.execute("UPDATE virustotal SET Kaspersky=?, McAfee=?, Symantec=?, Norman=?, Avast=?, 
 NOD32=?, BitDefender=?, Microsoft=?, Rising=?, Panda=? WHERE id = ?;", 
 (Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, Microsoft, 
 Rising, Panda, sample_row)) 
 self._conn.commit() 
 task_id = sample_row 
 except sqlite3.OperationalError, why: 
 print("Unable to update database: %s." % why) 
 return False 
 else: #the sample not in the database 
 try: 
 self._cursor.execute("INSERT INTO virustotal " 
 "(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, 
 Microsoft, Rising, Panda) " 
 "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", 
 (md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, 
 Microsoft, Rising, Panda)) 
 self._conn.commit() 
 task_id = self._cursor.lastrowid 
 except sqlite3.OperationalError, why: 
 print "why",str(why) 
 return None 
 print "add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id)) 
 return task_id 
 
 def get_sample(self): 
 """ 
 Gets a task from pending queue. 
 """ 
 log = logging.getLogger("Database.GetTask") 
 
 if not self._cursor: 
 log.error("Unable to acquire cursor.") 
 return None 
 
 # Select one item from the queue table with higher priority and older 
 # addition date which has not already been processed. 
 try: 
 self._cursor.execute("SELECT * FROM virustotal " 
 #"WHERE lock = 0 " 
 #"AND status = 0 " 
 "ORDER BY id, added_on LIMIT 1;") 
 except sqlite3.OperationalError, why: 
 log.error("Unable to query database: %s." % why) 
 return None 
 
 sample_row = self._cursor.fetchone() 
 
 if sample_row: 
 return self._get_task_dict(sample_row) 
 else: 
 return None 
 
 def search_md5(self, md5): 
 """ 
 
 """ 
 if not self._cursor: 
 return None 
 
 if not md5 or len(md5) != 32: 
 return None 
 
 try: 
 self._cursor.execute("SELECT * FROM virustotal " 
 "WHERE md5 = ? " 
 #"AND status = 1 " 
 "ORDER BY id DESC;", 
 (md5,)) 
 except sqlite3.OperationalError, why: 
 return None 
 
 task_dict = {} 
 for row in self._cursor.fetchall(): 
 task_dict = self._get_task_dict(row) 
 #if task_dict: 
 #tasks.append(task_dict) 
 
 return task_dict 
 
 
 
class VirusTotal: 
 """""" 
 
 def __init__(self, md5): 
 """Constructor""" 
 self._virus_dict = {} 
 self._md5 = md5 
 self._db_file = r"./db/virustotal.db" 
 self.get_report_dict() 
 
 def repr(self): 
 return str(self._virus_dict) 
 
 def submit_md5(self, file_path): 
 import postfile 
 #submit the file 
 FILE_NAME = os.path.basename(file_path) 
 
 
 host = "www.virustotal.com" 
 selector = "https://www.virustotal.com/vtapi/v2/file/scan" 
 fields = [("apikey", APIKEY)] 
 file_to_send = open(file_path, "rb").read() 
 files = [("file", FILE_NAME, file_to_send)] 
 json = postfile.post_multipart(host, selector, fields, files) 
 print json 
 pass 
 
 def get_report_dict(self): 
 result_dict = {} 
 
 url = "https://www.virustotal.com/vtapi/v2/file/report" 
 parameters = {"resource": self._md5, 
 "apikey": APIKEY} 
 data = urllib.urlencode(parameters) 
 req = urllib2.Request(url, data) 
 response = urllib2.urlopen(req) 
 json = response.read() 
 
 response_dict = simplejson.loads(json) 
 if response_dict["response_code"]: #has result 
 scans_dict = response_dict.get("scans", {}) 
 for anti_virus_comany, virus_name in scans_dict.iteritems(): 
 if virus_name["detected"]: 
 result_dict.setdefault(anti_virus_comany, virus_name["result"]) 
 return result_dict 
 
 def write_to_db(self): 
 """""" 
 db = VirusTotalDatabase(self._db_file) 
 virus_dict = self.get_report_dict() 
 db.add_sample(self._md5, virus_dict) 

使用方法如下:

config = {'input':"inputMd5s"} 
fp = open(config['input'], "r") 
content = fp.readlines() 
MD5S = [] 
for md5 in ifilter(lambda x:len(x)>0, imap(string.strip, content)): 
 MD5S.append(md5) 
print "MD5S",MD5S 
fp.close() 
 
 
from getVirusTotalInfo import VirusTotal 
#得到扫描结果并写入数库 
for md5 in MD5S: 
 virus_total = VirusTotal(md5) 
 virus_total.write_to_db() 

希望本文所述对大家的Python程序设计有所帮助。

下载本文
显示全文
专题