pull/121/head
midoks 3 years ago
parent 60fe47be21
commit fe86ab7eee
  1. 153
      class/plugin/fcgi_client.py
  2. 208
      class/plugin/fpm.py

@ -23,14 +23,18 @@
# SUCH DAMAGE.
#
# $Id$
#
# Copyright (c) 2011 Vladimir Rusinov <vladimir@greenmice.info>
__author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
import select
import struct
import socket
import errno
import types
__all__ = ['FCGIApp']
@ -85,7 +89,7 @@ if __debug__:
# Set non-zero to write debug output to a file.
DEBUG = 0
DEBUGLOG = '/tmp/fcgi_app.log'
DEBUGLOG = '/www/server/mdserver-web/logs/fastcgi.log'
def _debug(level, msg):
if DEBUG < level:
@ -98,6 +102,7 @@ if __debug__:
except:
pass
def decode_pair(s, pos=0):
"""
Decodes a name/value pair.
@ -126,6 +131,7 @@ def decode_pair(s, pos=0):
return (pos, (name, value))
def encode_pair(name, value):
"""
Encodes a name/value pair.
@ -134,24 +140,26 @@ def encode_pair(name, value):
"""
nameLength = len(name)
if nameLength < 128:
s = chr(nameLength)
s = chr(nameLength).encode()
else:
s = struct.pack('!L', nameLength | 0x80000000)
valueLength = len(value)
if valueLength < 128:
s += chr(valueLength)
s += chr(valueLength).encode()
else:
s += struct.pack('!L', valueLength | 0x80000000)
return s + name + value
class Record(object):
"""
A FastCGI Record.
Used for encoding/decoding records.
"""
def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
self.version = FCGI_VERSION_1
self.type = type
@ -171,7 +179,7 @@ class Record(object):
try:
data = sock.recv(length)
except socket.error as e:
if e.errno == errno.EAGAIN:
if e[0] == errno.EAGAIN:
select.select([sock], [], [])
continue
else:
@ -182,7 +190,7 @@ class Record(object):
dataLen = len(data)
recvLen += dataLen
length -= dataLen
return ''.join(dataList), recvLen
return b''.join(dataList), recvLen
_recvall = staticmethod(_recvall)
def read(self, sock):
@ -198,7 +206,8 @@ class Record(object):
self.version, self.type, self.requestId, self.contentLength, \
self.paddingLength = struct.unpack(FCGI_Header, header)
if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
if __debug__:
_debug(9, 'read: fd = %d, type = %d, requestId = %d, '
'contentLength = %d' %
(sock.fileno(), self.type, self.requestId,
self.contentLength))
@ -228,7 +237,7 @@ class Record(object):
try:
sent = sock.send(data)
except socket.error as e:
if e.errno == errno.EAGAIN:
if e[0] == errno.EAGAIN:
select.select([], [sock], [])
continue
else:
@ -241,7 +250,8 @@ class Record(object):
"""Encode and write a Record to a socket."""
self.paddingLength = -self.contentLength & 7
if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
if __debug__:
_debug(9, 'write: fd = %d, type = %d, requestId = %d, '
'contentLength = %d' %
(sock.fileno(), self.type, self.requestId,
self.contentLength))
@ -253,28 +263,20 @@ class Record(object):
if self.contentLength:
self._sendall(sock, self.contentData)
if self.paddingLength:
self._sendall(sock, '\x00'*self.paddingLength)
self._sendall(sock, b'\x00' * self.paddingLength)
class FCGIApp(object):
def __init__(self, command=None, connect=None, host=None, port=None,
filterEnviron=True):
def __init__(self, connect=None, host=None, port=None, filterEnviron=True):
if host is not None:
assert port is not None
connect = (host, port)
assert (command is not None and connect is None) or \
(command is None and connect is not None)
self._command = command
self._connect = connect
self._filterEnviron = filterEnviron
#sock = self._getConnection()
#print self._fcgiGetValues(sock, ['FCGI_MAX_CONNS', 'FCGI_MAX_REQS', 'FCGI_MPXS_CONNS'])
#sock.close()
def __call__(self, environ, start_response):
def __call__(self, environ, io, start_response=None):
# For sanity's sake, we don't care about FCGI_MPXS_CONN
# (connection multiplexing). For every request, we obtain a new
# transport socket, perform the request, then discard the socket.
@ -298,103 +300,48 @@ class FCGIApp(object):
else:
params = self._lightFilterEnviron(environ)
# TODO: Anything not from environ that needs to be sent also?
# return '200 OK',[],str(params),''
self._fcgiParams(sock, requestId, params)
self._fcgiParams(sock, requestId, {})
# Transfer wsgi.input to FCGI_STDIN
content_length = int(environ.get('CONTENT_LENGTH') or 0)
s = ''
#io = StringIO(stdin)
while True:
if not io:
break
chunk_size = min(content_length, 4096)
s = environ['wsgi.input'].read(chunk_size)
s = io.read(chunk_size)
content_length -= len(s)
rec = Record(FCGI_STDIN, requestId)
rec.contentData = s
rec.contentLength = len(s)
rec.write(sock)
if not s: break
if not s:
break
# Empty FCGI_DATA stream
rec = Record(FCGI_DATA, requestId)
rec.write(sock)
# Main loop. Process FCGI_STDOUT, FCGI_STDERR, FCGI_END_REQUEST
# records from the application.
result = []
while True:
inrec = Record()
inrec.read(sock)
if inrec.type == FCGI_STDOUT:
if inrec.contentData:
result.append(inrec.contentData)
else:
# TODO: Should probably be pedantic and no longer
# accept FCGI_STDOUT records?
pass
elif inrec.type == FCGI_STDERR:
# Simply forward to wsgi.errors
environ['wsgi.errors'].write(inrec.contentData)
elif inrec.type == FCGI_END_REQUEST:
# TODO: Process appStatus/protocolStatus fields?
break
# Done with this transport socket, close it. (FCGI_KEEP_CONN was not
# set in the FCGI_BEGIN_REQUEST record we sent above. So the
# application is expected to do the same.)
sock.close()
result = ''.join(result)
# Parse response headers from FCGI_STDOUT
status = '200 OK'
headers = []
pos = 0
while True:
eolpos = result.find('\n', pos)
if eolpos < 0: break
line = result[pos:eolpos-1]
pos = eolpos + 1
# strip in case of CR. NB: This will also strip other
# whitespace...
line = line.strip()
# Empty line signifies end of headers
if not line: break
# TODO: Better error handling
header, value = line.split(':', 1)
header = header.strip().lower()
value = value.strip()
if header == 'status':
# Special handling of Status header
status = value
if status.find(' ') < 0:
# Append a dummy reason phrase if one was not provided
status += ' FCGIApp'
else:
headers.append((header, value))
result = result[pos:]
# Set WSGI status, headers, and return result.
start_response(status, headers)
return [result]
return sock
def _getConnection(self):
if self._connect is not None:
# The simple case. Create a socket and connect to the
# application.
if type(self._connect) is str:
if isinstance(self._connect, str):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self._connect)
elif hasattr(socket, 'create_connection'):
sock = socket.create_connection(self._connect)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(self._connect)
return sock
# To be done when I have more time...
raise NotImplementedError('Launching and managing FastCGI programs not yet implemented')
# , 'Launching and managing FastCGI programs not yet implemented'
raise NotImplementedError
def _fcgiGetValues(self, sock, vars):
# Construct FCGI_GET_VALUES record
@ -422,27 +369,34 @@ class FCGIApp(object):
rec = Record(FCGI_PARAMS, requestId)
data = []
for name, value in params.items():
data.append(encode_pair(name, value))
data = ''.join(data)
data.append(encode_pair(name.encode(
'latin-1'), value.encode('latin-1')))
data = b''.join(data)
rec.contentData = data
rec.contentLength = len(data)
rec.write(sock)
_environPrefixes = ['SERVER_', 'HTTP_', 'REQUEST_', 'REMOTE_', 'PATH_',
'CONTENT_']
'CONTENT_', 'DOCUMENT_', 'SCRIPT_']
_environCopies = ['SCRIPT_NAME', 'QUERY_STRING', 'AUTH_TYPE']
_environRenames = {}
_environRenames = []
def _defaultFilterEnviron(self, environ):
result = {}
for n in environ.keys():
iv = False
for p in self._environPrefixes:
if n.startswith(p):
result[n] = environ[n]
iv = True
if n in self._environCopies:
result[n] = environ[n]
iv = True
if n in self._environRenames:
result[self._environRenames[n]] = environ[n]
iv = True
if not iv:
result[n] = environ[n]
return result
@ -452,10 +406,3 @@ class FCGIApp(object):
if n.upper() == n:
result[n] = environ[n]
return result
if __name__ == '__main__':
from flup.server.ajp import WSGIServer
app = FCGIApp(connect=('localhost', 4242))
#import paste.lint
#app = paste.lint.middleware(app)
WSGIServer(app).run()

@ -0,0 +1,208 @@
# coding:utf-8
import json
import os
import time
import re
import sys
import time
import struct
import fcgi_client
FCGI_Header = '!BBHHBx'
if sys.version_info[0] == 2:
try:
from cStringIO import StringIO
except:
from StringIO import StringIO
else:
from io import BytesIO as StringIO
def get_header_data(sock):
'''
@name 获取头部32KB数据
@param sock socketobject(fastcgi套接字对象)
@return bytes
'''
headers_data = b''
total_len = 0
header_len = 1024 * 128
while True:
fastcgi_header = sock.recv(8)
if not fastcgi_header:
break
if len(fastcgi_header) != 8:
headers_data += fastcgi_header
break
fast_pack = struct.unpack(FCGI_Header, fastcgi_header)
if fast_pack[1] == 3:
break
tlen = fast_pack[3]
while tlen > 0:
sd = sock.recv(tlen)
if not sd:
break
headers_data += sd
tlen -= len(sd)
total_len += fast_pack[3]
if fast_pack[4]:
sock.recv(fast_pack[4])
if total_len > header_len:
break
return headers_data
def format_header_data(headers_data):
'''
@name 格式化响应头
@param headers_data bytes(fastcgi头部32KB数据)
@return status int(响应状态), headers dict(响应头), bdata bytes(格式化响应头后的多余数据)
'''
status = '200 OK'
headers = {}
pos = 0
while True:
eolpos = headers_data.find(b'\n', pos)
if eolpos < 0:
break
line = headers_data[pos:eolpos - 1]
pos = eolpos + 1
line = line.strip()
if len(line) < 2:
break
if line.find(b':') == -1:
continue
header, value = line.split(b':', 1)
header = header.strip()
value = value.strip()
if isinstance(header, bytes):
header = header.decode()
value = value.decode()
if header == 'Status':
status = value
if status.find(' ') < 0:
status += ' BTPanel'
else:
headers[header] = value
bdata = headers_data[pos:]
status = int(status.split(' ')[0])
return status, headers, bdata
def resp_sock(sock, bdata):
'''
@name 以流的方式发送剩余数据
@param sock socketobject(fastcgi套接字对象)
@param bdata bytes(格式化响应头后的多余数据)
@return yield bytes
'''
# 发送除响应头以外的多余头部数据
yield bdata
while True:
fastcgi_header = sock.recv(8)
if not fastcgi_header:
break
if len(fastcgi_header) != 8:
yield fastcgi_header
break
fast_pack = struct.unpack(FCGI_Header, fastcgi_header)
if fast_pack[1] == 3:
break
tlen = fast_pack[3]
while tlen > 0:
sd = sock.recv(tlen)
if not sd:
break
tlen -= len(sd)
if sd:
yield sd
if fast_pack[4]:
sock.recv(fast_pack[4])
sock.close()
class fpm(object):
def __init__(self, sock=None, document_root='', last_path=''):
'''
@name 实例化FPM对象
@param sock string(unixsocket路径)
@param document_root string(PHP文档根目录)
@return FPM
'''
if sock:
self.fcgi_sock = sock
if document_root[-1:] != '/':
document_root += '/'
self.document_root = document_root
self.last_path = last_path
def load_url_public(self, url, content=b'', method='GET', content_type='application/x-www-form-urlencoded'):
'''
@name 转发URL到PHP-FPM 公共
@param url string(URI地址)
@param content stream(POST数据io对象)
@return fastcgi-socket
'''
fcgi = fcgi_client.FCGIApp(connect=self.fcgi_sock)
try:
script_name, query_string = url.split('?')
except ValueError:
script_name = url
query_string = ''
content_length = len(content)
if content:
content = StringIO(content)
env = {
'SCRIPT_FILENAME': '%s%s' % (self.document_root, script_name),
'QUERY_STRING': query_string,
'REQUEST_METHOD': method,
'SCRIPT_NAME': self.last_path + script_name,
'REQUEST_URI': url,
'GATEWAY_INTERFACE': 'CGI/1.1',
'SERVER_SOFTWARE': 'MW-Panel',
'REDIRECT_STATUS': '200',
'CONTENT_TYPE': content_type,
'CONTENT_LENGTH': str(content_length),
'DOCUMENT_URI': script_name,
'DOCUMENT_ROOT': self.document_root,
'SERVER_PROTOCOL': 'HTTP/1.1',
'REMOTE_ADDR': '127.0.0.1',
'REMOTE_PORT': '7200',
'SERVER_ADDR': '127.0.0.1',
'SERVER_PORT': '80',
'SERVER_NAME': 'MW-Panel'
}
fpm_sock = fcgi(env, content)
_data = b''
while True:
fastcgi_header = fpm_sock.recv(8)
if not fastcgi_header:
break
if len(fastcgi_header) != 8:
_data += fastcgi_header
break
fast_pack = struct.unpack(FCGI_Header, fastcgi_header)
if fast_pack[1] == 3:
break
tlen = fast_pack[3]
while tlen > 0:
sd = fpm_sock.recv(tlen)
if not sd:
break
tlen -= len(sd)
_data += sd
if fast_pack[4]:
fpm_sock.recv(fast_pack[4])
status, headers, data = format_header_data(_data)
return data
Loading…
Cancel
Save