Simple Linux Panel
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mdserver-web/class/core/mw.py

2260 lines
61 KiB

3 years ago
# coding:utf-8
# ---------------------------------------------------------------------------------
# MW-Linux面板
# ---------------------------------------------------------------------------------
# copyright (c) 2018-∞(https://github.com/midoks/mdserver-web) All rights reserved.
# ---------------------------------------------------------------------------------
# Author: midoks <midoks@163.com>
# ---------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------
# 核心方法库
# ---------------------------------------------------------------------------------
7 years ago
import os
import sys
import time
import string
import json
import hashlib
import shlex
import datetime
import subprocess
import glob
import base64
7 years ago
import re
7 years ago
from random import Random
import db
7 years ago
7 years ago
def execShell(cmdstring, cwd=None, timeout=None, shell=True):
if shell:
cmdstring_list = cmdstring
else:
cmdstring_list = shlex.split(cmdstring)
if timeout:
end_time = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
sub = subprocess.Popen(cmdstring_list, cwd=cwd, stdin=subprocess.PIPE,
shell=shell, bufsize=4096, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while sub.poll() is None:
time.sleep(0.1)
if timeout:
if end_time <= datetime.datetime.now():
raise Exception("Timeout:%s" % cmdstring)
4 years ago
if sys.version_info[0] == 2:
return sub.communicate()
4 years ago
data = sub.communicate()
# python3 fix 返回byte数据
if isinstance(data[0], bytes):
t1 = str(data[0], encoding='utf-8')
if isinstance(data[1], bytes):
t2 = str(data[1], encoding='utf-8')
return (t1, t2)
7 years ago
3 years ago
def getTracebackInfo():
import traceback
errorMsg = traceback.format_exc()
return errorMsg
7 years ago
def getRunDir():
return os.getcwd()
7 years ago
def getRootDir():
return os.path.dirname(os.path.dirname(getRunDir()))
7 years ago
7 years ago
def getPluginDir():
return getRunDir() + '/plugins'
def getPanelDataDir():
return getRunDir() + '/data'
1 year ago
def getMWLogs():
return getRunDir() + '/logs'
def getPanelTmp():
return getRunDir() + '/tmp'
7 years ago
def getServerDir():
return getRootDir() + '/server'
def getLogsDir():
return getRootDir() + '/wwwlogs'
6 years ago
def getWwwDir():
6 years ago
file = getRunDir() + '/data/site.pl'
if os.path.exists(file):
return readFile(file).strip()
6 years ago
return getRootDir() + '/wwwroot'
6 years ago
def setWwwDir(wdir):
file = getRunDir() + '/data/site.pl'
return writeFile(file, wdir)
3 years ago
def getBackupDir():
file = getRunDir() + '/data/backup.pl'
if os.path.exists(file):
return readFile(file).strip()
return getRootDir() + '/backup'
6 years ago
def setBackupDir(bdir):
file = getRunDir() + '/data/backup.pl'
6 years ago
return writeFile(file, bdir)
6 years ago
3 years ago
def getAcmeDir():
acme = '/root/.acme.sh'
if isAppleSystem():
cmd = "who | sed -n '2, 1p' |awk '{print $1}'"
user = execShell(cmd)[0].strip()
acme = '/Users/' + user + '/.acme.sh'
if not os.path.exists(acme):
acme = '/.acme.sh'
return acme
2 years ago
def getAcmeDomainDir(domain):
acme_dir = getAcmeDir()
acme_domain = acme_dir + '/' + domain
acme_domain_ecc = acme_domain + '_ecc'
if os.path.exists(acme_domain_ecc):
acme_domain = acme_domain_ecc
return acme_domain
2 years ago
def fileNameCheck(filename):
f_strs = [';', '&', '<', '>']
for fs in f_strs:
if filename.find(fs) != -1:
return False
return True
3 years ago
def triggerTask():
3 years ago
isTask = getRunDir() + '/tmp/panelTask.pl'
writeFile(isTask, 'True')
3 years ago
3 years ago
def systemdCfgDir():
3 years ago
# ubuntu
3 years ago
cfg_dir = '/lib/systemd/system'
3 years ago
if os.path.exists(cfg_dir):
return cfg_dir
3 years ago
# debian,centos
3 years ago
cfg_dir = '/usr/lib/systemd/system'
if os.path.exists(cfg_dir):
return cfg_dir
# local test
return "/tmp"
3 years ago
3 years ago
def getSslCrt():
if os.path.exists('/etc/ssl/certs/ca-certificates.crt'):
return '/etc/ssl/certs/ca-certificates.crt'
if os.path.exists('/etc/pki/tls/certs/ca-bundle.crt'):
return '/etc/pki/tls/certs/ca-bundle.crt'
return ''
7 years ago
def getOs():
6 years ago
return sys.platform
7 years ago
3 years ago
def getOsName():
cmd = "cat /etc/*-release | grep PRETTY_NAME |awk -F = '{print $2}' | awk -F '\"' '{print $2}'| awk '{print $1}'"
3 years ago
data = execShell(cmd)
3 years ago
return data[0].strip().lower()
3 years ago
def getOsID():
cmd = "cat /etc/*-release | grep VERSION_ID | awk -F = '{print $2}' | awk -F '\"' '{print $2}'"
3 years ago
data = execShell(cmd)
return data[0].strip()
3 years ago
3 years ago
def getFileSuffix(file):
tmp = file.split('.')
ext = tmp[len(tmp) - 1]
return ext
6 years ago
def isAppleSystem():
if getOs() == 'darwin':
return True
return False
11 months ago
def isDocker():
return os.path.exists('/.dockerenv')
6 years ago
def isDebugMode():
if isAppleSystem():
return True
debugPath = getRunDir() + "/data/debug.pl"
if os.path.exists(debugPath):
return True
return False
3 years ago
def isNumber(s):
try:
float(s)
return True
except ValueError:
pass
try:
import unicodedata
unicodedata.numeric(s)
return True
except (TypeError, ValueError):
pass
return False
6 years ago
def deleteFile(file):
if os.path.exists(file):
os.remove(file)
6 years ago
def isInstalledWeb():
path = getServerDir() + '/openresty/nginx/sbin/nginx'
if os.path.exists(path):
return True
return False
2 years ago
def isIpAddr(ip):
check_ip = re.compile(
1 year ago
'^(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|[1-9])\\.(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\.(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)\\.(1\\d{2}|2[0-4]\\d|25[0-5]|[1-9]\\d|\\d)$')
2 years ago
if check_ip.match(ip):
2 years ago
return True
else:
return False
2 years ago
def getWebStatus():
pid = getServerDir() + '/openresty/nginx/logs/nginx.pid'
if os.path.exists(pid):
return True
return False
2 years ago
# ------------------------------ openresty start -----------------------------
6 years ago
def restartWeb():
3 years ago
return opWeb("reload")
def opWeb(method):
if not isInstalledWeb():
return False
# systemd
systemd = '/lib/systemd/system/openresty.service'
if os.path.exists(systemd):
3 years ago
execShell('systemctl ' + method + ' openresty')
return True
# initd
initd = getServerDir() + '/openresty/init.d/openresty'
3 years ago
if os.path.exists(initd):
3 years ago
execShell(initd + ' ' + method)
return True
return False
6 years ago
2 years ago
def opLuaMake(cmd_name):
path = getServerDir() + '/web_conf/nginx/lua/lua.conf'
root_dir = getServerDir() + '/web_conf/nginx/lua/' + cmd_name
dst_path = getServerDir() + '/web_conf/nginx/lua/' + cmd_name + '.lua'
def_path = getServerDir() + '/web_conf/nginx/lua/empty.lua'
if not os.path.exists(root_dir):
execShell('mkdir -p ' + root_dir)
files = []
for fl in os.listdir(root_dir):
suffix = getFileSuffix(fl)
if suffix != 'lua':
continue
flpath = os.path.join(root_dir, fl)
files.append(flpath)
if len(files) > 0:
def_path = dst_path
content = ''
for f in files:
t = readFile(f)
f_base = os.path.basename(f)
2 years ago
content += '-- ' + '*' * 20 + ' ' + f_base + ' start ' + '*' * 20 + "\n"
2 years ago
content += t
2 years ago
content += "\n" + '-- ' + '*' * 20 + ' ' + f_base + ' end ' + '*' * 20 + "\n"
2 years ago
writeFile(dst_path, content)
else:
if os.path.exists(dst_path):
os.remove(dst_path)
conf = readFile(path)
conf = re.sub(cmd_name + ' (.*);',
cmd_name + " " + def_path + ";", conf)
writeFile(path, conf)
2 years ago
def opLuaInitFile():
opLuaMake('init_by_lua_file')
2 years ago
def opLuaInitWorkerFile():
opLuaMake('init_worker_by_lua_file')
def opLuaInitAccessFile():
opLuaMake('access_by_lua_file')
2 years ago
def opLuaMakeAll():
2 years ago
opLuaInitFile()
opLuaInitWorkerFile()
opLuaInitAccessFile()
2 years ago
# ------------------------------ openresty end -----------------------------
6 years ago
def restartMw():
import system_api
system_api.system_api().restartMw()
12 months ago
def restartNginx(self):
writeFile('data/restart_nginx.pl', 'True')
return True
6 years ago
6 years ago
def checkWebConfig():
3 years ago
op_dir = getServerDir() + '/openresty/nginx'
2 years ago
# "ulimit -n 10240 && " +
cmd = op_dir + \
3 years ago
"/sbin/nginx -t -c " + op_dir + "/conf/nginx.conf"
6 years ago
result = execShell(cmd)
searchStr = 'test is successful'
6 years ago
if result[1].find(searchStr) == -1:
6 years ago
msg = getInfo('配置文件错误: {1}', (result[1],))
writeLog("软件管理", msg)
6 years ago
return result[1]
return True
7 years ago
def M(table):
sql = db.Sql()
return sql.table(table)
7 years ago
def getPage(args, result='1,2,3,4,5,8'):
7 years ago
data = getPageObject(args, result)
return data[0]
def getPageObject(args, result='1,2,3,4,5,8'):
7 years ago
# 取分页
import page
# 实例化分页类
page = page.Page()
info = {}
7 years ago
info['count'] = 0
4 years ago
if 'count' in args:
7 years ago
info['count'] = int(args['count'])
7 years ago
info['row'] = 10
4 years ago
if 'row' in args:
7 years ago
info['row'] = int(args['row'])
7 years ago
info['p'] = 1
4 years ago
if 'p' in args:
7 years ago
info['p'] = int(args['p'])
7 years ago
info['uri'] = {}
info['return_js'] = ''
4 years ago
if 'tojs' in args:
7 years ago
info['return_js'] = args['tojs']
7 years ago
if 'args_tpl' in args:
info['args_tpl'] = args['args_tpl']
7 years ago
return (page.GetPage(info, result), page)
7 years ago
3 years ago
def md5(content):
7 years ago
# 生成MD5
try:
m = hashlib.md5()
3 years ago
m.update(content.encode("utf-8"))
7 years ago
return m.hexdigest()
4 years ago
except Exception as ex:
7 years ago
return False
7 years ago
def getFileMd5(filename):
7 years ago
# 文件的MD5值
if not os.path.isfile(filename):
return False
myhash = hashlib.md5()
f = file(filename, 'rb')
while True:
b = f.read(8096)
if not b:
break
myhash.update(b)
f.close()
return myhash.hexdigest()
7 years ago
def getRandomString(length):
7 years ago
# 取随机字符串
2 years ago
rnd_str = ''
7 years ago
chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
chrlen = len(chars) - 1
random = Random()
for i in range(length):
2 years ago
rnd_str += chars[random.randint(0, chrlen)]
return rnd_str
7 years ago
3 years ago
def getUniqueId():
"""
根据时间生成唯一ID
:return:
"""
current_time = datetime.datetime.now()
str_time = current_time.strftime('%Y%m%d%H%M%S%f')[:-3]
unique_id = "{0}".format(str_time)
return unique_id
7 years ago
def getJson(data):
import json
return json.dumps(data)
6 years ago
def returnData(status, msg, data=None):
6 years ago
return {'status': status, 'msg': msg, 'data': data}
6 years ago
6 years ago
def returnJson(status, msg, data=None):
6 years ago
# if data == None:
# return {'status': status, 'msg': msg}
# return {'status': status, 'msg': msg, 'data': data}
6 years ago
if data == None:
6 years ago
return getJson({'status': status, 'msg': msg})
return getJson({'status': status, 'msg': msg, 'data': data})
7 years ago
3 years ago
def getLanguage():
path = 'data/language.pl'
if not os.path.exists(path):
return 'Simplified_Chinese'
return readFile(path).strip()
def getStaticJson(name="public"):
file = 'static/language/' + getLanguage() + '/' + name + '.json'
if not os.path.exists(file):
file = 'route/static/language/' + getLanguage() + '/' + name + '.json'
return file
7 years ago
def returnMsg(status, msg, args=()):
# 取通用字曲返回
3 years ago
pjson = getStaticJson('public')
6 years ago
logMessage = json.loads(readFile(pjson))
7 years ago
keys = logMessage.keys()
7 years ago
7 years ago
if msg in keys:
msg = logMessage[msg]
for i in range(len(args)):
rep = '{' + str(i + 1) + '}'
msg = msg.replace(rep, args[i])
7 years ago
return {'status': status, 'msg': msg, 'data': args}
7 years ago
7 years ago
def getInfo(msg, args=()):
# 取提示消息
for i in range(len(args)):
rep = '{' + str(i + 1) + '}'
msg = msg.replace(rep, args[i])
return msg
7 years ago
def getMsg(key, args=()):
# 取提示消息
try:
3 years ago
pjson = getStaticJson('public')
logMessage = json.loads(pjson)
7 years ago
keys = logMessage.keys()
msg = None
if key in keys:
msg = logMessage[key]
for i in range(len(args)):
rep = '{' + str(i + 1) + '}'
msg = msg.replace(rep, args[i])
return msg
except:
return key
def getLan(key):
# 取提示消息
3 years ago
pjson = getStaticJson('public')
logMessage = json.loads(pjson)
7 years ago
keys = logMessage.keys()
msg = None
if key in keys:
msg = logMessage[key]
return msg
def readFile(filename):
# 读文件内容
try:
fp = open(filename, 'r')
fBody = fp.read()
fp.close()
return fBody
4 years ago
except Exception as e:
3 years ago
# print(e)
7 years ago
return False
def getDate():
# 取格式时间
import time
return time.strftime('%Y-%m-%d %X', time.localtime())
2 years ago
def getDateFromNow(tf_format="%Y-%m-%d %H:%M:%S", time_zone="Asia/Shanghai"):
# 取格式时间
import time
os.environ['TZ'] = time_zone
time.tzset()
return time.strftime(tf_format, time.localtime())
2 years ago
def getDataFromInt(val):
time_format = '%Y-%m-%d %H:%M:%S'
time_str = time.localtime(val)
return time.strftime(time_format, time_str)
3 years ago
def writeLog(stype, msg, args=()):
7 years ago
# 写日志
2 years ago
uid = 1
7 years ago
try:
from flask import session
if 'uid' in session:
uid = session['uid']
2 years ago
except Exception as e:
2 years ago
pass
# writeFileLog(getTracebackInfo())
2 years ago
return writeDbLog(stype, msg, args, uid)
2 years ago
def writeFileLog(msg, path=None, limit_size=50 * 1024 * 1024, save_limit=3):
log_file = getServerDir() + '/mdserver-web/logs/debug.log'
if path != None:
log_file = path
if os.path.exists(log_file):
size = os.path.getsize(log_file)
if size > limit_size:
log_file_rename = log_file + "_" + \
time.strftime("%Y-%m-%d_%H%M%S") + '.log'
os.rename(log_file, log_file_rename)
logs = sorted(glob.glob(log_file + "_*"))
count = len(logs)
save_limit = count - save_limit
for i in range(count):
if i > save_limit:
break
os.remove(logs[i])
# print('|---多余日志[' + logs[i] + ']已删除!')
f = open(log_file, 'ab+')
msg += "\n"
if __name__ == '__main__':
print(msg)
f.write(msg.encode('utf-8'))
f.close()
return True
2 years ago
def writeDbLog(stype, msg, args=(), uid=1):
try:
import time
import db
import json
7 years ago
sql = db.Sql()
3 years ago
mdate = time.strftime('%Y-%m-%d %X', time.localtime())
wmsg = getInfo(msg, args)
data = (stype, wmsg, uid, mdate)
result = sql.table('logs').add('type,log,uid,addtime', data)
return True
6 years ago
except Exception as e:
3 years ago
return False
7 years ago
3 years ago
def writeFile(filename, content, mode='w+'):
7 years ago
# 写文件内容
try:
3 years ago
fp = open(filename, mode)
3 years ago
fp.write(content)
7 years ago
fp.close()
return True
3 years ago
except Exception as e:
7 years ago
return False
def backFile(file, act=None):
3 years ago
"""
@name 备份配置文件
@param file 需要备份的文件
@param act 如果存在则备份一份作为默认配置
"""
file_type = "_bak"
if act:
file_type = "_def"
# print("cp -p {0} {1}".format(file, file + file_type))
execShell("cp -p {0} {1}".format(file, file + file_type))
3 years ago
def removeBackFile(file, act=None):
"""
@name 删除备份配置文件
@param file 需要删除备份文件
@param act 如果存在则还原默认配置
"""
file_type = "_bak"
if act:
file_type = "_def"
execShell("rm -rf {0}".format(file + file_type))
def restoreFile(file, act=None):
3 years ago
"""
@name 还原配置文件
@param file 需要还原的文件
@param act 如果存在则还原默认配置
"""
file_type = "_bak"
if act:
file_type = "_def"
execShell("cp -p {1} {0}".format(file, file + file_type))
3 years ago
3 years ago
def enPunycode(domain):
if sys.version_info[0] == 2:
domain = domain.encode('utf8')
tmp = domain.split('.')
newdomain = ''
for dkey in tmp:
if dkey == '*':
continue
# 匹配非ascii字符
match = re.search(u"[\x80-\xff]+", dkey)
if not match:
match = re.search(u"[\u4e00-\u9fa5]+", dkey)
if not match:
newdomain += dkey + '.'
else:
if sys.version_info[0] == 2:
newdomain += 'xn--' + \
dkey.decode('utf-8').encode('punycode') + '.'
else:
newdomain += 'xn--' + \
dkey.encode('punycode').decode('utf-8') + '.'
if tmp[0] == '*':
newdomain = "*." + newdomain
return newdomain[0:-1]
def dePunycode(domain):
# punycode 转中文
tmp = domain.split('.')
newdomain = ''
for dkey in tmp:
if dkey.find('xn--') >= 0:
newdomain += dkey.replace('xn--',
'').encode('utf-8').decode('punycode') + '.'
else:
newdomain += dkey + '.'
return newdomain[0:-1]
def enCrypt(key, strings):
# 加密字符串
try:
2 years ago
import base64
_key = key.encode('utf-8')
2 years ago
_key = base64.urlsafe_b64encode(_key)
if type(strings) != bytes:
strings = strings.encode('utf-8')
2 years ago
import cryptography
from cryptography.fernet import Fernet
2 years ago
f = Fernet(_key)
result = f.encrypt(strings)
return result.decode('utf-8')
except:
2 years ago
writeFileLog(getTracebackInfo())
return strings
def deCrypt(key, strings):
# 解密字符串
try:
import base64
_key = key.encode('utf-8')
_key = base64.urlsafe_b64encode(_key)
if type(strings) != bytes:
strings = strings.encode('utf-8')
from cryptography.fernet import Fernet
f = Fernet(_key)
result = f.decrypt(strings).decode('utf-8')
return result
except:
2 years ago
writeFileLog(getTracebackInfo())
return strings
def enDoubleCrypt(key, strings):
# 加密字符串
try:
import base64
_key = md5(key).encode('utf-8')
_key = base64.urlsafe_b64encode(_key)
if type(strings) != bytes:
strings = strings.encode('utf-8')
import cryptography
from cryptography.fernet import Fernet
f = Fernet(_key)
result = f.encrypt(strings)
return result.decode('utf-8')
except:
2 years ago
writeFileLog(getTracebackInfo())
return strings
def deDoubleCrypt(key, strings):
# 解密字符串
try:
2 years ago
import base64
_key = md5(key).encode('utf-8')
_key = base64.urlsafe_b64encode(_key)
if type(strings) != bytes:
2 years ago
strings = strings.encode('utf-8')
from cryptography.fernet import Fernet
2 years ago
f = Fernet(_key)
result = f.decrypt(strings).decode('utf-8')
return result
except:
2 years ago
writeFileLog(getTracebackInfo())
return strings
def aesEncrypt(data, key='ABCDEFGHIJKLMNOP', vi='0102030405060708'):
# aes加密
# @param data 被加密的数据
# @param key 加解密密匙 16位
# @param vi 16位
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
if not isinstance(data, bytes):
data = data.encode()
# AES_CBC_KEY = os.urandom(32)
# AES_CBC_IV = os.urandom(16)
AES_CBC_KEY = key.encode()
AES_CBC_IV = vi.encode()
# print("AES_CBC_KEY:", AES_CBC_KEY)
# print("AES_CBC_IV:", AES_CBC_IV)
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
cipher = Cipher(algorithms.AES(AES_CBC_KEY),
modes.CBC(AES_CBC_IV),
backend=default_backend())
encryptor = cipher.encryptor()
edata = encryptor.update(padded_data)
# print(edata)
# print(str(edata))
# print(edata.decode())
return edata
def aesDecrypt(data, key='ABCDEFGHIJKLMNOP', vi='0102030405060708'):
# aes加密
# @param data 被解密的数据
# @param key 加解密密匙 16位
# @param vi 16位
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
if not isinstance(data, bytes):
data = data.encode()
AES_CBC_KEY = key.encode()
AES_CBC_IV = vi.encode()
cipher = Cipher(algorithms.AES(AES_CBC_KEY),
modes.CBC(AES_CBC_IV),
backend=default_backend())
decryptor = cipher.decryptor()
ddata = decryptor.update(data)
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
data = unpadder.update(ddata)
try:
uppadded_data = data + unpadder.finalize()
except ValueError:
raise Exception('无效的加密信息!')
return uppadded_data
def aesEncrypt_Crypto(data, key, vi):
# 该方法保留,暂时不使用
# aes加密
# @param data 被加密的数据
# @param key 加解密密匙 16位
# @param vi 16位
from Crypto.Cipher import AES
cryptor = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
# 判断是否含有中文
zhmodel = re.compile(u'[\u4e00-\u9fff]')
match = zhmodel.search(data)
if match == None:
# 无中文时
add = 16 - len(data) % 16
pad = lambda s: s + add * chr(add)
data = pad(data)
enctext = cryptor.encrypt(data.encode('utf8'))
else:
# 含有中文时
data = data.encode()
add = 16 - len(data) % 16
data = data + add * (chr(add)).encode()
enctext = cryptor.encrypt(data)
encodestrs = base64.b64encode(enctext).decode('utf8')
return encodestrs
def aesDecrypt_Crypto(data, key, vi):
# 该方法保留,暂时不使用
# aes加密
# @param data 被加密的数据
# @param key 加解密密匙 16位
# @param vi 16位
from crypto.Cipher import AES
data = data.encode('utf8')
encodebytes = base64.urlsafe_b64decode(data)
cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
text_decrypted = cipher.decrypt(encodebytes)
# 判断是否含有中文
zhmodel = re.compile(u'[\u4e00-\u9fff]')
match = zhmodel.search(text_decrypted)
if match == False:
# 无中文时补位
unpad = lambda s: s[0:-s[-1]]
text_decrypted = unpad(text_decrypted)
text_decrypted = text_decrypted.decode('utf8').rstrip() # 去掉补位的右侧空格
return text_decrypted
1 year ago
def getDefault(data,val,def_val=''):
if val in data:
return data[val]
return def_val
1 year ago
def encodeImage(imgsrc, newsrc):
# 图片加密
import struct
old_fp = open(imgsrc, 'rb')
imgFile = old_fp.read()
old_fp.close()
new_fp = open(newsrc,"wb")
for x in imgFile:
1 year ago
value = x ^ 86
1 year ago
value = hex(value)
s = struct.pack('B',int(value,16))
new_fp.write(s)
new_fp.close()
return True
3 years ago
def buildSoftLink(src, dst, force=False):
'''
建立软连接
'''
if not os.path.exists(src):
return False
if os.path.exists(dst) and force:
os.remove(dst)
if not os.path.exists(dst):
3 years ago
execShell('ln -sf "' + src + '" "' + dst + '"')
3 years ago
return True
return False
6 years ago
def HttpGet(url, timeout=10):
"""
发送GET请求
@url 被请求的URL地址(必需)
@timeout 超时时间默认60秒
return string
"""
if sys.version_info[0] == 2:
7 years ago
try:
6 years ago
import urllib2
import ssl
if sys.version_info[0] == 2:
reload(urllib2)
reload(ssl)
try:
ssl._create_default_https_context = ssl._create_unverified_context
except:
pass
response = urllib2.urlopen(url, timeout=timeout)
return response.read()
except Exception as ex:
return str(ex)
else:
try:
import urllib.request
import ssl
try:
ssl._create_default_https_context = ssl._create_unverified_context
except:
pass
response = urllib.request.urlopen(url, timeout=timeout)
result = response.read()
if type(result) == bytes:
result = result.decode('utf-8')
return result
except Exception as ex:
return str(ex)
6 years ago
def HttpGet2(url, timeout):
4 years ago
import urllib.request
6 years ago
try:
3 years ago
import ssl
try:
ssl._create_default_https_context = ssl._create_unverified_context
except:
pass
4 years ago
req = urllib.request.urlopen(url, timeout=timeout)
result = req.read().decode('utf-8')
return result
except Exception as e:
return str(e)
6 years ago
6 years ago
def httpGet(url, timeout=10):
6 years ago
return HttpGet2(url, timeout)
6 years ago
def HttpPost(url, data, timeout=10):
"""
发送POST请求
@url 被请求的URL地址(必需)
@data POST参数可以是字符串或字典(必需)
@timeout 超时时间默认60秒
return string
"""
if sys.version_info[0] == 2:
7 years ago
try:
6 years ago
import urllib
import urllib2
import ssl
7 years ago
ssl._create_default_https_context = ssl._create_unverified_context
6 years ago
data = urllib.urlencode(data)
req = urllib2.Request(url, data)
response = urllib2.urlopen(req, timeout=timeout)
return response.read()
except Exception as ex:
return str(ex)
else:
try:
import urllib.request
import ssl
try:
ssl._create_default_https_context = ssl._create_unverified_context
except:
pass
data = urllib.parse.urlencode(data).encode('utf-8')
req = urllib.request.Request(url, data)
response = urllib.request.urlopen(req, timeout=timeout)
result = response.read()
if type(result) == bytes:
result = result.decode('utf-8')
return result
except Exception as ex:
return str(ex)
def httpPost(url, data, timeout=10):
return HttpPost(url, data, timeout)
7 years ago
def writeSpeed(title, used, total, speed=0):
# 写进度
if not title:
data = {'title': None, 'progress': 0,
'total': 0, 'used': 0, 'speed': 0}
else:
progress = int((100.0 * used / total))
data = {'title': title, 'progress': progress,
'total': total, 'used': used, 'speed': speed}
writeFile('/tmp/panelSpeed.pl', json.dumps(data))
return True
def getSpeed():
# 取进度
path = getRootDir()
6 years ago
data = readFile(path + '/tmp/panelSpeed.pl')
7 years ago
if not data:
data = json.dumps({'title': None, 'progress': 0,
'total': 0, 'used': 0, 'speed': 0})
6 years ago
writeFile(path + '/tmp/panelSpeed.pl', data)
7 years ago
return json.loads(data)
3 years ago
def getLastLineBk(inputfile, lineNum):
7 years ago
# 读文件指定倒数行数
try:
fp = open(inputfile, 'rb')
7 years ago
lastLine = ""
lines = fp.readlines()
count = len(lines)
if count > lineNum:
num = lineNum
else:
num = count
i = 1
lastre = []
for i in range(1, (num + 1)):
n = -i
try:
lastLine = lines[n].decode("utf-8", "ignore").strip()
except Exception as e:
lastLine = ""
lastre.append(lastLine)
7 years ago
fp.close()
7 years ago
result = ''
num -= 1
while num >= 0:
result += lastre[num] + "\n"
num -= 1
return result
3 years ago
except Exception as e:
return str(e)
# return getMsg('TASK_SLEEP')
7 years ago
3 years ago
def getLastLine(path, num, p=1):
6 years ago
pyVersion = sys.version_info[0]
try:
4 years ago
import html
6 years ago
if not os.path.exists(path):
return ""
start_line = (p - 1) * num
count = start_line + num
fp = open(path, 'rb')
buf = ""
3 years ago
fp.seek(0, 2)
6 years ago
if fp.read(1) == "\n":
3 years ago
fp.seek(0, 2)
6 years ago
data = []
b = True
n = 0
3 years ago
6 years ago
for i in range(count):
while True:
newline_pos = str.rfind(str(buf), "\n")
pos = fp.tell()
if newline_pos != -1:
if n >= start_line:
line = buf[newline_pos + 1:]
try:
4 years ago
data.insert(0, html.escape(line))
except Exception as e:
6 years ago
pass
buf = buf[:newline_pos]
n += 1
break
else:
if pos == 0:
b = False
break
to_read = min(4096, pos)
fp.seek(-to_read, 1)
t_buf = fp.read(to_read)
if pyVersion == 3:
if type(t_buf) == bytes:
3 years ago
t_buf = t_buf.decode("utf-8", "ignore").strip()
6 years ago
buf = t_buf + buf
fp.seek(-to_read, 1)
if pos - to_read == 0:
buf = "\n" + buf
if not b:
break
fp.close()
except Exception as e:
3 years ago
return str(e)
6 years ago
return "\n".join(data)
7 years ago
def downloadFile(url, filename):
import urllib
urllib.urlretrieve(url, filename=filename, reporthook=downloadHook)
def downloadHook(count, blockSize, totalSize):
speed = {'total': totalSize, 'block': blockSize, 'count': count}
4 years ago
print('%02d%%' % (100.0 * count * blockSize / totalSize))
7 years ago
3 years ago
def getLocalIpBack():
7 years ago
# 取本地外网IP
try:
import re
filename = 'data/iplist.txt'
ipaddress = readFile(filename)
3 years ago
if not ipaddress or ipaddress == '127.0.0.1':
import urllib
7 years ago
url = 'http://pv.sohu.com/cityjson?ie=utf-8'
3 years ago
req = urllib.request.urlopen(url, timeout=10)
content = req.read().decode('utf-8')
1 year ago
ipaddress = re.search('\\d+.\\d+.\\d+.\\d+', content).group(0)
7 years ago
writeFile(filename, ipaddress)
1 year ago
ipaddress = re.search('\\d+.\\d+.\\d+.\\d+', ipaddress).group(0)
7 years ago
return ipaddress
3 years ago
except Exception as ex:
# print(ex)
7 years ago
return '127.0.0.1'
7 years ago
3 years ago
def getClientIp():
from flask import request
return request.remote_addr.replace('::ffff:', '')
3 years ago
def getLocalIp():
filename = 'data/iplist.txt'
3 years ago
try:
ipaddress = readFile(filename)
if not ipaddress or ipaddress == '127.0.0.1':
2 years ago
cmd = "curl --insecure -4 -sS --connect-timeout 5 -m 60 https://v6r.ipip.net/?format=text"
ip = execShell(cmd)
result = ip[0].strip()
if result == '':
raise Exception("ipv4 is empty!")
writeFile(filename, result)
return result
3 years ago
return ipaddress
except Exception as e:
2 years ago
cmd = "curl --insecure -6 -sS --connect-timeout 5 -m 60 https://v6r.ipip.net/?format=text"
ip = execShell(cmd)
result = ip[0].strip()
if result == '':
return '127.0.0.1'
writeFile(filename, result)
return result
finally:
pass
return '127.0.0.1'
3 years ago
7 years ago
def inArray(arrays, searchStr):
# 搜索数据中是否存在
for key in arrays:
if key == searchStr:
return True
return False
3 years ago
def formatDate(format="%Y-%m-%d %H:%M:%S", times=None):
# 格式化指定时间戳
if not times:
times = int(time.time())
time_local = time.localtime(times)
return time.strftime(format, time_local)
def strfToTime(sdate):
# 转换时间
import time
return time.strftime('%Y-%m-%d', time.strptime(sdate, '%b %d %H:%M:%S %Y %Z'))
7 years ago
def checkIp(ip):
# 检查是否为IPv4地址
import re
p = re.compile(
1 year ago
'^((25[0-5]|2[0-4]\\d|[01]?\\d\\d?)\\.){3}(25[0-5]|2[0-4]\\d|[01]?\\d\\d?)$')
7 years ago
if p.match(ip):
return True
else:
return False
2 years ago
def getHost(port=False):
from flask import request
host_tmp = request.headers.get('host')
if not host_tmp:
if request.url_root:
tmp = re.findall(r"(https|http)://([\w:\.-]+)", request.url_root)
if tmp:
host_tmp = tmp[0][1]
if not host_tmp:
host_tmp = getLocalIp() + ':' + readFile('data/port.pl').strip()
try:
if host_tmp.find(':') == -1:
host_tmp += ':80'
except:
host_tmp = "127.0.0.1:8888"
h = host_tmp.split(':')
if port:
return h[-1]
return ':'.join(h[0:-1])
def getClientIp():
from flask import request
return request.remote_addr.replace('::ffff:', '')
def checkDomainPanel():
12 months ago
tmp = getHost()
2 years ago
domain = readFile('data/bind_domain.pl')
port = readFile('data/port.pl').strip()
2 years ago
12 months ago
scheme = 'http'
12 months ago
12 months ago
choose_file = getRunDir()+'/ssl/choose.pl'
12 months ago
if os.path.exists(choose_file):
choose = readFile(choose_file).strip()
12 months ago
if not inArray(['local','nginx'], choose):
12 months ago
return False
12 months ago
else:
return False
12 months ago
12 months ago
local_ssl = getRunDir()+'/ssl/local'
12 months ago
if choose == 'local':
scheme = 'https'
if choose == 'nginx':
# print(port)
npid = getServerDir() + "/openresty/nginx/logs/nginx.pid"
if not os.path.exists(npid):
return False
nconf = getServerDir() + "/web_conf/nginx/vhost/panel.conf"
if os.path.exists(nconf):
port = "80"
12 months ago
if not domain:
return False
12 months ago
12 months ago
client_ip = getClientIp()
if client_ip in ['127.0.0.1', 'localhost', '::1']:
return False
2 years ago
12 months ago
if tmp.strip().lower() != domain.strip().lower():
from flask import Flask, redirect, request, url_for
to = scheme + "://" + domain + ":" + str(port)
# print(to)
return redirect(to, code=302)
12 months ago
2 years ago
return False
12 months ago
3 years ago
def createLinuxUser(user, group):
execShell("groupadd {}".format(group))
execShell('useradd -s /sbin/nologin -g {} {}'.format(user, group))
return True
def setOwn(filename, user, group=None):
if isAppleSystem():
return True
# 设置用户组
if not os.path.exists(filename):
return False
from pwd import getpwnam
try:
user_info = getpwnam(user)
user = user_info.pw_uid
if group:
user_info = getpwnam(group)
group = user_info.pw_gid
except:
if user == 'www':
createLinuxUser(user)
# 如果指定用户或组不存在,则使用www
try:
user_info = getpwnam('www')
except:
createLinuxUser(user)
user_info = getpwnam('www')
user = user_info.pw_uid
group = user_info.pw_gid
os.chown(filename, user, group)
return True
2 years ago
def setMode(filename, mode):
# 设置文件权限
if not os.path.exists(filename):
return False
mode = int(str(mode), 8)
os.chmod(filename, mode)
return True
7 years ago
def checkPort(port):
# 检查端口是否合法
2 years ago
ports = ['21', '443', '888']
7 years ago
if port in ports:
return False
intport = int(port)
if intport < 1 or intport > 65535:
return False
return True
def getStrBetween(startStr, endStr, srcStr):
# 字符串取中间
start = srcStr.find(startStr)
if start == -1:
return None
end = srcStr.find(endStr)
if end == -1:
return None
return srcStr[start + 1:end]
def getCpuType():
cpuType = ''
if isAppleSystem():
cmd = "system_profiler SPHardwareDataType | grep 'Processor Name' | awk -F ':' '{print $2}'"
cpuinfo = execShell(cmd)
return cpuinfo[0].strip()
2 years ago
current_os = getOs()
if current_os.startswith('freebsd'):
2 years ago
cmd = "sysctl -a | egrep -i 'hw.model' | awk -F ':' '{print $2}'"
cpuinfo = execShell(cmd)
return cpuinfo[0].strip()
7 years ago
# 取CPU类型
cpuinfo = open('/proc/cpuinfo', 'r').read()
1 year ago
rep = "model\\s+name\\s+:\\s+(.+)"
tmp = re.search(rep, cpuinfo, re.I)
7 years ago
if tmp:
cpuType = tmp.groups()[0]
else:
cpuinfo = execShell('LANG="en_US.UTF-8" && lscpu')[0]
1 year ago
rep = "Model\\s+name:\\s+(.+)"
tmp = re.search(rep, cpuinfo, re.I)
if tmp:
cpuType = tmp.groups()[0]
7 years ago
return cpuType
7 years ago
def isRestart():
7 years ago
# 检查是否允许重启
num = M('tasks').where('status!=?', ('1',)).count()
if num > 0:
return False
return True
7 years ago
def isUpdateLocalSoft():
num = M('tasks').where('status!=?', ('1',)).count()
7 years ago
if os.path.exists('mdserver-web.zip'):
7 years ago
return True
7 years ago
if num > 0:
data = M('tasks').where('status!=?', ('1',)).field(
'id,type,execstr').limit('1').select()
argv = data[0]['execstr'].split('|dl|')
if data[0]['type'] == 'download' and argv[1] == 'mdserver-web.zip':
return True
7 years ago
return False
7 years ago
def hasPwd(password):
# 加密密码字符
import crypt
return crypt.crypt(password, password)
6 years ago
def getTimeout(url):
7 years ago
start = time.time()
result = httpGet(url)
if result != 'True':
return False
return int((time.time() - start) * 1000)
6 years ago
def makeConf():
file = getRunDir() + '/data/json/config.json'
if not os.path.exists(file):
c = {}
11 months ago
c['title'] = '后羿面板'
6 years ago
c['home'] = 'http://github/midoks/mdserver-web'
c['recycle_bin'] = True
c['template'] = 'default'
writeFile(file, json.dumps(c))
return c
c = readFile(file)
return json.loads(c)
def getConfig(k):
c = makeConf()
return c[k]
def setConfig(k, v):
c = makeConf()
c[k] = v
file = getRunDir() + '/data/json/config.json'
return writeFile(file, json.dumps(c))
def getHostAddr():
if os.path.exists('data/iplist.txt'):
return readFile('data/iplist.txt').strip()
return '127.0.0.1'
def setHostAddr(addr):
file = getRunDir() + '/data/iplist.txt'
return writeFile(file, addr)
def getHostPort():
if os.path.exists('data/port.pl'):
return readFile('data/port.pl').strip()
return '7200'
def setHostPort(port):
file = getRunDir() + '/data/port.pl'
return writeFile(file, port)
7 years ago
def auth_decode(data):
7 years ago
# 解密数据
7 years ago
token = GetToken()
# 是否有生成Token
if not token:
return returnMsg(False, 'REQUEST_ERR')
# 校验access_key是否正确
if token['access_key'] != data['btauth_key']:
return returnMsg(False, 'REQUEST_ERR')
# 解码数据
import binascii
import hashlib
import urllib
import hmac
import json
tdata = binascii.unhexlify(data['data'])
# 校验signature是否正确
signature = binascii.hexlify(
hmac.new(token['secret_key'], tdata, digestmod=hashlib.sha256).digest())
if signature != data['signature']:
return returnMsg(False, 'REQUEST_ERR')
# 返回
return json.loads(urllib.unquote(tdata))
# 数据加密
def auth_encode(data):
token = GetToken()
pdata = {}
# 是否有生成Token
if not token:
return returnMsg(False, 'REQUEST_ERR')
# 生成signature
import binascii
import hashlib
import urllib
import hmac
import json
tdata = urllib.quote(json.dumps(data))
# 公式 hex(hmac_sha256(data))
pdata['signature'] = binascii.hexlify(
hmac.new(token['secret_key'], tdata, digestmod=hashlib.sha256).digest())
# 加密数据
pdata['btauth_key'] = token['access_key']
pdata['data'] = binascii.hexlify(tdata)
pdata['timestamp'] = time.time()
# 返回
return pdata
def checkToken(get):
7 years ago
# 检查Token
7 years ago
tempFile = 'data/tempToken.json'
if not os.path.exists(tempFile):
return False
import json
import time
tempToken = json.loads(readFile(tempFile))
if time.time() > tempToken['timeout']:
return False
if get.token != tempToken['token']:
return False
return True
def checkInput(data):
# 过滤输入
if not data:
return data
if type(data) != str:
return data
checkList = [
{'d': '<', 'r': ''},
{'d': '>', 'r': ''},
{'d': '\'', 'r': ''},
{'d': '"', 'r': ''},
{'d': '&', 'r': ''},
{'d': '#', 'r': ''},
{'d': '<', 'r': ''}
]
for v in checkList:
data = data.replace(v['d'], v['r'])
return data
7 years ago
def checkCert(certPath='ssl/certificate.pem'):
7 years ago
# 验证证书
2 years ago
openssl = '/usr/bin/openssl'
if not os.path.exists(openssl):
openssl = '/usr/local/openssl/bin/openssl'
7 years ago
if not os.path.exists(openssl):
openssl = 'openssl'
certPem = readFile(certPath)
s = "\n-----BEGIN CERTIFICATE-----"
tmp = certPem.strip().split(s)
for tmp1 in tmp:
if tmp1.find('-----BEGIN CERTIFICATE-----') == -1:
tmp1 = s + tmp1
writeFile(certPath, tmp1)
7 years ago
result = execShell(openssl + " x509 -in " +
7 years ago
certPath + " -noout -subject")
if result[1].find('-bash:') != -1:
return True
if len(result[1]) > 2:
return False
if result[0].find('error:') != -1:
return False
return True
6 years ago
def sortFileList(path, ftype = 'mtime', sort = 'desc'):
flist = os.listdir(path)
if ftype == 'mtime':
if sort == 'desc':
flist = sorted(flist, key=lambda f: os.path.getmtime(os.path.join(path,f)), reverse=True)
if sort == 'asc':
flist = sorted(flist, key=lambda f: os.path.getmtime(os.path.join(path,f)), reverse=False)
if ftype == 'size':
if sort == 'desc':
flist = sorted(flist, key=lambda f: os.path.getsize(os.path.join(path,f)), reverse=True)
if sort == 'asc':
flist = sorted(flist, key=lambda f: os.path.getsize(os.path.join(path,f)), reverse=False)
1 year ago
if ftype == 'fname':
if sort == 'desc':
flist = sorted(flist, key=lambda f: os.path.join(path,f), reverse=True)
if sort == 'asc':
flist = sorted(flist, key=lambda f: os.path.join(path,f), reverse=False)
return flist
def sortAllFileList(path, ftype = 'mtime', sort = 'desc', search = '',limit = 3000):
count = 0
flist = []
for d_list in os.walk(path):
if count >= limit:
break
for d in d_list[1]:
if count >= limit:
break
if d.lower().find(search) != -1:
filename = d_list[0] + '/' + d
if not os.path.exists(filename):
continue
count += 1
flist.append(filename)
for f in d_list[2]:
if count >= limit:
break
if f.lower().find(search) != -1:
filename = d_list[0] + '/' + f
if not os.path.exists(filename):
continue
count += 1
flist.append(filename)
if ftype == 'mtime':
if sort == 'desc':
flist = sorted(flist, key=lambda f: os.path.getmtime(f), reverse=True)
if sort == 'asc':
flist = sorted(flist, key=lambda f: os.path.getmtime(f), reverse=False)
if ftype == 'size':
if sort == 'desc':
flist = sorted(flist, key=lambda f: os.path.getsize(f), reverse=True)
if sort == 'asc':
flist = sorted(flist, key=lambda f: os.path.getsize(f), reverse=False)
return flist
6 years ago
def getPathSize(path):
# 取文件或目录大小
if not os.path.exists(path):
return 0
if not os.path.isdir(path):
return os.path.getsize(path)
size_total = 0
for nf in os.walk(path):
for f in nf[2]:
filename = nf[0] + '/' + f
size_total += os.path.getsize(filename)
return size_total
7 years ago
7 years ago
def toSize(size):
7 years ago
# 字节单位转换
d = ('b', 'KB', 'MB', 'GB', 'TB')
s = d[0]
for b in d:
if size < 1024:
6 years ago
return str(round(size, 2)) + ' ' + b
6 years ago
size = float(size) / 1024.0
7 years ago
s = b
6 years ago
return str(round(size, 2)) + ' ' + b
7 years ago
2 years ago
def getPathSuffix(path):
return os.path.splitext(path)[-1]
7 years ago
def getMacAddress():
# 获取mac
import uuid
mac = uuid.UUID(int=uuid.getnode()).hex[-12:]
return ":".join([mac[e:e + 2] for e in range(0, 11, 2)])
7 years ago
def get_string(t):
if t != -1:
max = 126
m_types = [{'m': 122, 'n': 97}, {'m': 90, 'n': 65}, {'m': 57, 'n': 48}, {
'm': 47, 'n': 32}, {'m': 64, 'n': 58}, {'m': 96, 'n': 91}, {'m': 125, 'n': 123}]
else:
max = 256
t = 0
m_types = [{'m': 255, 'n': 0}]
arr = []
for i in range(max):
if i < m_types[t]['n'] or i > m_types[t]['m']:
continue
arr.append(chr(i))
return arr
def get_string_find(t):
if type(t) != list:
t = [t]
return_str = ''
for s1 in t:
return_str += get_string(int(s1[0]))[int(s1[1:])]
return return_str
def get_string_arr(t):
s_arr = {}
t_arr = []
for s1 in t:
for i in range(6):
if not i in s_arr:
s_arr[i] = get_string(i)
for j in range(len(s_arr[i])):
if s1 == s_arr[i][j]:
t_arr.append(str(i) + str(j))
return t_arr
6 years ago
# 转换时间
def strfDate(sdate):
return time.strftime('%Y-%m-%d', time.strptime(sdate, '%Y%m%d%H%M%S'))
# 获取证书名称
def getCertName(certPath):
if not os.path.exists(certPath):
return None
try:
import OpenSSL
result = {}
x509 = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, readFile(certPath))
# 取产品名称
issuer = x509.get_issuer()
result['issuer'] = ''
if hasattr(issuer, 'CN'):
result['issuer'] = issuer.CN
if not result['issuer']:
is_key = [b'0', '0']
issue_comp = issuer.get_components()
if len(issue_comp) == 1:
is_key = [b'CN', 'CN']
for iss in issue_comp:
if iss[0] in is_key:
result['issuer'] = iss[1].decode()
break
if not result['issuer']:
if hasattr(issuer, 'O'):
result['issuer'] = issuer.O
# 取到期时间
result['notAfter'] = strfDate(
bytes.decode(x509.get_notAfter())[:-1])
# 取申请时间
result['notBefore'] = strfDate(
bytes.decode(x509.get_notBefore())[:-1])
# 取可选名称
result['dns'] = []
for i in range(x509.get_extension_count()):
s_name = x509.get_extension(i)
if s_name.get_short_name() in [b'subjectAltName', 'subjectAltName']:
s_dns = str(s_name).split(',')
for d in s_dns:
result['dns'].append(d.split(':')[1])
subject = x509.get_subject().get_components()
# 取主要认证名称
if len(subject) == 1:
result['subject'] = subject[0][1].decode()
else:
if not result['dns']:
for sub in subject:
if sub[0] == b'CN':
result['subject'] = sub[1].decode()
break
if 'subject' in result:
result['dns'].append(result['subject'])
else:
result['subject'] = result['dns'][0]
result['endtime'] = int(int(time.mktime(time.strptime(
result['notAfter'], "%Y-%m-%d")) - time.time()) / 86400)
return result
except Exception as e:
2 years ago
writeFileLog(getTracebackInfo())
return None
12 months ago
def createLocalSSL():
if not os.path.exists('ssl/local'):
execShell('mkdir -p ssl/local')
# 自签证书
12 months ago
# if os.path.exists('ssl/local/input.pl'):
# return True
client_ip = getClientIp()
import OpenSSL
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
cert = OpenSSL.crypto.X509()
cert.set_serial_number(0)
12 months ago
if client_ip == '127.0.0.1':
cert.get_subject().CN = '127.0.0.1'
else:
cert.get_subject().CN = getLocalIp()
cert.set_issuer(cert.get_subject())
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(86400 * 3650)
cert.set_pubkey(key)
cert.sign(key, 'md5')
12 months ago
cert_ca = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
private_key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
if len(cert_ca) > 100 and len(private_key) > 100:
12 months ago
writeFile('ssl/local/cert.pem', cert_ca, 'wb+')
writeFile('ssl/local/private.pem', private_key, 'wb+')
return True
return False
6 years ago
def getSSHPort():
try:
file = '/etc/ssh/sshd_config'
4 years ago
conf = readFile(file)
1 year ago
rep = "(#*)?Port\\s+([0-9]+)\\s*\n"
2 years ago
port = re.search(rep, conf).groups(0)[1]
6 years ago
return int(port)
except:
return 22
def getSSHStatus():
if os.path.exists('/usr/bin/apt-get'):
status = execShell("service ssh status | grep -P '(dead|stop)'")
else:
import system_api
version = system_api.system_api().getSystemVersion()
if version.find(' Mac ') != -1:
return True
if version.find(' 7.') != -1:
status = execShell("systemctl status sshd.service | grep 'dead'")
else:
status = execShell(
"/etc/init.d/sshd status | grep -e 'stopped' -e '已停'")
if len(status[0]) > 3:
status = False
else:
status = True
return status
3 years ago
def requestFcgiPHP(sock, uri, document_root='/tmp', method='GET', pdata=b''):
# 直接请求到PHP-FPM
# version php版本
# uri 请求uri
# filename 要执行的php文件
# args 请求参数
# method 请求方式
sys.path.append(os.getcwd() + "/class/plugin")
import fpm
p = fpm.fpm(sock, document_root)
if type(pdata) == dict:
pdata = url_encode(pdata)
result = p.load_url_public(uri, pdata, method)
return result
3 years ago
def getMyORM():
'''
获取MySQL资源的ORM
'''
sys.path.append(os.getcwd() + "/class/plugin")
import orm
o = orm.ORM()
return o
3 years ago
def getMyORMDb():
'''
获取MySQL资源的ORM pip install mysqlclient==2.0.3 | pip install mysql-python
'''
sys.path.append(os.getcwd() + "/class/plugin")
import ormDb
o = ormDb.ORM()
return o
2 years ago
##################### notify start #########################################
def initNotifyConfig():
p = getNotifyPath()
if not os.path.exists(p):
writeFile(p, '{}')
return True
def getNotifyPath():
path = 'data/notify.json'
return path
def getNotifyData(is_parse=False):
initNotifyConfig()
notify_file = getNotifyPath()
notify_data = readFile(notify_file)
data = json.loads(notify_data)
if is_parse:
tag_list = ['tgbot', 'email']
for t in tag_list:
if t in data and 'cfg' in data[t]:
data[t]['data'] = json.loads(deDoubleCrypt(t, data[t]['cfg']))
return data
def writeNotify(data):
p = getNotifyPath()
return writeFile(p, json.dumps(data))
2 years ago
def tgbotNotifyChatID():
data = getNotifyData(True)
if 'tgbot' in data and 'enable' in data['tgbot']:
if data['tgbot']['enable']:
t = data['tgbot']['data']
return t['chat_id']
return ''
2 years ago
2 years ago
def tgbotNotifyObject():
data = getNotifyData(True)
if 'tgbot' in data and 'enable' in data['tgbot']:
if data['tgbot']['enable']:
t = data['tgbot']['data']
import telebot
bot = telebot.TeleBot(app_token)
return True, bot
return False, None
2 years ago
def tgbotNotifyMessage(app_token, chat_id, msg):
import telebot
bot = telebot.TeleBot(app_token)
try:
2 years ago
data = bot.send_message(chat_id, msg)
return True
except Exception as e:
2 years ago
writeFileLog(str(e))
return False
2 years ago
def tgbotNotifyHttpPost(app_token, chat_id, msg):
2 years ago
try:
url = 'https://api.telegram.org/bot' + app_token + '/sendMessage'
post_data = {
'chat_id': chat_id,
'text': msg,
}
rdata = httpPost(url, post_data)
return True
except Exception as e:
writeFileLog(str(e))
return False
2 years ago
2 years ago
def tgbotNotifyTest(app_token, chat_id):
msg = 'MW-通知验证测试OK'
return tgbotNotifyHttpPost(app_token, chat_id, msg)
def emailNotifyMessage(data):
'''
邮件通知
'''
sys.path.append(os.getcwd() + "/class/plugin")
import memail
try:
if data['smtp_ssl'] == 'ssl':
memail.sendSSL(data['smtp_host'], data['smtp_port'],
data['username'], data['password'],
data['to_mail_addr'], data['subject'], data['content'])
else:
memail.send(data['smtp_host'], data['smtp_port'],
data['username'], data['password'],
data['to_mail_addr'], data['subject'], data['content'])
return True
except Exception as e:
print(getTracebackInfo())
return False
def emailNotifyTest(data):
2 years ago
# print(data)
data['subject'] = 'MW通知测试'
data['content'] = data['mail_test']
return emailNotifyMessage(data)
2 years ago
def notifyMessageTry(msg, stype='common', trigger_time=300, is_write_log=True):
lock_file = getPanelTmp() + '/notify_lock.json'
if not os.path.exists(lock_file):
writeFile(lock_file, '{}')
lock_data = json.loads(readFile(lock_file))
if stype in lock_data:
diff_time = time.time() - lock_data[stype]['do_time']
if diff_time >= trigger_time:
lock_data[stype]['do_time'] = time.time()
else:
return False
else:
lock_data[stype] = {'do_time': time.time()}
writeFile(lock_file, json.dumps(lock_data))
if is_write_log:
writeLog("通知管理[" + stype + "]", msg)
2 years ago
data = getNotifyData(True)
2 years ago
# tag_list = ['tgbot', 'email']
# tagbot
do_notify = False
if 'tgbot' in data and 'enable' in data['tgbot']:
if data['tgbot']['enable']:
t = data['tgbot']['data']
2 years ago
i = sys.version_info
# telebot 在python小于3.7无法使用
if i[0] < 3 or i[1] < 7:
do_notify = tgbotNotifyHttpPost(
t['app_token'], t['chat_id'], msg)
else:
do_notify = tgbotNotifyMessage(
t['app_token'], t['chat_id'], msg)
if 'email' in data and 'enable' in data['email']:
if data['email']['enable']:
t = data['email']['data']
t['subject'] = 'MW通知'
t['content'] = msg
do_notify = emailNotifyMessage(t)
2 years ago
return do_notify
2 years ago
def notifyMessage(msg, stype='common', trigger_time=300, is_write_log=True):
try:
return notifyMessageTry(msg, stype, trigger_time, is_write_log)
except Exception as e:
2 years ago
writeFileLog(getTracebackInfo())
2 years ago
return False
##################### notify end #########################################
2 years ago
11 months ago
def getGlibcVersion():
try:
cmd_result = execShell("ldd --version")[0]
if not cmd_result: return ''
glibc_version = cmd_result.split("\n")[0].split()[-1]
except:
return ''
return glibc_version
2 years ago
##################### ssh start #########################################
def getSshDir():
if isAppleSystem():
user = execShell("who | sed -n '2, 1p' |awk '{print $1}'")[0].strip()
return '/Users/' + user + '/.ssh'
return '/root/.ssh'
2 years ago
def processExists(pname, exe=None, cmdline=None):
# 进程是否存在
try:
import psutil
pids = psutil.pids()
for pid in pids:
try:
p = psutil.Process(pid)
if p.name() == pname:
if not exe and not cmdline:
return True
else:
if exe:
if p.exe() == exe:
return True
if cmdline:
if cmdline in p.cmdline():
return True
except:
pass
return False
except:
return True
2 years ago
def createRsa():
2 years ago
# ssh-keygen -t rsa -P "" -C "midoks@163.com"
2 years ago
ssh_dir = getSshDir()
# mw.execShell("rm -f /root/.ssh/*")
if not os.path.exists(ssh_dir + '/authorized_keys'):
execShell('touch ' + ssh_dir + '/authorized_keys')
if not os.path.exists(ssh_dir + '/id_rsa.pub') and os.path.exists(ssh_dir + '/id_rsa'):
execShell('echo y | ssh-keygen -q -t rsa -P "" -f ' +
ssh_dir + '/id_rsa')
else:
execShell('ssh-keygen -q -t rsa -P "" -f ' + ssh_dir + '/id_rsa')
execShell('cat ' + ssh_dir + '/id_rsa.pub >> ' +
ssh_dir + '/authorized_keys')
execShell('chmod 600 ' + ssh_dir + '/authorized_keys')
def createSshInfo():
ssh_dir = getSshDir()
if not os.path.exists(ssh_dir + '/id_rsa') or not os.path.exists(ssh_dir + '/id_rsa.pub'):
createRsa()
# 检查是否写入authorized_keys
data = execShell("cat " + ssh_dir + "/id_rsa.pub | awk '{print $3}'")
if data[0] != "":
cmd = "cat " + ssh_dir + "/authorized_keys | grep " + data[0]
ak_data = execShell(cmd)
if ak_data[0] == "":
cmd = 'cat ' + ssh_dir + '/id_rsa.pub >> ' + ssh_dir + '/authorized_keys'
execShell(cmd)
execShell('chmod 600 ' + ssh_dir + '/authorized_keys')
def connectSsh():
import paramiko
ssh = paramiko.SSHClient()
createSshInfo()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
port = getSSHPort()
try:
ssh.connect('127.0.0.1', port, timeout=5)
except Exception as e:
ssh.connect('localhost', port, timeout=5)
except Exception as e:
ssh.connect(getHostAddr(), port, timeout=30)
except Exception as e:
return False
shell = ssh.invoke_shell(term='xterm', width=83, height=21)
shell.setblocking(0)
return shell
def clearSsh():
# 服务器IP
ip = getHostAddr()
sh = '''
#!/bin/bash
PLIST=`who | grep localhost | awk '{print $2}'`
for i in $PLIST
do
ps -t /dev/$i |grep -v TTY | awk '{print $1}' | xargs kill -9
done
# getHostAddr
PLIST=`who | grep "${ip}" | awk '{print $2}'`
for i in $PLIST
do
ps -t /dev/$i |grep -v TTY | awk '{print $1}' | xargs kill -9
done
'''
if not isAppleSystem():
info = execShell(sh)
print(info[0], info[1])
##################### ssh end #########################################
2 years ago
# ---------------------------------------------------------------------------------
2 years ago
# 打印相关 START
2 years ago
# ---------------------------------------------------------------------------------
def echoStart(tag):
2 years ago
print("=" * 89)
2 years ago
print("★开始{}[{}]".format(tag, formatDate()))
2 years ago
print("=" * 89)
2 years ago
def echoEnd(tag):
2 years ago
print("=" * 89)
2 years ago
print("{}完成[{}]".format(tag, formatDate()))
2 years ago
print("=" * 89)
2 years ago
print("\n")
def echoInfo(msg):
print("|-{}".format(msg))
2 years ago
# ---------------------------------------------------------------------------------
# 打印相关 END
# ---------------------------------------------------------------------------------