pull/632/head
Mr Chen 6 months ago
parent 3ae8f0253e
commit 06abfd92f2
  1. 19
      class/core/mw.py
  2. 163
      panel_task.py
  3. 71
      web/core/mw.py
  4. 2
      web/utils/crontab.py
  5. 408
      web/utils/php/fcgi_client.py
  6. 209
      web/utils/php/fpm.py

@ -602,7 +602,7 @@ def writeLog(stype, msg, args=()):
def writeFileLog(msg, path=None, limit_size=50 * 1024 * 1024, save_limit=3): def writeFileLog(msg, path=None, limit_size=50 * 1024 * 1024, save_limit=3):
log_file = getServerDir() + '/mdserver-web/logs/debug.log' log_file = getPanelDir() + '/logs/debug.log'
if path != None: if path != None:
log_file = path log_file = path
@ -632,14 +632,15 @@ def writeFileLog(msg, path=None, limit_size=50 * 1024 * 1024, save_limit=3):
def writeDbLog(stype, msg, args=(), uid=1): def writeDbLog(stype, msg, args=(), uid=1):
try: try:
import time add_time = formatDate()
import db format_msg = getInfo(msg, args)
import json add_data = {
sql = db.Sql() 'type':stype,
mdate = time.strftime('%Y-%m-%d %X', time.localtime()) 'msg':format_msg,
wmsg = getInfo(msg, args) 'uid':uid,
data = (stype, wmsg, uid, mdate) 'add_time':add_time,
result = sql.table('logs').add('type,log,uid,addtime', data) }
result = M('logs').insert(add_data)
return True return True
except Exception as e: except Exception as e:
return False return False

@ -27,16 +27,10 @@ sys.path.append(web_dir)
from admin import app from admin import app
from admin import model from admin import model
import thisdb
import core.mw as mw import core.mw as mw
import core.db as db import core.db as db
global pre, timeoutCount, logPath, isTask, oldEdate, isCheck
pre = 0
timeoutCount = 0
isCheck = 0
oldEdate = None
g_log_file = mw.getPanelTaskLog() g_log_file = mw.getPanelTaskLog()
isTask = mw.getMWLogs() + '/panelTask.pl' isTask = mw.getMWLogs() + '/panelTask.pl'
@ -44,30 +38,8 @@ if not os.path.exists(g_log_file):
os.system("touch " + g_log_file) os.system("touch " + g_log_file)
def execShell(cmdstring, cwd=None, timeout=None, shell=True): def execShell(cmdstring, cwd=None, timeout=None, shell=True):
try: cmd = cmdstring + ' > ' + g_log_file + ' 2>&1'
global g_log_file return mw.execShell(cmd)
import shlex
import datetime
import subprocess
if timeout:
end_time = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
cmd = cmdstring + ' > ' + g_log_file + ' 2>&1'
sub = subprocess.Popen(cmd, cwd=cwd, stdin=subprocess.PIPE, shell=shell, bufsize=4096)
while sub.poll() is None:
time.sleep(0.1)
data = sub.communicate()
# python3 fix 返回byte数据
if isinstance(data[0], bytes):
t1 = str(data[0], encoding='utf-8')
if isinstance(data[1], bytes):
t2 = str(data[1], encoding='utf-8')
return (t1, t2)
except Exception as e:
return (None, None)
def service_cmd(method): def service_cmd(method):
cmd = '/etc/init.d/mw' cmd = '/etc/init.d/mw'
@ -76,37 +48,12 @@ def service_cmd(method):
return return
cmd = mw.getPanelDir() + '/scripts/init.d/mw' cmd = mw.getPanelDir() + '/scripts/init.d/mw'
print(cmd)
if os.path.exists(cmd): if os.path.exists(cmd):
print(cmd + ' ' + method) print(cmd + ' ' + method)
data = execShell(cmd + ' ' + method) data = execShell(cmd + ' ' + method)
print(data) print(data)
return return
def openresty_cmd(method = 'reload'):
# 检查是否安装
odir = mw.getServerDir() + '/openresty'
if not os.path.exists(odir):
return False
# systemd
systemd = mw.systemdCfgDir()+'/openresty.service'
if os.path.exists(systemd):
execShell('systemctl ' + method + ' openresty')
return True
sys_initd = '/etc/init.d/openresty'
if os.path.exists(sys_initd):
os.system(sys_initd + ' ' + method)
return True
install_initd = mw.getServerDir()+'/openresty/init.d/openresty'
if os.path.exists(install_initd):
os.system(install_initd + ' ' + method)
return True
return False
def mw_async(f): def mw_async(f):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
thr = threading.Thread(target=f, args=args, kwargs=kwargs) thr = threading.Thread(target=f, args=args, kwargs=kwargs)
@ -225,8 +172,8 @@ def siteEdate():
def systemTask(): def systemTask():
from utils.system import monitor
# 系统监控任务 # 系统监控任务
from utils.system import monitor
try: try:
while True: while True:
monitor.instance().run() monitor.instance().run()
@ -240,10 +187,11 @@ def systemTask():
# 502错误检查线程 # 502错误检查线程
def check502Task(): def check502Task():
try: try:
check_file = mw.getPanelDir() + '/data/502Task.pl'
while True: while True:
if os.path.exists(mw.getPanelDir() + '/data/502Task.pl'): if os.path.exists(check_file):
check502() check502()
time.sleep(30) time.sleep(10)
except: except:
time.sleep(30) time.sleep(30)
check502Task() check502Task()
@ -257,8 +205,8 @@ def check502():
'82', '83', '84' '82', '83', '84'
] ]
for ver in verlist: for ver in verlist:
sdir = mw.getServerDir() server_dir = mw.getServerDir()
php_path = sdir + '/php/' + ver + '/sbin/php-fpm' php_path = server_dir + '/php/' + ver + '/sbin/php-fpm'
if not os.path.exists(php_path): if not os.path.exists(php_path):
continue continue
if checkPHPVersion(ver): if checkPHPVersion(ver):
@ -266,13 +214,14 @@ def check502():
if startPHPVersion(ver): if startPHPVersion(ver):
print('检测到PHP-' + ver + '处理异常,已自动修复!') print('检测到PHP-' + ver + '处理异常,已自动修复!')
mw.writeLog('PHP守护程序', '检测到PHP-' + ver + '处理异常,已自动修复!') mw.writeLog('PHP守护程序', '检测到PHP-' + ver + '处理异常,已自动修复!')
except Exception as e: except Exception as e:
print(str(e)) mw.writeLog('PHP守护程序', '自动修复异常:'+str(e))
# 处理指定PHP版本 # 处理指定PHP版本
def startPHPVersion(version): def startPHPVersion(version):
sdir = mw.getServerDir() server_dir = mw.getServerDir()
try: try:
# system # system
phpService = mw.systemdCfgDir() + '/php' + version + '.service' phpService = mw.systemdCfgDir() + '/php' + version + '.service'
@ -282,8 +231,8 @@ def startPHPVersion(version):
return True return True
# initd # initd
fpm = sdir + '/php/init.d/php' + version fpm = server_dir + '/php/init.d/php' + version
php_path = sdir + '/php/' + version + '/sbin/php-fpm' php_path = server_dir + '/php/' + version + '/sbin/php-fpm'
if not os.path.exists(php_path): if not os.path.exists(php_path):
if os.path.exists(fpm): if os.path.exists(fpm):
os.remove(fpm) os.remove(fpm)
@ -296,7 +245,6 @@ def startPHPVersion(version):
os.system(fpm + ' reload') os.system(fpm + ' reload')
if checkPHPVersion(version): if checkPHPVersion(version):
return True return True
# 尝试重启服务 # 尝试重启服务
cgi = '/tmp/php-cgi-' + version + '.sock' cgi = '/tmp/php-cgi-' + version + '.sock'
pid = sdir + '/php/' + version + '/var/run/php-fpm.pid' pid = sdir + '/php/' + version + '/var/run/php-fpm.pid'
@ -316,47 +264,18 @@ def startPHPVersion(version):
if os.path.exists(cgi): if os.path.exists(cgi):
return True return True
except Exception as e: except Exception as e:
print(str(e)) mw.writeLog('PHP守护程序', '自动修复异常:'+str(e))
return True return True
def getFpmConfFile(version):
return mw.getServerDir() + '/php/' + version + '/etc/php-fpm.d/www.conf'
def getFpmAddress(version):
fpm_address = '/tmp/php-cgi-{}.sock'.format(version)
php_fpm_file = getFpmConfFile(version)
try:
content = readFile(php_fpm_file)
tmp = re.findall(r"listen\s*=\s*(.+)", content)
if not tmp:
return fpm_address
if tmp[0].find('sock') != -1:
return fpm_address
if tmp[0].find(':') != -1:
listen_tmp = tmp[0].split(':')
if bind:
fpm_address = (listen_tmp[0], int(listen_tmp[1]))
else:
fpm_address = ('127.0.0.1', int(listen_tmp[1]))
else:
fpm_address = ('127.0.0.1', int(tmp[0]))
return fpm_address
except:
return fpm_address
def checkPHPVersion(version): def checkPHPVersion(version):
# 检查指定PHP版本 # 检查指定PHP版本
try: try:
sock = getFpmAddress(version) sock = mw.getFpmAddress(version)
data = mw.requestFcgiPHP(sock, '/phpfpm_status_' + version + '?json') data = mw.requestFcgiPHP(sock, '/phpfpm_status_' + version + '?json')
result = str(data, encoding='utf-8') result = str(data, encoding='utf-8')
except Exception as e: except Exception as e:
result = 'Bad Gateway' result = 'Bad Gateway'
# print(version,result)
# 检查openresty # 检查openresty
if result.find('Bad Gateway') != -1: if result.find('Bad Gateway') != -1:
return False return False
@ -366,24 +285,9 @@ def checkPHPVersion(version):
# 检查Web服务是否启动 # 检查Web服务是否启动
if result.find('Connection refused') != -1: if result.find('Connection refused') != -1:
return False return False
# global isTask
# if os.path.exists(isTask):
# isStatus = mw.readFile(isTask)
# if isStatus == 'True':
# return True
# # systemd
# systemd = mw.systemdCfgDir() + '/openresty.service'
# if os.path.exists(systemd):
# execShell('systemctl reload openresty')
# return True
# # initd
# initd = '/etc/init.d/openresty'
# if os.path.exists(initd):
# os.system(initd + ' reload')
return True return True
# --------------------------------------PHP监控 end--------------------------------------------- # # -------------------------------------- PHP监控 end --------------------------------------------- #
# --------------------------------------OpenResty Auto Restart Start --------------------------------------------- # # --------------------------------------OpenResty Auto Restart Start --------------------------------------------- #
@ -396,11 +300,10 @@ def openrestyAutoRestart():
if not os.path.exists(odir): if not os.path.exists(odir):
time.sleep(86400) time.sleep(86400)
continue continue
mw.opWeb('reload')
openresty_cmd('reload')
time.sleep(86400) time.sleep(86400)
except Exception as e: except Exception as e:
print(str(e)) mw.writeLog('OpenResty检测', '自动修复异常:'+str(e))
time.sleep(86400) time.sleep(86400)
# --------------------------------------OpenResty Auto Restart End --------------------------------------------- # # --------------------------------------OpenResty Auto Restart End --------------------------------------------- #
@ -409,11 +312,11 @@ def openrestyAutoRestart():
def openrestyRestartAtOnce(): def openrestyRestartAtOnce():
restart_nginx_tip = 'data/restart_nginx.pl' restart_nginx_tip = mw.getPanelDir()+'/data/restart_nginx.pl'
while True: while True:
if os.path.exists(restart_nginx_tip): if os.path.exists(restart_nginx_tip):
os.remove(restart_nginx_tip) os.remove(restart_nginx_tip)
openresty_cmd('reload') mw.opWeb('reload')
time.sleep(1) time.sleep(1)
# ----------------------------------- OpenResty Restart At Once End ------------------------------------------ # # ----------------------------------- OpenResty Restart At Once End ------------------------------------------ #
@ -424,7 +327,7 @@ def restartPanelService():
while True: while True:
if os.path.exists(restartTip): if os.path.exists(restartTip):
os.remove(restartTip) os.remove(restartTip)
service_cmd('restart_panel') mw.panelCmd('restart_panel')
time.sleep(1) time.sleep(1)
# --------------------------------------Panel Restart End --------------------------------------------- # # --------------------------------------Panel Restart End --------------------------------------------- #
@ -443,20 +346,20 @@ def run():
sysTask.start() sysTask.start()
# # PHP 502错误检查线程 # # PHP 502错误检查线程
# php502 = threading.Thread(target=check502Task) php502 = threading.Thread(target=check502Task)
# php502 = setDaemon(php502) php502 = setDaemon(php502)
# php502.start() php502.start()
# # OpenResty Restart At Once Start # OpenResty Restart At Once Start
# oraos = threading.Thread(target=openrestyRestartAtOnce) oraos = threading.Thread(target=openrestyRestartAtOnce)
# oraos = setDaemon(oraos) oraos = setDaemon(oraos)
# oraos.start() oraos.start()
# # OpenResty Auto Restart Start # OpenResty Auto Restart Start
# oar = threading.Thread(target=openrestyAutoRestart) oar = threading.Thread(target=openrestyAutoRestart)
# oar = setDaemon(oar) oar = setDaemon(oar)
# oar.start() oar.start()
# Panel Restart Start # Panel Restart Start

@ -679,6 +679,18 @@ def restartMw():
restart_file = getPanelDir()+'/data/restart.pl' restart_file = getPanelDir()+'/data/restart.pl'
writeFile(restart_file, 'True') writeFile(restart_file, 'True')
return True return True
def panelCmd(method):
cmd = '/etc/init.d/mw'
if os.path.exists(cmd):
execShell(cmd + ' ' + method)
return
cmd = mw.getPanelDir() + '/scripts/init.d/mw'
if os.path.exists(cmd):
data = execShell(cmd + ' ' + method)
return
# ------------------------------ panel end ----------------------------- # ------------------------------ panel end -----------------------------
# ------------------------------ openresty start ----------------------------- # ------------------------------ openresty start -----------------------------
@ -696,14 +708,19 @@ def opWeb(method):
return False return False
# systemd # systemd
systemd = '/lib/systemd/system/openresty.service' systemd = systemdCfgDir() + '/openresty.service'
if os.path.exists(systemd): if os.path.exists(systemd):
execShell('systemctl ' + method + ' openresty') execShell('systemctl ' + method + ' openresty')
return True return True
sys_initd = '/etc/init.d/openresty'
if os.path.exists(sys_initd):
os.system(sys_initd + ' ' + method)
return True
# initd # initd
initd = getServerDir() + '/openresty/init.d/openresty' initd = getServerDir() + '/openresty/init.d/openresty'
if os.path.exists(initd): if os.path.exists(initd):
execShell(initd + ' ' + method) execShell(initd + ' ' + method)
return True return True
@ -766,9 +783,59 @@ def opLuaMakeAll():
# ------------------------------ openresty end ----------------------------- # ------------------------------ openresty end -----------------------------
# ---------------------------------------------------------------------------------
# PHP START
# ---------------------------------------------------------------------------------
def getFpmConfFile(version):
return getServerDir() + '/php/' + version + '/etc/php-fpm.d/www.conf'
def getFpmAddress(version):
fpm_address = '/tmp/php-cgi-{}.sock'.format(version)
php_fpm_file = getFpmConfFile(version)
try:
content = readFile(php_fpm_file)
tmp = re.findall(r"listen\s*=\s*(.+)", content)
if not tmp:
return fpm_address
if tmp[0].find('sock') != -1:
return fpm_address
if tmp[0].find(':') != -1:
listen_tmp = tmp[0].split(':')
if bind:
fpm_address = (listen_tmp[0], int(listen_tmp[1]))
else:
fpm_address = ('127.0.0.1', int(listen_tmp[1]))
else:
fpm_address = ('127.0.0.1', int(tmp[0]))
return fpm_address
except:
return fpm_address
def requestFcgiPHP(sock, uri, document_root='/tmp', method='GET', pdata=b''):
# 直接请求到PHP-FPM
# version php版本
# uri 请求uri
# filename 要执行的php文件
# args 请求参数
# method 请求方式
import utils.php.fpm as fpm
p = fpm.fpm(sock, document_root)
if type(pdata) == dict:
pdata = url_encode(pdata)
result = p.load_url_public(uri, pdata, method)
return result
# ---------------------------------------------------------------------------------
# PHP END
# ---------------------------------------------------------------------------------
# --------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------
# 数据库 START # 数据库 START
# --------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------
def getMyORM(): def getMyORM():
''' '''
获取MySQL资源的ORM 获取MySQL资源的ORM

@ -7,3 +7,5 @@
# --------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------
# Author: midoks <midoks@163.com> # Author: midoks <midoks@163.com>
# --------------------------------------------------------------------------------- # ---------------------------------------------------------------------------------

@ -0,0 +1,408 @@
# Copyright (c) 2006 Allan Saddi <allan@saddi.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# $Id$
#
# Copyright (c) 2011 Vladimir Rusinov <vladimir@greenmice.info>
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
import select
import struct
import socket
import errno
import types
__all__ = ['FCGIApp']
# Constants from the spec.
FCGI_LISTENSOCK_FILENO = 0
FCGI_HEADER_LEN = 8
FCGI_VERSION_1 = 1
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
FCGI_NULL_REQUEST_ID = 0
FCGI_KEEP_CONN = 1
FCGI_RESPONDER = 1
FCGI_AUTHORIZER = 2
FCGI_FILTER = 3
FCGI_REQUEST_COMPLETE = 0
FCGI_CANT_MPX_CONN = 1
FCGI_OVERLOADED = 2
FCGI_UNKNOWN_ROLE = 3
FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
FCGI_MAX_REQS = 'FCGI_MAX_REQS'
FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'
FCGI_Header = '!BBHHBx'
FCGI_BeginRequestBody = '!HB5x'
FCGI_EndRequestBody = '!LB3x'
FCGI_UnknownTypeBody = '!B7x'
FCGI_BeginRequestBody_LEN = struct.calcsize(FCGI_BeginRequestBody)
FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
if __debug__:
import time
# Set non-zero to write debug output to a file.
DEBUG = 0
DEBUGLOG = '/www/server/mdserver-web/logs/fastcgi.log'
def _debug(level, msg):
if DEBUG < level:
return
try:
f = open(DEBUGLOG, 'a')
f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
f.close()
except:
pass
def decode_pair(s, pos=0):
"""
Decodes a name/value pair.
The number of bytes decoded as well as the name/value pair
are returned.
"""
nameLength = ord(s[pos])
if nameLength & 128:
nameLength = struct.unpack('!L', s[pos:pos + 4])[0] & 0x7fffffff
pos += 4
else:
pos += 1
valueLength = ord(s[pos])
if valueLength & 128:
valueLength = struct.unpack('!L', s[pos:pos + 4])[0] & 0x7fffffff
pos += 4
else:
pos += 1
name = s[pos:pos + nameLength]
pos += nameLength
value = s[pos:pos + valueLength]
pos += valueLength
return (pos, (name, value))
def encode_pair(name, value):
"""
Encodes a name/value pair.
The encoded string is returned.
"""
nameLength = len(name)
if nameLength < 128:
s = chr(nameLength).encode()
else:
s = struct.pack('!L', nameLength | 0x80000000)
valueLength = len(value)
if valueLength < 128:
s += chr(valueLength).encode()
else:
s += struct.pack('!L', valueLength | 0x80000000)
return s + name + value
class Record(object):
"""
A FastCGI Record.
Used for encoding/decoding records.
"""
def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
self.version = FCGI_VERSION_1
self.type = type
self.requestId = requestId
self.contentLength = 0
self.paddingLength = 0
self.contentData = ''
def _recvall(sock, length):
"""
Attempts to receive length bytes from a socket, blocking if necessary.
(Socket may be blocking or non-blocking.)
"""
dataList = []
recvLen = 0
while length:
try:
data = sock.recv(length)
except socket.error as e:
if e[0] == errno.EAGAIN:
select.select([sock], [], [])
continue
else:
raise
if not data: # EOF
break
dataList.append(data)
dataLen = len(data)
recvLen += dataLen
length -= dataLen
return b''.join(dataList), recvLen
_recvall = staticmethod(_recvall)
def read(self, sock):
"""Read and decode a Record from a socket."""
try:
header, length = self._recvall(sock, FCGI_HEADER_LEN)
except:
raise EOFError
if length < FCGI_HEADER_LEN:
raise EOFError
self.version, self.type, self.requestId, self.contentLength, \
self.paddingLength = struct.unpack(FCGI_Header, header)
if __debug__:
_debug(9, 'read: fd = %d, type = %d, requestId = %d, '
'contentLength = %d' %
(sock.fileno(), self.type, self.requestId,
self.contentLength))
if self.contentLength:
try:
self.contentData, length = self._recvall(sock,
self.contentLength)
except:
raise EOFError
if length < self.contentLength:
raise EOFError
if self.paddingLength:
try:
self._recvall(sock, self.paddingLength)
except:
raise EOFError
def _sendall(sock, data):
"""
Writes data to a socket and does not return until all the data is sent.
"""
length = len(data)
while length:
try:
sent = sock.send(data)
except socket.error as e:
if e[0] == errno.EAGAIN:
select.select([], [sock], [])
continue
else:
raise
data = data[sent:]
length -= sent
_sendall = staticmethod(_sendall)
def write(self, sock):
"""Encode and write a Record to a socket."""
self.paddingLength = -self.contentLength & 7
if __debug__:
_debug(9, 'write: fd = %d, type = %d, requestId = %d, '
'contentLength = %d' %
(sock.fileno(), self.type, self.requestId,
self.contentLength))
header = struct.pack(FCGI_Header, self.version, self.type,
self.requestId, self.contentLength,
self.paddingLength)
self._sendall(sock, header)
if self.contentLength:
self._sendall(sock, self.contentData)
if self.paddingLength:
self._sendall(sock, b'\x00' * self.paddingLength)
class FCGIApp(object):
def __init__(self, connect=None, host=None, port=None, filterEnviron=True):
if host is not None:
assert port is not None
connect = (host, port)
self._connect = connect
self._filterEnviron = filterEnviron
def __call__(self, environ, io, start_response=None):
# For sanity's sake, we don't care about FCGI_MPXS_CONN
# (connection multiplexing). For every request, we obtain a new
# transport socket, perform the request, then discard the socket.
# This is, I believe, how mod_fastcgi does things...
sock = self._getConnection()
# Since this is going to be the only request on this connection,
# set the request ID to 1.
requestId = 1
# Begin the request
rec = Record(FCGI_BEGIN_REQUEST, requestId)
rec.contentData = struct.pack(FCGI_BeginRequestBody, FCGI_RESPONDER, 0)
rec.contentLength = FCGI_BeginRequestBody_LEN
rec.write(sock)
# Filter WSGI environ and send it as FCGI_PARAMS
if self._filterEnviron:
params = self._defaultFilterEnviron(environ)
else:
params = self._lightFilterEnviron(environ)
# TODO: Anything not from environ that needs to be sent also?
# return '200 OK',[],str(params),''
self._fcgiParams(sock, requestId, params)
self._fcgiParams(sock, requestId, {})
# Transfer wsgi.input to FCGI_STDIN
content_length = int(environ.get('CONTENT_LENGTH') or 0)
s = ''
#io = StringIO(stdin)
while True:
if not io:
break
chunk_size = min(content_length, 4096)
s = io.read(chunk_size)
content_length -= len(s)
rec = Record(FCGI_STDIN, requestId)
rec.contentData = s
rec.contentLength = len(s)
rec.write(sock)
if not s:
break
# Empty FCGI_DATA stream
rec = Record(FCGI_DATA, requestId)
rec.write(sock)
return sock
def _getConnection(self):
if self._connect is not None:
# The simple case. Create a socket and connect to the
# application.
if isinstance(self._connect, str):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self._connect)
elif hasattr(socket, 'create_connection'):
sock = socket.create_connection(self._connect)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(self._connect)
return sock
# To be done when I have more time...
# , 'Launching and managing FastCGI programs not yet implemented'
raise NotImplementedError
def _fcgiGetValues(self, sock, vars):
# Construct FCGI_GET_VALUES record
outrec = Record(FCGI_GET_VALUES)
data = []
for name in vars:
data.append(encode_pair(name, ''))
data = ''.join(data)
outrec.contentData = data
outrec.contentLength = len(data)
outrec.write(sock)
# Await response
inrec = Record()
inrec.read(sock)
result = {}
if inrec.type == FCGI_GET_VALUES_RESULT:
pos = 0
while pos < inrec.contentLength:
pos, (name, value) = decode_pair(inrec.contentData, pos)
result[name] = value
return result
def _fcgiParams(self, sock, requestId, params):
rec = Record(FCGI_PARAMS, requestId)
data = []
for name, value in params.items():
data.append(encode_pair(name.encode(
'latin-1'), value.encode('latin-1')))
data = b''.join(data)
rec.contentData = data
rec.contentLength = len(data)
rec.write(sock)
_environPrefixes = ['SERVER_', 'HTTP_', 'REQUEST_', 'REMOTE_', 'PATH_',
'CONTENT_', 'DOCUMENT_', 'SCRIPT_']
_environCopies = ['SCRIPT_NAME', 'QUERY_STRING', 'AUTH_TYPE']
_environRenames = []
def _defaultFilterEnviron(self, environ):
result = {}
for n in environ.keys():
iv = False
for p in self._environPrefixes:
if n.startswith(p):
result[n] = environ[n]
iv = True
if n in self._environCopies:
result[n] = environ[n]
iv = True
if n in self._environRenames:
result[self._environRenames[n]] = environ[n]
iv = True
if not iv:
result[n] = environ[n]
return result
def _lightFilterEnviron(self, environ):
result = {}
for n in environ.keys():
if n.upper() == n:
result[n] = environ[n]
return result

@ -0,0 +1,209 @@
# coding:utf-8
import json
import os
import time
import re
import sys
import time
import struct
from .fcgi_client import FCGIApp
FCGI_Header = '!BBHHBx'
if sys.version_info[0] == 2:
try:
from cStringIO import StringIO
except:
from StringIO import StringIO
else:
from io import BytesIO as StringIO
def get_header_data(sock):
'''
@name 获取头部32KB数据
@param sock socketobject(fastcgi套接字对象)
@return bytes
'''
headers_data = b''
total_len = 0
header_len = 1024 * 128
while True:
fastcgi_header = sock.recv(8)
if not fastcgi_header:
break
if len(fastcgi_header) != 8:
headers_data += fastcgi_header
break
fast_pack = struct.unpack(FCGI_Header, fastcgi_header)
if fast_pack[1] == 3:
break
tlen = fast_pack[3]
while tlen > 0:
sd = sock.recv(tlen)
if not sd:
break
headers_data += sd
tlen -= len(sd)
total_len += fast_pack[3]
if fast_pack[4]:
sock.recv(fast_pack[4])
if total_len > header_len:
break
return headers_data
def format_header_data(headers_data):
'''
@name 格式化响应头
@param headers_data bytes(fastcgi头部32KB数据)
@return status int(响应状态), headers dict(响应头), bdata bytes(格式化响应头后的多余数据)
'''
status = '200 OK'
headers = {}
pos = 0
while True:
eolpos = headers_data.find(b'\n', pos)
if eolpos < 0:
break
line = headers_data[pos:eolpos - 1]
pos = eolpos + 1
line = line.strip()
if len(line) < 2:
break
if line.find(b':') == -1:
continue
header, value = line.split(b':', 1)
header = header.strip()
value = value.strip()
if isinstance(header, bytes):
header = header.decode()
value = value.decode()
if header == 'Status':
status = value
if status.find(' ') < 0:
status += ' BTPanel'
else:
headers[header] = value
bdata = headers_data[pos:]
status = int(status.split(' ')[0])
return status, headers, bdata
def resp_sock(sock, bdata):
'''
@name 以流的方式发送剩余数据
@param sock socketobject(fastcgi套接字对象)
@param bdata bytes(格式化响应头后的多余数据)
@return yield bytes
'''
# 发送除响应头以外的多余头部数据
yield bdata
while True:
fastcgi_header = sock.recv(8)
if not fastcgi_header:
break
if len(fastcgi_header) != 8:
yield fastcgi_header
break
fast_pack = struct.unpack(FCGI_Header, fastcgi_header)
if fast_pack[1] == 3:
break
tlen = fast_pack[3]
while tlen > 0:
sd = sock.recv(tlen)
if not sd:
break
tlen -= len(sd)
if sd:
yield sd
if fast_pack[4]:
sock.recv(fast_pack[4])
sock.close()
class fpm(object):
def __init__(self, sock=None, document_root='', last_path=''):
'''
@name 实例化FPM对象
@param sock string(unixsocket路径)
@param document_root string(PHP文档根目录)
@return FPM
'''
if sock:
self.fcgi_sock = sock
if document_root[-1:] != '/':
document_root += '/'
self.document_root = document_root
self.last_path = last_path
def load_url_public(self, url, content=b'', method='GET', content_type='application/x-www-form-urlencoded'):
'''
@name 转发URL到PHP-FPM 公共
@param url string(URI地址)
@param content stream(POST数据io对象)
@return fastcgi-socket
'''
fcgi = FCGIApp(connect=self.fcgi_sock)
try:
script_name, query_string = url.split('?')
except ValueError:
script_name = url
query_string = ''
content_length = len(content)
if content:
content = StringIO(content)
env = {
'SCRIPT_FILENAME': '%s%s' % (self.document_root, script_name),
'QUERY_STRING': query_string,
'REQUEST_METHOD': method,
'SCRIPT_NAME': self.last_path + script_name,
'REQUEST_URI': url,
'GATEWAY_INTERFACE': 'CGI/1.1',
'SERVER_SOFTWARE': 'MW-PANEL',
'REDIRECT_STATUS': '200',
'CONTENT_TYPE': content_type,
'CONTENT_LENGTH': str(content_length),
'DOCUMENT_URI': script_name,
'DOCUMENT_ROOT': self.document_root,
'SERVER_PROTOCOL': 'HTTP/1.1',
'REMOTE_ADDR': '127.0.0.1',
'REMOTE_PORT': '7200',
'SERVER_ADDR': '127.0.0.1',
'SERVER_PORT': '80',
'SERVER_NAME': 'MW-Panel'
}
fpm_sock = fcgi(env, content)
_data = b''
while True:
fastcgi_header = fpm_sock.recv(8)
if not fastcgi_header:
break
if len(fastcgi_header) != 8:
_data += fastcgi_header
break
fast_pack = struct.unpack(FCGI_Header, fastcgi_header)
if fast_pack[1] == 3:
break
tlen = fast_pack[3]
while tlen > 0:
sd = fpm_sock.recv(tlen)
if not sd:
break
tlen -= len(sd)
_data += sd
if fast_pack[4]:
fpm_sock.recv(fast_pack[4])
status, headers, data = format_header_data(_data)
return data
Loading…
Cancel
Save