diff --git a/class/plugin/fcgi_client.py b/class/plugin/fcgi_client.py index 3f71a5313..f1e0c67e7 100644 --- a/class/plugin/fcgi_client.py +++ b/class/plugin/fcgi_client.py @@ -23,14 +23,18 @@ # SUCH DAMAGE. # # $Id$ +# +# Copyright (c) 2011 Vladimir Rusinov __author__ = 'Allan Saddi ' __version__ = '$Revision$' - +import sys import select import struct import socket import errno +import types + __all__ = ['FCGIApp'] @@ -85,7 +89,7 @@ if __debug__: # Set non-zero to write debug output to a file. DEBUG = 0 - DEBUGLOG = '/tmp/fcgi_app.log' + DEBUGLOG = '/www/server/mdserver-web/logs/fastcgi.log' def _debug(level, msg): if DEBUG < level: @@ -98,6 +102,7 @@ if __debug__: except: pass + def decode_pair(s, pos=0): """ Decodes a name/value pair. @@ -107,25 +112,26 @@ def decode_pair(s, pos=0): """ nameLength = ord(s[pos]) if nameLength & 128: - nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff + nameLength = struct.unpack('!L', s[pos:pos + 4])[0] & 0x7fffffff pos += 4 else: pos += 1 valueLength = ord(s[pos]) if valueLength & 128: - valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff + valueLength = struct.unpack('!L', s[pos:pos + 4])[0] & 0x7fffffff pos += 4 else: pos += 1 - name = s[pos:pos+nameLength] + name = s[pos:pos + nameLength] pos += nameLength - value = s[pos:pos+valueLength] + value = s[pos:pos + valueLength] pos += valueLength return (pos, (name, value)) + def encode_pair(name, value): """ Encodes a name/value pair. @@ -134,24 +140,26 @@ def encode_pair(name, value): """ nameLength = len(name) if nameLength < 128: - s = chr(nameLength) + s = chr(nameLength).encode() else: s = struct.pack('!L', nameLength | 0x80000000) valueLength = len(value) if valueLength < 128: - s += chr(valueLength) + s += chr(valueLength).encode() else: s += struct.pack('!L', valueLength | 0x80000000) return s + name + value + class Record(object): """ A FastCGI Record. Used for encoding/decoding records. """ + def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID): self.version = FCGI_VERSION_1 self.type = type @@ -171,18 +179,18 @@ class Record(object): try: data = sock.recv(length) except socket.error as e: - if e.errno == errno.EAGAIN: + if e[0] == errno.EAGAIN: select.select([sock], [], []) continue else: raise - if not data: # EOF + if not data: # EOF break dataList.append(data) dataLen = len(data) recvLen += dataLen length -= dataLen - return ''.join(dataList), recvLen + return b''.join(dataList), recvLen _recvall = staticmethod(_recvall) def read(self, sock): @@ -194,15 +202,16 @@ class Record(object): if length < FCGI_HEADER_LEN: raise EOFError - + self.version, self.type, self.requestId, self.contentLength, \ - self.paddingLength = struct.unpack(FCGI_Header, header) + self.paddingLength = struct.unpack(FCGI_Header, header) + + if __debug__: + _debug(9, 'read: fd = %d, type = %d, requestId = %d, ' + 'contentLength = %d' % + (sock.fileno(), self.type, self.requestId, + self.contentLength)) - if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, ' - 'contentLength = %d' % - (sock.fileno(), self.type, self.requestId, - self.contentLength)) - if self.contentLength: try: self.contentData, length = self._recvall(sock, @@ -228,7 +237,7 @@ class Record(object): try: sent = sock.send(data) except socket.error as e: - if e.errno == errno.EAGAIN: + if e[0] == errno.EAGAIN: select.select([], [sock], []) continue else: @@ -241,10 +250,11 @@ class Record(object): """Encode and write a Record to a socket.""" self.paddingLength = -self.contentLength & 7 - if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, ' - 'contentLength = %d' % - (sock.fileno(), self.type, self.requestId, - self.contentLength)) + if __debug__: + _debug(9, 'write: fd = %d, type = %d, requestId = %d, ' + 'contentLength = %d' % + (sock.fileno(), self.type, self.requestId, + self.contentLength)) header = struct.pack(FCGI_Header, self.version, self.type, self.requestId, self.contentLength, @@ -253,28 +263,20 @@ class Record(object): if self.contentLength: self._sendall(sock, self.contentData) if self.paddingLength: - self._sendall(sock, '\x00'*self.paddingLength) + self._sendall(sock, b'\x00' * self.paddingLength) + class FCGIApp(object): - def __init__(self, command=None, connect=None, host=None, port=None, - filterEnviron=True): + + def __init__(self, connect=None, host=None, port=None, filterEnviron=True): if host is not None: assert port is not None - connect=(host, port) + connect = (host, port) - assert (command is not None and connect is None) or \ - (command is None and connect is not None) - - self._command = command self._connect = connect - self._filterEnviron = filterEnviron - - #sock = self._getConnection() - #print self._fcgiGetValues(sock, ['FCGI_MAX_CONNS', 'FCGI_MAX_REQS', 'FCGI_MPXS_CONNS']) - #sock.close() - - def __call__(self, environ, start_response): + + def __call__(self, environ, io, start_response=None): # For sanity's sake, we don't care about FCGI_MPXS_CONN # (connection multiplexing). For every request, we obtain a new # transport socket, perform the request, then discard the socket. @@ -298,104 +300,49 @@ class FCGIApp(object): else: params = self._lightFilterEnviron(environ) # TODO: Anything not from environ that needs to be sent also? + # return '200 OK',[],str(params),'' self._fcgiParams(sock, requestId, params) self._fcgiParams(sock, requestId, {}) # Transfer wsgi.input to FCGI_STDIN content_length = int(environ.get('CONTENT_LENGTH') or 0) + s = '' + #io = StringIO(stdin) while True: + if not io: + break chunk_size = min(content_length, 4096) - s = environ['wsgi.input'].read(chunk_size) + s = io.read(chunk_size) content_length -= len(s) rec = Record(FCGI_STDIN, requestId) rec.contentData = s rec.contentLength = len(s) rec.write(sock) - - if not s: break - + if not s: + break # Empty FCGI_DATA stream rec = Record(FCGI_DATA, requestId) rec.write(sock) - - # Main loop. Process FCGI_STDOUT, FCGI_STDERR, FCGI_END_REQUEST - # records from the application. - result = [] - while True: - inrec = Record() - inrec.read(sock) - if inrec.type == FCGI_STDOUT: - if inrec.contentData: - result.append(inrec.contentData) - else: - # TODO: Should probably be pedantic and no longer - # accept FCGI_STDOUT records? - pass - elif inrec.type == FCGI_STDERR: - # Simply forward to wsgi.errors - environ['wsgi.errors'].write(inrec.contentData) - elif inrec.type == FCGI_END_REQUEST: - # TODO: Process appStatus/protocolStatus fields? - break - - # Done with this transport socket, close it. (FCGI_KEEP_CONN was not - # set in the FCGI_BEGIN_REQUEST record we sent above. So the - # application is expected to do the same.) - sock.close() - - result = ''.join(result) - - # Parse response headers from FCGI_STDOUT - status = '200 OK' - headers = [] - pos = 0 - while True: - eolpos = result.find('\n', pos) - if eolpos < 0: break - line = result[pos:eolpos-1] - pos = eolpos + 1 - - # strip in case of CR. NB: This will also strip other - # whitespace... - line = line.strip() - - # Empty line signifies end of headers - if not line: break - - # TODO: Better error handling - header, value = line.split(':', 1) - header = header.strip().lower() - value = value.strip() - - if header == 'status': - # Special handling of Status header - status = value - if status.find(' ') < 0: - # Append a dummy reason phrase if one was not provided - status += ' FCGIApp' - else: - headers.append((header, value)) - - result = result[pos:] - - # Set WSGI status, headers, and return result. - start_response(status, headers) - return [result] + return sock def _getConnection(self): if self._connect is not None: # The simple case. Create a socket and connect to the # application. - if type(self._connect) is str: + if isinstance(self._connect, str): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(self._connect) + elif hasattr(socket, 'create_connection'): + sock = socket.create_connection(self._connect) else: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(self._connect) + sock.connect(self._connect) return sock # To be done when I have more time... - raise NotImplementedError('Launching and managing FastCGI programs not yet implemented') - + # , 'Launching and managing FastCGI programs not yet implemented' + raise NotImplementedError + def _fcgiGetValues(self, sock, vars): # Construct FCGI_GET_VALUES record outrec = Record(FCGI_GET_VALUES) @@ -421,29 +368,36 @@ class FCGIApp(object): def _fcgiParams(self, sock, requestId, params): rec = Record(FCGI_PARAMS, requestId) data = [] - for name,value in params.items(): - data.append(encode_pair(name, value)) - data = ''.join(data) + for name, value in params.items(): + data.append(encode_pair(name.encode( + 'latin-1'), value.encode('latin-1'))) + data = b''.join(data) rec.contentData = data rec.contentLength = len(data) rec.write(sock) _environPrefixes = ['SERVER_', 'HTTP_', 'REQUEST_', 'REMOTE_', 'PATH_', - 'CONTENT_'] + 'CONTENT_', 'DOCUMENT_', 'SCRIPT_'] _environCopies = ['SCRIPT_NAME', 'QUERY_STRING', 'AUTH_TYPE'] - _environRenames = {} + _environRenames = [] def _defaultFilterEnviron(self, environ): result = {} for n in environ.keys(): + iv = False for p in self._environPrefixes: if n.startswith(p): result[n] = environ[n] + iv = True if n in self._environCopies: result[n] = environ[n] + iv = True if n in self._environRenames: result[self._environRenames[n]] = environ[n] - + iv = True + if not iv: + result[n] = environ[n] + return result def _lightFilterEnviron(self, environ): @@ -452,10 +406,3 @@ class FCGIApp(object): if n.upper() == n: result[n] = environ[n] return result - -if __name__ == '__main__': - from flup.server.ajp import WSGIServer - app = FCGIApp(connect=('localhost', 4242)) - #import paste.lint - #app = paste.lint.middleware(app) - WSGIServer(app).run() diff --git a/class/plugin/fpm.py b/class/plugin/fpm.py new file mode 100644 index 000000000..ed6e8058d --- /dev/null +++ b/class/plugin/fpm.py @@ -0,0 +1,208 @@ +# coding:utf-8 + + +import json +import os +import time +import re +import sys +import time +import struct +import fcgi_client + +FCGI_Header = '!BBHHBx' + +if sys.version_info[0] == 2: + try: + from cStringIO import StringIO + except: + from StringIO import StringIO +else: + from io import BytesIO as StringIO + + +def get_header_data(sock): + ''' + @name 获取头部32KB数据 + @param sock socketobject(fastcgi套接字对象) + @return bytes + ''' + headers_data = b'' + total_len = 0 + header_len = 1024 * 128 + while True: + fastcgi_header = sock.recv(8) + if not fastcgi_header: + break + if len(fastcgi_header) != 8: + headers_data += fastcgi_header + break + fast_pack = struct.unpack(FCGI_Header, fastcgi_header) + if fast_pack[1] == 3: + break + + tlen = fast_pack[3] + while tlen > 0: + sd = sock.recv(tlen) + if not sd: + break + headers_data += sd + tlen -= len(sd) + + total_len += fast_pack[3] + if fast_pack[4]: + sock.recv(fast_pack[4]) + if total_len > header_len: + break + return headers_data + + +def format_header_data(headers_data): + ''' + @name 格式化响应头 + @param headers_data bytes(fastcgi头部32KB数据) + @return status int(响应状态), headers dict(响应头), bdata bytes(格式化响应头后的多余数据) + ''' + status = '200 OK' + headers = {} + pos = 0 + while True: + eolpos = headers_data.find(b'\n', pos) + if eolpos < 0: + break + line = headers_data[pos:eolpos - 1] + pos = eolpos + 1 + line = line.strip() + if len(line) < 2: + break + if line.find(b':') == -1: + continue + header, value = line.split(b':', 1) + header = header.strip() + value = value.strip() + if isinstance(header, bytes): + header = header.decode() + value = value.decode() + if header == 'Status': + status = value + if status.find(' ') < 0: + status += ' BTPanel' + else: + headers[header] = value + bdata = headers_data[pos:] + status = int(status.split(' ')[0]) + return status, headers, bdata + + +def resp_sock(sock, bdata): + ''' + @name 以流的方式发送剩余数据 + @param sock socketobject(fastcgi套接字对象) + @param bdata bytes(格式化响应头后的多余数据) + @return yield bytes + ''' + # 发送除响应头以外的多余头部数据 + yield bdata + while True: + fastcgi_header = sock.recv(8) + if not fastcgi_header: + break + if len(fastcgi_header) != 8: + yield fastcgi_header + break + fast_pack = struct.unpack(FCGI_Header, fastcgi_header) + if fast_pack[1] == 3: + break + tlen = fast_pack[3] + while tlen > 0: + sd = sock.recv(tlen) + if not sd: + break + tlen -= len(sd) + if sd: + yield sd + + if fast_pack[4]: + sock.recv(fast_pack[4]) + sock.close() + + +class fpm(object): + + def __init__(self, sock=None, document_root='', last_path=''): + ''' + @name 实例化FPM对象 + @param sock string(unixsocket路径) + @param document_root string(PHP文档根目录) + @return FPM + ''' + if sock: + self.fcgi_sock = sock + if document_root[-1:] != '/': + document_root += '/' + self.document_root = document_root + self.last_path = last_path + + def load_url_public(self, url, content=b'', method='GET', content_type='application/x-www-form-urlencoded'): + ''' + @name 转发URL到PHP-FPM 公共 + @param url string(URI地址) + @param content stream(POST数据io对象) + @return fastcgi-socket + ''' + fcgi = fcgi_client.FCGIApp(connect=self.fcgi_sock) + try: + script_name, query_string = url.split('?') + except ValueError: + script_name = url + query_string = '' + + content_length = len(content) + if content: + content = StringIO(content) + + env = { + 'SCRIPT_FILENAME': '%s%s' % (self.document_root, script_name), + 'QUERY_STRING': query_string, + 'REQUEST_METHOD': method, + 'SCRIPT_NAME': self.last_path + script_name, + 'REQUEST_URI': url, + 'GATEWAY_INTERFACE': 'CGI/1.1', + 'SERVER_SOFTWARE': 'MW-Panel', + 'REDIRECT_STATUS': '200', + 'CONTENT_TYPE': content_type, + 'CONTENT_LENGTH': str(content_length), + 'DOCUMENT_URI': script_name, + 'DOCUMENT_ROOT': self.document_root, + 'SERVER_PROTOCOL': 'HTTP/1.1', + 'REMOTE_ADDR': '127.0.0.1', + 'REMOTE_PORT': '7200', + 'SERVER_ADDR': '127.0.0.1', + 'SERVER_PORT': '80', + 'SERVER_NAME': 'MW-Panel' + } + + fpm_sock = fcgi(env, content) + + _data = b'' + while True: + fastcgi_header = fpm_sock.recv(8) + if not fastcgi_header: + break + if len(fastcgi_header) != 8: + _data += fastcgi_header + break + fast_pack = struct.unpack(FCGI_Header, fastcgi_header) + if fast_pack[1] == 3: + break + tlen = fast_pack[3] + while tlen > 0: + sd = fpm_sock.recv(tlen) + if not sd: + break + tlen -= len(sd) + _data += sd + if fast_pack[4]: + fpm_sock.recv(fast_pack[4]) + status, headers, data = format_header_data(_data) + return data