# coding:utf-8 # --------------------------------------------------------------------------------- # MW-Linux面板 # --------------------------------------------------------------------------------- # copyright (c) 2018-∞(https://github.com/midoks/mdserver-web) All rights reserved. # --------------------------------------------------------------------------------- # Author: midoks # --------------------------------------------------------------------------------- # --------------------------------------------------------------------------------- # 核心方法库 # --------------------------------------------------------------------------------- import os import sys import time import string import json import hashlib import shlex import datetime import subprocess import glob import base64 import re from random import Random def execShell(cmdstring, cwd=None, timeout=None, shell=True): if shell: cmdstring_list = cmdstring else: cmdstring_list = shlex.split(cmdstring) if timeout: end_time = datetime.datetime.now() + datetime.timedelta(seconds=timeout) sub = subprocess.Popen(cmdstring_list, cwd=cwd, stdin=subprocess.PIPE, shell=shell, bufsize=4096, stdout=subprocess.PIPE, stderr=subprocess.PIPE) while sub.poll() is None: time.sleep(0.1) if timeout: if end_time <= datetime.datetime.now(): raise Exception("Timeout:%s" % cmdstring) if sys.version_info[0] == 2: return sub.communicate() data = sub.communicate() # python3 fix 返回byte数据 if isinstance(data[0], bytes): t1 = str(data[0], encoding='utf-8') if isinstance(data[1], bytes): t2 = str(data[1], encoding='utf-8') return (t1, t2) def getTracebackInfo(): import traceback return traceback.format_exc() def getRunDir(): return os.getcwd() def getRootDir(): return os.path.dirname(getRunDir()) def getPanelDir(): return getRootDir() def getFatherDir(): return os.path.dirname(os.path.dirname(getPanelDir())) def getPluginDir(): return getPanelDir() + '/plugins' def getPanelDataDir(): return getPanelDir() + '/data' def getMWLogs(): return getPanelDir() + '/logs' def getPanelTmp(): return getPanelDir() + '/tmp' def getPanelTaskLog(): return getMWLogs() + '/panel_task.log' def getServerDir(): return getFatherDir() + '/server' def getLogsDir(): return getFatherDir() + '/wwwlogs' def getRecycleBinDir(): rb_dir = getFatherDir() + '/recycle_bin' if not os.path.exists(rb_dir): os.system('mkdir -p ' + rb_dir) return rb_dir def getRandomString(length): # 取随机字符串 rnd_str = '' chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789' chrlen = len(chars) - 1 random = Random() for i in range(length): rnd_str += chars[random.randint(0, chrlen)] return rnd_str def getUniqueId(): """ 根据时间生成唯一ID :return: """ current_time = datetime.datetime.now() str_time = current_time.strftime('%Y%m%d%H%M%S%f')[:-3] unique_id = "{0}".format(str_time) return unique_id def getDate(): # 取格式时间 import time return time.strftime('%Y-%m-%d %X', time.localtime()) def getDateFromNow(tf_format="%Y-%m-%d %H:%M:%S", time_zone="Asia/Shanghai"): # 取格式时间 import time os.environ['TZ'] = time_zone time.tzset() return time.strftime(tf_format, time.localtime()) def getDataFromInt(val): time_format = '%Y-%m-%d %H:%M:%S' time_str = time.localtime(val) return time.strftime(time_format, time_str) def getCommonFile(): # 统一默认配置文件 base_dir = getPanelDir()+'/' data = { 'debug' : base_dir+'data/debug.pl', # DEBUG文件 'close' : base_dir+'data/close.pl', # 识别关闭面板文件 'basic_auth' : base_dir+'data/basic_auth.json', # 面板Basic验证 'ipv6' : base_dir+'data/ipv6.pl', # ipv6识别文件 'bind_domain' : base_dir+'data/bind_domain.pl', # 面板域名绑定 'auth_secret': base_dir+'data/auth_secret.pl', # 二次验证密钥 'ssl': base_dir+'ssl/choose.pl', # ssl设置 } return data def toSize(size, middle='') -> str: """ 字节单位转换 """ units = ('b', 'KB', 'MB', 'GB', 'TB') s = units[0] for u in units: if size < 1024: return str(round(size, 2)) + middle + u size = float(size) / 1024.0 s = u return str(round(size, 2)) + middle + u def returnData(status, msg, data=None): if data == None: return {'status': status, 'msg': msg} return {'status': status, 'msg': msg, 'data': data} def returnJson(status, msg, data=None): return getJson({'status': status, 'msg': msg, 'data': data}) def readFile(filename): # 读文件内容 try: fp = open(filename, 'r') fBody = fp.read() fp.close() return fBody except Exception as e: # print(e) return False def writeFile(filename, content, mode='w+'): # 写文件内容 try: fp = open(filename, mode) fp.write(content) fp.close() return True except Exception as e: return False def triggerTask(): isTask = getPanelDir() + '/tmp/panelTask.pl' writeFile(isTask, 'True') def systemdCfgDir(): # ubuntu cfg_dir = '/lib/systemd/system' if os.path.exists(cfg_dir): return cfg_dir # debian,centos cfg_dir = '/usr/lib/systemd/system' if os.path.exists(cfg_dir): return cfg_dir # local test return "/tmp" def formatDate(format="%Y-%m-%d %H:%M:%S", times=None): # 格式化指定时间戳 if not times: times = int(time.time()) time_local = time.localtime(times) return time.strftime(format, time_local) def strfToTime(sdate): # 转换时间 import time return time.strftime('%Y-%m-%d', time.strptime(sdate, '%b %d %H:%M:%S %Y %Z')) def md5(content): # 生成MD5 try: m = hashlib.md5() m.update(content.encode("utf-8")) return m.hexdigest() except Exception as ex: return False def getFileMd5(filename): # 文件的MD5值 if not os.path.isfile(filename): return False myhash = hashlib.md5() f = file(filename, 'rb') while True: b = f.read(8096) if not b: break myhash.update(b) f.close() return myhash.hexdigest() def getClientIp(): from flask import request return request.remote_addr.replace('::ffff:', '') def getJson(data): import json return json.dumps(data) def getObjectByJson(data): import json return json.loads(data) def getSslCrt(): if os.path.exists('/etc/ssl/certs/ca-certificates.crt'): return '/etc/ssl/certs/ca-certificates.crt' if os.path.exists('/etc/pki/tls/certs/ca-bundle.crt'): return '/etc/pki/tls/certs/ca-bundle.crt' return '' def getOs(): return sys.platform def getOsName(): cmd = "cat /etc/*-release | grep PRETTY_NAME |awk -F = '{print $2}' | awk -F '\"' '{print $2}'| awk '{print $1}'" data = execShell(cmd) return data[0].strip().lower() def getOsID(): cmd = "cat /etc/*-release | grep VERSION_ID | awk -F = '{print $2}' | awk -F '\"' '{print $2}'" data = execShell(cmd) return data[0].strip() # 获取文件权限描述 def getFileStatsDesc( filename: str | None = None, path: str | None = None, ): # print(filename,path) filename = filename.replace('//', '/') try: stat = os.stat(filename) accept = str(oct(stat.st_mode)[-3:]) mtime = str(int(stat.st_mtime)) user = '' try: user = str(pwd.getpwuid(stat.st_uid).pw_name) except: user = str(stat.st_uid) size = str(stat.st_size) link = '' if os.path.islink(filename): link = ' -> ' + os.readlink(filename) if path: tmp_path = (path + '/').replace('//', '/') filename = filename.replace(tmp_path, '', 1) return filename + ';' + size + ';' + mtime + ';' + accept + ';' + user + ';' + link except Exception as e: return ';;;;;' def getFileSuffix(file): tmp = file.split('.') ext = tmp[len(tmp) - 1] return ext def getPathSuffix(path): return os.path.splitext(path)[-1] def getHostAddr(): ip_text = getPanelDataDir() + '/iplist.txt' if os.path.exists(ip_text): return readFile(ip_text).strip() return '127.0.0.1' def getSqitePrefix(): WIN = sys.platform.startswith('win') if WIN: # 如果是 Windows 系统,使用三个斜线 prefix = 'sqlite:///' else: # 否则使用四个斜线 prefix = 'sqlite:////' return prefix def getCpuType(): cpuType = '' if isAppleSystem(): cmd = "system_profiler SPHardwareDataType | grep 'Processor Name' | awk -F ':' '{print $2}'" cpuinfo = execShell(cmd) return cpuinfo[0].strip() current_os = getOs() if current_os.startswith('freebsd'): cmd = "sysctl -a | egrep -i 'hw.model' | awk -F ':' '{print $2}'" cpuinfo = execShell(cmd) return cpuinfo[0].strip() # 取CPU类型 cpuinfo = open('/proc/cpuinfo', 'r').read() rep = "model\\s+name\\s+:\\s+(.+)" tmp = re.search(rep, cpuinfo, re.I) if tmp: cpuType = tmp.groups()[0] else: cpuinfo = execShell('LANG="en_US.UTF-8" && lscpu')[0] rep = "Model\\s+name:\\s+(.+)" tmp = re.search(rep, cpuinfo, re.I) if tmp: cpuType = tmp.groups()[0] return cpuType def getInfo(msg, args=()): # 取提示消息 for i in range(len(args)): rep = '{' + str(i + 1) + '}' msg = msg.replace(rep, args[i]) return msg def getLastLine(path, num, p=1): pyVersion = sys.version_info[0] try: import html if not os.path.exists(path): return "" start_line = (p - 1) * num count = start_line + num fp = open(path, 'rb') buf = "" fp.seek(0, 2) if fp.read(1) == "\n": fp.seek(0, 2) data = [] b = True n = 0 for i in range(count): while True: newline_pos = str.rfind(str(buf), "\n") pos = fp.tell() if newline_pos != -1: if n >= start_line: line = buf[newline_pos + 1:] try: data.insert(0, html.escape(line)) except Exception as e: pass buf = buf[:newline_pos] n += 1 break else: if pos == 0: b = False break to_read = min(4096, pos) fp.seek(-to_read, 1) t_buf = fp.read(to_read) if pyVersion == 3: if type(t_buf) == bytes: t_buf = t_buf.decode("utf-8", "ignore").strip() buf = t_buf + buf fp.seek(-to_read, 1) if pos - to_read == 0: buf = "\n" + buf if not b: break fp.close() except Exception as e: return str(e) return "\n".join(data) # 获取系统温度 def getSystemDeviceTemperature(): import psutil if not hasattr(psutil, "sensors_temperatures"): return False, "platform not supported" temps = psutil.sensors_temperatures() if not temps: return False, "can't read any temperature" for name, entries in temps.items(): for entry in entries: return True, entry.label # print("%-20s %s °C (high = %s °C, critical = %s °C)" % ( # entry.label or name, entry.current, entry.high, # entry.critical)) return False, "" def getPage(args, result='1,2,3,4,5,8'): data = getPageObject(args, result) return data[0] def getPageObject(args, result='1,2,3,4,5,8'): # 取分页 from utils import page # 实例化分页类 page = page.Page() info = {} info['count'] = 0 if 'count' in args: info['count'] = int(args['count']) info['row'] = 10 if 'row' in args: info['row'] = int(args['row']) info['p'] = 1 if 'p' in args: info['p'] = int(args['p']) info['uri'] = {} info['return_js'] = '' if 'tojs' in args: info['return_js'] = args['tojs'] if 'args_tpl' in args: info['args_tpl'] = args['args_tpl'] return (page.GetPage(info, result), page) def getHostPort(): port_file = getPanelDir() + '/data/port.pl' if os.path.exists(port_file): return readFile(port_file).strip() return '7200' def setHostPort(port): file = getPanelDir() + '/data/port.pl' return writeFile(file, port) def isAppleSystem(): if getOs() == 'darwin': return True return False def isDocker(): return os.path.exists('/.dockerenv') def isSupportSystemctl(): if isAppleSystem(): return False if isDocker(): return False current_os = getOs() if current_os.startswith("freebsd"): return False return True def isDebugMode(): if isAppleSystem(): return True debug = M('option').field('name').where('name=?',('debug',)).getField('value') if debug == 'open': return True return False def isNumber(s): try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False def isRestart(): # 检查是否允许重启 num = M('tasks').where('status!=?', ('1',)).count() if num > 0: return False return True # 检查端口是否占用 def isOpenPort(port): import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.connect(('127.0.0.1', int(port))) s.shutdown(2) return True except Exception as e: return False def debugLog(*data): if isDebugMode(): print(data) return True def writeLog(stype, msg, args=()): # 写日志 uid = 0 try: from flask import session if 'uid' in session: uid = session['uid'] except Exception as e: print("writeLog:"+str(e)) # pass # writeFileLog(getTracebackInfo()) return writeDbLog(stype, msg, args, uid) def writeDbLog(stype, msg, args=(), uid=1): try: from admin import model format_msg = getInfo(msg, args) model.addLog(stype, format_msg, uid) return True except Exception as e: print("writeDbLog:"+str(e)) return False def M(table=''): import core.db as db sql = db.Sql() if table == '': return sql return sql.table(table) def enDoubleCrypt(key, strings): # 加密字符串 try: import base64 _key = md5(key).encode('utf-8') _key = base64.urlsafe_b64encode(_key) if type(strings) != bytes: strings = strings.encode('utf-8') import cryptography from cryptography.fernet import Fernet f = Fernet(_key) result = f.encrypt(strings) return result.decode('utf-8') except: writeFileLog(getTracebackInfo()) return strings def deDoubleCrypt(key, strings): # 解密字符串 try: import base64 _key = md5(key).encode('utf-8') _key = base64.urlsafe_b64encode(_key) if type(strings) != bytes: strings = strings.encode('utf-8') from cryptography.fernet import Fernet f = Fernet(_key) result = f.decrypt(strings).decode('utf-8') return result except: writeFileLog(getTracebackInfo()) return strings # ------------------------------ openresty start ----------------------------- def restartMw(): restart_file = getPanelDir()+'/data/restart.pl' writeFile(restart_file, 'True') return True def restartWeb(): return opWeb("reload") def isInstalledWeb(): path = getServerDir() + '/openresty/nginx/sbin/nginx' if os.path.exists(path): return True return False def opWeb(method): if not isInstalledWeb(): return False # systemd systemd = '/lib/systemd/system/openresty.service' if os.path.exists(systemd): execShell('systemctl ' + method + ' openresty') return True # initd initd = getServerDir() + '/openresty/init.d/openresty' if os.path.exists(initd): execShell(initd + ' ' + method) return True return False def opLuaMake(cmd_name): path = getServerDir() + '/web_conf/nginx/lua/lua.conf' root_dir = getServerDir() + '/web_conf/nginx/lua/' + cmd_name dst_path = getServerDir() + '/web_conf/nginx/lua/' + cmd_name + '.lua' def_path = getServerDir() + '/web_conf/nginx/lua/empty.lua' if not os.path.exists(root_dir): execShell('mkdir -p ' + root_dir) files = [] for fl in os.listdir(root_dir): suffix = getFileSuffix(fl) if suffix != 'lua': continue flpath = os.path.join(root_dir, fl) files.append(flpath) if len(files) > 0: def_path = dst_path content = '' for f in files: t = readFile(f) f_base = os.path.basename(f) content += '-- ' + '*' * 20 + ' ' + f_base + ' start ' + '*' * 20 + "\n" content += t content += "\n" + '-- ' + '*' * 20 + ' ' + f_base + ' end ' + '*' * 20 + "\n" writeFile(dst_path, content) else: if os.path.exists(dst_path): os.remove(dst_path) conf = readFile(path) conf = re.sub(cmd_name + ' (.*);', cmd_name + " " + def_path + ";", conf) writeFile(path, conf) def opLuaInitFile(): opLuaMake('init_by_lua_file') def opLuaInitWorkerFile(): opLuaMake('init_worker_by_lua_file') def opLuaInitAccessFile(): opLuaMake('access_by_lua_file') def opLuaMakeAll(): opLuaInitFile() opLuaInitWorkerFile() opLuaInitAccessFile() # ------------------------------ openresty end ----------------------------- # --------------------------------------------------------------------------------- # 数据库 START # --------------------------------------------------------------------------------- def getMyORM(): ''' 获取MySQL资源的ORM ''' import core.orm as orm o = orm.ORM() return o # --------------------------------------------------------------------------------- # 数据库 START # --------------------------------------------------------------------------------- # --------------------------------------------------------------------------------- # 打印相关 START # --------------------------------------------------------------------------------- def echoStart(tag): print("=" * 89) print("★开始{}[{}]".format(tag, formatDate())) print("=" * 89) def echoEnd(tag): print("=" * 89) print("☆{}完成[{}]".format(tag, formatDate())) print("=" * 89) print("\n") def echoInfo(msg): print("|-{}".format(msg)) # --------------------------------------------------------------------------------- # 打印相关 END # ---------------------------------------------------------------------------------