视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
python实现的解析crontab配置文件代码
2020-11-27 14:30:32 责编:小采
文档


#/usr/bin/env python
#-*- coding:utf-8 -*-
 
"""
1.解析 crontab 配置文件中的五个数间参数(分 时 日 月 周),获取他们对应的取值范围
2.将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
"""
 
#$Id $
 
import re, time, sys
from Core.FDateTime.FDateTime import FDateTime
 
def get_struct_time(time_stamp_int):
	"""
	按整型时间戳获取格式化时间 分 时 日 月 周
	Args:
	time_stamp_int 为传入的值为时间戳(整形),如:1332888820
	经过localtime转换后变成
	time.struct_time(tm_year=2012, tm_mon=3, tm_mday=28, tm_hour=6, tm_min=53, tm_sec=40, tm_wday=2, tm_yday=88, tm_isdst=0)
	Return:
	list____返回 分 时 日 月 周
	"""
 
	st_time = time.localtime(time_stamp_int)
	return [st_time.tm_min, st_time.tm_hour, st_time.tm_mday, st_time.tm_mon, st_time.tm_wday]
 
 
def get_strptime(time_str, str_format):
	"""从字符串获取 整型时间戳
	Args:
	time_str 字符串类型的时间戳 如 '31/Jul/2013:17:46:01'
	str_format 指定 time_str 的格式 如 '%d/%b/%Y:%H:%M:%S'
	Return:
	返回10位整型(int)时间戳,如 1375146861
	"""
	return int(time.mktime(time.strptime(time_str, str_format)))
 
def get_str_time(time_stamp, str_format='%Y%m%d%H%M'):
	"""
	获取时间戳,
	Args:
	time_stamp 10位整型(int)时间戳,如 1375146861
	str_format 指定返回格式,值类型为 字符串 str
	Rturn:
	返回格式 默认为 年月日时分,如2013年7月9日1时3分 :201207090103
	"""
	return time.strftime("%s" % str_format, time.localtime(time_stamp))
 
def match_cont(patten, cont):
	"""
	正则匹配(精确符合的匹配)
	Args:
	patten 正则表达式
	cont____ 匹配内容
	Return:
	True or False
	"""
	res = re.match(patten, cont)
	if res:
	return True
	else:
	return False
 
def handle_num(val, ranges=(0, 100), res=list()):
	"""处理纯数字"""
	val = int(val)
	if val >= ranges[0] and val <= ranges[1]:
	res.append(val)
	return res
 
def handle_nlist(val, ranges=(0, 100), res=list()):
	"""处理数字列表 如 1,2,3,6"""
	val_list = val.split(',')
	for tmp_val in val_list:
	tmp_val = int(tmp_val)
	if tmp_val >= ranges[0] and tmp_val <= ranges[1]:
	res.append(tmp_val)
	return res
 
def handle_star(val, ranges=(0, 100), res=list()):
	"""处理星号"""
	if val == '*':
	tmp_val = ranges[0]
	while tmp_val <= ranges[1]:
	res.append(tmp_val)
	tmp_val = tmp_val + 1
	return res
 
def handle_starnum(val, ranges=(0, 100), res=list()):
	"""星号/数字 组合 如 */3"""
	tmp = val.split('/')
	val_step = int(tmp[1])
	if val_step < 1:
	return res
	val_tmp = int(tmp[1])
	while val_tmp <= ranges[1]:
	res.append(val_tmp)
	val_tmp = val_tmp + val_step
	return res
 
def handle_range(val, ranges=(0, 100), res=list()):
	"""处理区间 如 8-20"""
	tmp = val.split('-')
	range1 = int(tmp[0])
	range2 = int(tmp[1])
	tmp_val = range1
	if range1 < 0:
	return res
	while tmp_val <= range2 and tmp_val <= ranges[1]:
	res.append(tmp_val)
	tmp_val = tmp_val + 1
	return res
 
def handle_rangedv(val, ranges=(0, 100), res=list()):
	"""处理区间/步长 组合 如 8-20/3 """
	tmp = val.split('/')
	range2 = tmp[0].split('-')
	val_start = int(range2[0])
	val_end = int(range2[1])
	val_step = int(tmp[1])
	if (val_step < 1) or (val_start < 0):
	return res
	val_tmp = val_start
	while val_tmp <= val_end and val_tmp <= ranges[1]:
	res.append(val_tmp)
	val_tmp = val_tmp + val_step
	return res
 
def parse_conf(conf, ranges=(0, 100), res=list()):
	"""解析crontab 五个时间参数中的任意一个"""
	#去除空格,再拆分
	conf = conf.strip(' ').strip(' ')
	conf_list = conf.split(',')
	other_conf = []
	number_conf = []
	for conf_val in conf_list:
	if match_cont(PATTEN['number'], conf_val):
	#记录拆分后的纯数字参数
	number_conf.append(conf_val)
	else:
	#记录拆分后纯数字以外的参数,如通配符 * , 区间 0-8, 及 0-8/3 之类
	other_conf.append(conf_val)
	if other_conf:
	#处理纯数字外各种参数
	for conf_val in other_conf:
	for key, ptn in PATTEN.items():
	if match_cont(ptn, conf_val):
	res = PATTEN_HANDLER[key](val=conf_val, ranges=ranges, res=res)
	if number_conf:
	if len(number_conf) > 1 or other_conf:
	#纯数字多于1,或纯数字与其它参数共存,则数字作为时间列表
	res = handle_nlist(val=','.join(number_conf), ranges=ranges, res=res)
	else:
	#只有一个纯数字存在,则数字为时间 间隔
	res = handle_num(val=number_conf[0], ranges=ranges, res=res)
	return res
 
def parse_crontab_time(conf_string):
	"""
	解析crontab时间配置参数
	Args:
	conf_string 配置内容(共五个值:分 时 日 月 周)
	 取值范围 分钟:0-59 小时:1-23 日期:1-31 月份:1-12 星期:0-6(0表示周日)
	Return:
	crontab_range	 list格式,分 时 日 月 周 五个传入参数分别对应的取值范围
	"""
	time_limit	= ((0, 59), (1, 23), (1, 31), (1, 12), (0, 6))
	crontab_range = []
	clist = []
	conf_length = 5
	tmp_list = conf_string.split(' ')
	for val in tmp_list:
	if len(clist) == conf_length:
	break
	if val:
	clist.append(val)
 
	if len(clist) != conf_length:
	return -1, 'config error whith [%s]' % conf_string
	cindex = 0
	for conf in clist:
	res_conf = []
	res_conf = parse_conf(conf, ranges=time_limit[cindex], res=res_conf)
	if not res_conf:
	return -1, 'config error whith [%s]' % conf_string
	crontab_range.append(res_conf)
	cindex = cindex + 1
	return 0, crontab_range
 
def time_match_crontab(crontab_time, time_struct):
	"""
	将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
	Args:
	crontab_time____crontab配置中的五个时间(分 时 日 月 周)参数对应时间取值范围
	time_struct____ 某个整型时间戳,如:1375027200 对应的 分 时 日 月 周
	Return:
	tuple 状态码, 状态描述
	"""
	cindex = 0
	for val in time_struct:
	if val not in crontab_time[cindex]:
	return 0, False
	cindex = cindex + 1
	return 0, True
 
def close_to_cron(crontab_time, time_struct):
	"""coron的指定范围(crontab_time)中 最接近 指定时间 time_struct 的值"""
	close_time = time_struct
	cindex = 0
	for val_struct in time_struct:
	offset_min = val_struct
	val_close = val_struct
	for val_cron in crontab_time[cindex]:
	offset_tmp = val_struct - val_cron
	if offset_tmp > 0 and offset_tmp < offset_min:
	val_close = val_struct
	offset_min = offset_tmp
	close_time[cindex] = val_close
	cindex = cindex + 1
	return close_time
 
def cron_time_list(
	cron_time,
	year_num=int(get_str_time(time.time(), "%Y")),
	limit_start=get_str_time(time.time(), "%Y%m%d%H%M"),
	limit_end=get_str_time(time.time() + 800, "%Y%m%d%H%M")
	):
	#print "
from ", limit_start , ' to ' ,limit_end
	"""
	获取crontab时间配置参数取值范围内的所有时间点 的 时间戳
	Args:
	cron_time 符合crontab配置指定的所有时间点
	year_num____指定在哪一年内 获取
	limit_start 开始时间
	Rturn:
	List 所有时间点组成的列表(年月日时分 组成的时间,如2013年7月29日18时56分:201307291856)
	"""
	#按小时 和 分钟组装
	hour_minute = []
	for minute in cron_time[0]:
	minute = str(minute)
	if len(minute) < 2:
	minute = '0%s' % minute
	for hour in cron_time[1]:
	hour = str(hour)
	if len(hour) < 2:
	hour = '0%s' % hour
	hour_minute.append('%s%s' % (hour, minute))
	#按天 和 小时组装
	day_hm = []
	for day in cron_time[2]:
	day = str(day)
	if len(day) < 2:
	day = '0%s' % day
	for hour_mnt in hour_minute:
	day_hm.append('%s%s' % (day, hour_mnt))
	#按月 和 天组装
	month_dhm = []
	#只有30天的月份
	month_short = ['02', '04', '06', '09', '11']
	for month in cron_time[3]:
	month = str(month)
	if len(month) < 2:
	month = '0%s' % month
	for day_hm_s in day_hm:
	if month == '02':
	if (((not year_num % 4 ) and (year_num % 100)) or (not year_num % 400)):
	#闰年2月份有29天
	if int(day_hm_s[:2]) > 29:
	continue
	else:
	#其它2月份有28天
	if int(day_hm_s[:2]) > 28:
	continue
	if month in month_short:
	if int(day_hm_s[:2]) > 30:
	continue
	month_dhm.append('%s%s' % (month, day_hm_s))
	#按年 和 月组装
	len_start = len(limit_start)
	len_end = len(limit_end)
	month_dhm_limit = []
	for month_dhm_s in month_dhm:
	time_ymdhm = '%s%s' % (str(year_num), month_dhm_s)
	#开始时间结束时间以外的排除
	if (int(time_ymdhm[:len_start]) < int(limit_start)) or 
	 (int(time_ymdhm[:len_end]) > int(limit_end)):
	continue
	month_dhm_limit.append(time_ymdhm)
	if len(cron_time[4]) < 7:
	#按不在每周指定时间的排除
	month_dhm_week = []
	for time_minute in month_dhm_limit:
	str_time = time.strptime(time_minute, '%Y%m%d%H%M%S')
	if str_time.tm_wday in cron_time[4]:
	month_dhm_week.append(time_minute)
	return month_dhm_week
	return month_dhm_limit
 
 
#crontab时间参数各种写法 的 正则匹配
PATTEN = {
	#纯数字
	'number':'^[0-9]+$',
	#数字列表,如 1,2,3,6
	'num_list':'^[0-9]+([,][0-9]+)+$',
	#星号 *
	'star':'^*$',
	#星号/数字 组合,如 */3
	'star_num':'^*/[0-9]+$',
	#区间 如 8-20
	'range':'^[0-9]+[-][0-9]+$',
	#区间/步长 组合 如 8-20/3
	'range_div':'^[0-9]+[-][0-9]+[/][0-9]+$'
	#区间/步长 列表 组合,如 8-20/3,21,22,34
	#'range_div_list':'^([0-9]+[-][0-9]+[/][0-9]+)([,][0-9]+)+$'
	}
#各正则对应的处理方法
PATTEN_HANDLER = {
	'number':handle_num,
	'num_list':handle_nlist,
	'star':handle_star,
	'star_num':handle_starnum,
	'range':handle_range,
	'range_div':handle_rangedv
}
 
 
def isdo(strs,tips=None):
	"""
	判断是否匹配成功!
	"""
	try:
	tips = tips==None and "文件名称格式错误:job_月-周-天-时-分_文件名.txt" or tips
	timer = strs.replace('@',"*").replace('%','/').split('_')[1]
	month,week,day,hour,mins = timer.split('-')
	conf_string = mins+" "+hour+" "+day+" "+month+" "+week
	res, desc = parse_crontab_time(conf_string)
	if res == 0:
	cron_time = desc
	else:
	return False
 
	now =FDateTime.now()
	now = FDateTime.datetostring(now, "%Y%m%d%H%M00")
 
	time_stamp = FDateTime.strtotime(now, "%Y%m%d%H%M00")
 
	#time_stamp = int(time.time())
	#解析 时间戳对应的 分 时 日 月 周
	time_struct = get_struct_time(time_stamp)
	match_res = time_match_crontab(cron_time, time_struct)
	return match_res[1]
	except:
	print tips
	return False
 
def main():
	"""测试用实例"""
	#crontab配置中一行时间参数
	#conf_string = '*/10 * * * * (cd /opt/pythonpm/devpapps; /usr/local/bin/python2.5 data_test.py>>output_error.txt)'
	conf_string = '*/10 * * * *'
	#时间戳
	time_stamp = int(time.time())
 
	#解析crontab时间配置参数 分 时 日 月 周 各个取值范围
	res, desc = parse_crontab_time(conf_string)
 
	if res == 0:
	cron_time = desc
	else:
	print desc
	sys, exit(-1)
 
	print "
config:", conf_string
	print "
parse result(range for crontab):"
 
	print " minute:", cron_time[0]
	print " hour: ", cron_time[1]
	print " day: ", cron_time[2]
	print " month: ", cron_time[3]
	print " week day:", cron_time[4]
 
	#解析 时间戳对应的 分 时 日 月 周
	time_struct = get_struct_time(time_stamp)
	print "
struct time(minute hour day month week) for %d :" % 
	 time_stamp, time_struct
 
	#将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
	match_res = time_match_crontab(cron_time, time_struct)
	print "
matching result:", match_res
 
	#crontab配置设定范围中最近接近时指定间戳的一组时间
	most_close = close_to_cron(cron_time, time_struct)
	print "
in range of crontab time which is most colse to struct ", most_close
 
	time_list = cron_time_list(cron_time)
	print "

 %d times need to tart-up:
" % len(time_list)
	print time_list[:10], '...'
 
 
if __name__ == '__main__':
	#请看 使用实例
	strs = 'job_@-@-@-@-@_test02.txt.sh'
	print isdo(strs)
 
	#main()0")

下载本文
显示全文
专题