Simple Linux Panel
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mdserver-web/class/core/cert_request.py

1203 lines
49 KiB

# coding:utf-8
# ---------------------------------------------------------------------------------
# MW-Linux面板
# ---------------------------------------------------------------------------------
# copyright (c) 2018-∞(https://github.com/midoks/mdserver-web) All rights reserved.
# ---------------------------------------------------------------------------------
# Author: midoks <midoks@163.com>
# ---------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------
# 证书申请
# ---------------------------------------------------------------------------------
import re
import fcntl
import datetime
import binascii
import hashlib
import base64
import json
import time
import os
import sys
import argparse
import mw
try:
import OpenSSL
except:
mw.execShell("pip install pyopenssl")
import OpenSSL
try:
import dns.resolver
except:
mw.execShell("pip install dnspython")
import dns.resolver
# import http_requests as requests
# requests.DEFAULT_TYPE = 'curl'
import requests
requests.packages.urllib3.disable_warnings()
def echoErr(msg):
writeLog("\033[31m=" * 65)
writeLog("|-错误:{}\033[0m".format(msg))
exit()
def writeLog(log_str, mode="ab+"):
# 写日志
if __name__ == "__main__":
print(log_str)
return
_log_file = 'logs/letsencrypt.log'
f = open(_log_file, mode)
log_str += "\n"
f.write(log_str.encode('utf-8'))
f.close()
return True
class cert_request:
__debug = True
__user_agent = "MW-Panel"
__apis = None
__url = None
__replay_nonce = None
__acme_timeout = 30
__max_check_num = 5
__wait_time = 5
__bits = 2048
__digest = "sha256"
__verify = False
__config = {}
__dns_class = None
__auto_wildcard = False
__mod_index = {True: "Staging", False: "Production"}
__save_path = 'data/letsencrypt'
__cfg_file = 'data/letsencrypt.json'
def __init__(self):
if self.__debug:
self.__url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
else:
self.__url = 'https://acme-v02.api.letsencrypt.org/directory'
self.__config = self.readConfig()
def D(self, name, val):
if self.__debug:
print('---------{} start--------'.format(name))
print(val)
print('---------{} end--------'.format(name))
# 读配置文件
def readConfig(self):
if not os.path.exists(self.__cfg_file):
self.__config['orders'] = {}
self.__config['account'] = {}
self.__config['apis'] = {}
self.__config['email'] = mw.M('users').where(
'id=?', (1,)).getField('email')
if self.__config['email'] in ['midoks@163.com']:
self.__config['email'] = None
self.saveConfig()
return self.__config
tmp_config = mw.readFile(self.__cfg_file)
if not tmp_config:
return self.__config
try:
self.__config = json.loads(tmp_config)
except:
self.saveConfig()
return self.__config
return self.__config
# 写配置文件
def saveConfig(self):
fp = open(self.__cfg_file, 'w+')
fcntl.flock(fp, fcntl.LOCK_EX) # 加锁
fp.write(json.dumps(self.__config))
fcntl.flock(fp, fcntl.LOCK_UN) # 解锁
fp.close()
return True
def getApis(self):
if not self.__apis:
# 尝试从配置文件中获取
api_index = self.__mod_index[self.__debug]
if not 'apis' in self.__config:
self.__config['apis'] = {}
if api_index in self.__config['apis']:
if 'expires' in self.__config['apis'][api_index] and 'directory' in self.__config['apis'][api_index]:
if time.time() < self.__config['apis'][api_index]['expires']:
self.__apis = self.__config['apis'][
api_index]['directory']
return self.__apis
# 尝试从官方获取获取
try:
res = mw.httpGet(self.__url)
result = json.loads(res)
self.__apis = {}
self.__apis['newAccount'] = result['newAccount']
self.__apis['newNonce'] = result['newNonce']
self.__apis['newOrder'] = result['newOrder']
self.__apis['revokeCert'] = result['revokeCert']
self.__apis['keyChange'] = result['keyChange']
# 保存到配置文件
self.__config['apis'][api_index] = {}
self.__config['apis'][api_index]['directory'] = self.__apis
self.__config['apis'][api_index]['expires'] = time.time() + \
86400 # 24小时后过期
self.saveConfig()
except Exception as e:
raise Exception(
'服务因维护而关闭或发生内部错误,查看 <a href="https://letsencrypt.status.io/" target="_blank" class="btlink">https://letsencrypt.status.io/</a> 了解更多详细信息。')
return self.__apis
# 系列化payload
def stringfyItems(self, payload):
if isinstance(payload, str):
return payload
for k, v in payload.items():
if isinstance(k, bytes):
k = k.decode("utf-8")
if isinstance(v, bytes):
v = v.decode("utf-8")
payload[k] = v
return payload
# 构造域名列表
def formatDomains(self, domains):
if type(domains) != list:
return []
# 是否自动构造通配符
if self.__auto_wildcard:
domains = self.autoWildcard(domains)
wildcard = []
tmp_domains = []
for domain in domains:
domain = domain.strip()
if domain in tmp_domains:
continue
# 将通配符域名转为验证正则表达式
f_index = domain.find("*.")
if f_index not in [-1, 0]:
continue
if f_index == 0:
wildcard.append(domain.replace(
"*", r"^[\w-]+").replace(".", r"\."))
# 添加到申请列表
tmp_domains.append(domain)
# 处理通配符包含
apply_domains = tmp_domains[:]
for domain in tmp_domains:
for w in wildcard:
if re.match(w, domain):
apply_domains.pop(domain)
return apply_domains
# 转为无填充的Base64
def calculateSafeBase64(self, un_encoded_data):
if sys.version_info[0] == 3:
if isinstance(un_encoded_data, str):
un_encoded_data = un_encoded_data.encode("utf8")
r = base64.urlsafe_b64encode(un_encoded_data).rstrip(b"=")
return r.decode("utf8")
# 创建Key
def createKey(self, key_type=OpenSSL.crypto.TYPE_RSA):
key = OpenSSL.crypto.PKey()
key.generate_key(key_type, self.__bits)
private_key = OpenSSL.crypto.dump_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key)
return private_key
# 获用户取密钥对
def getAccountKey(self):
if not 'account' in self.__config:
self.__config['account'] = {}
k = self.__mod_index[self.__debug]
if not k in self.__config['account']:
self.__config['account'][k] = {}
if not 'key' in self.__config['account'][k]:
self.__config['account'][k]['key'] = self.createKey()
if type(self.__config['account'][k]['key']) == bytes:
self.__config['account'][k]['key'] = self.__config[
'account'][k]['key'].decode()
self.saveConfig()
return self.__config['account'][k]['key']
# 注册acme帐户
def register(self, existing=False):
if not 'email' in self.__config:
self.__config['email'] = 'demo@bt.cn'
if existing:
payload = {"onlyReturnExisting": True}
elif self.__config['email']:
payload = {
"termsOfServiceAgreed": True,
"contact": ["mailto:{0}".format(self.__config['email'])],
}
else:
payload = {"termsOfServiceAgreed": True}
res = self.acmeRequest(url=self.__apis['newAccount'], payload=payload)
if res.status_code not in [201, 200, 409]:
raise Exception("注册ACME帐户失败: {}".format(res.json()))
kid = res.headers["Location"]
return kid
# 获取kid
def getKid(self, force=False):
# 如果配置文件中不存在kid或force = True时则重新注册新的acme帐户
if not 'account' in self.__config:
self.__config['account'] = {}
k = self.__mod_index[self.__debug]
if not k in self.__config['account']:
self.__config['account'][k] = {}
if not 'kid' in self.__config['account'][k]:
self.__config['account'][k]['kid'] = self.register()
self.saveConfig()
time.sleep(3)
self.__config = self.readConfig()
return self.__config['account'][k]['kid']
# 获取随机数
def getNonce(self, force=False):
# 如果没有保存上一次的随机数或force=True时则重新获取新的随机数
if not self.__replay_nonce or force:
headers = {"User-Agent": self.__user_agent}
response = requests.get(
self.__apis['newNonce'],
timeout=self.__acme_timeout,
headers=headers,
verify=self.__verify
)
self.__replay_nonce = response.headers["Replay-Nonce"]
return self.__replay_nonce
# 获请ACME请求头
def getAcmeHeader(self, url):
header = {"alg": "RS256", "nonce": self.getNonce(), "url": url}
if url in [self.__apis['newAccount'], 'GET_THUMBPRINT']:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
private_key = serialization.load_pem_private_key(
self.getAccountKey().encode(),
password=None,
backend=default_backend(),
)
public_key_public_numbers = private_key.public_key().public_numbers()
exponent = "{0:x}".format(public_key_public_numbers.e)
exponent = "0{0}".format(exponent) if len(
exponent) % 2 else exponent
modulus = "{0:x}".format(public_key_public_numbers.n)
jwk = {
"kty": "RSA",
"e": self.calculateSafeBase64(binascii.unhexlify(exponent)),
"n": self.calculateSafeBase64(binascii.unhexlify(modulus)),
}
header["jwk"] = jwk
else:
header["kid"] = self.getKid()
return header
# 计算signature
def signMessage(self, message):
pk = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, self.getAccountKey().encode())
return OpenSSL.crypto.sign(pk, message.encode("utf8"), self.__digest)
def getSiteRunPathByid(self, site_id):
'''
@name 通过site_id获取网站运行目录
@author hwliang
@param site_id<int> 网站标识
@return None or string
'''
if mw.M('sites').where('id=? and project_type=?', (site_id, 'PHP')).count() >= 1:
site_path = public.M('sites').where(
'id=?', site_id).getField('path')
if not site_path:
return None
if not os.path.exists(site_path):
return None
args = public.dict_obj()
args.id = site_id
import panelSite
run_path = panelSite.panelSite().GetRunPath(args)
if run_path in ['/']:
run_path = ''
if run_path:
if run_path[0] == '/':
run_path = run_path[1:]
site_run_path = os.path.join(site_path, run_path)
if not os.path.exists(site_run_path):
return site_path
return site_run_path
else:
return False
def getSiteRunPath(self, domains):
'''
@name 通过域名列表获取网站运行目录
@author hwliang
@param domains<list> 域名列表
@return None or string
'''
site_id = 0
for domain in domains:
site_id = mw.M('domain').where(
"name=?", domain).getField('pid')
if site_id:
break
if not site_id:
return None
return self.getSiteRunPathByid(site_id)
# 清理验证文件
def claerAuthFile(self, index):
if not self.__config['orders'][index]['auth_type'] in ['http', 'tls']:
return True
acme_path = '{}/.well-known/acme-challenge'.format(
self.__config['orders'][index]['auth_to'])
writeLog("|-验证目录:{}".format(acme_path))
if os.path.exists(acme_path):
mw.execShell("rm -f {}/*".format(acme_path))
acme_path = mw.getServerDir() + '/stop/.well-known/acme-challenge'
if os.path.exists(acme_path):
mw.execShell("rm -f {}/*".format(acme_path))
# 获取域名验证方式
def getAuthType(self, index):
if not index in self.__config['orders']:
raise Exception('指定订单不存在!')
s_type = 'http-01'
if 'auth_type' in self.__config['orders'][index]:
if self.__config['orders'][index]['auth_type'] == 'dns':
s_type = 'dns-01'
elif self.__config['orders'][index]['auth_type'] == 'tls':
s_type = 'tls-alpn-01'
else:
s_type = 'http-01'
return s_type
# 构造验证信息
def getIdentifierAuth(self, index, url, auth_info):
s_type = self.getAuthType(index)
writeLog("|-验证类型:{}".format(s_type))
domain = auth_info['identifier']['value']
wildcard = False
# 处理通配符
if 'wildcard' in auth_info:
wildcard = auth_info['wildcard']
if wildcard:
domain = "*." + domain
for auth in auth_info['challenges']:
if auth['type'] != s_type:
continue
identifier_auth = {
"domain": domain,
"url": url,
"wildcard": wildcard,
"token": auth['token'],
"dns_challenge_url": auth['url'],
}
return identifier_auth
return None
# 构造域名验证头和验证值
def getKeyauthorization(self, token):
acme_header_jwk_json = json.dumps(
self.getAcmeHeader("GET_THUMBPRINT")["jwk"], sort_keys=True, separators=(",", ":")
)
acme_thumbprint = self.calculateSafeBase64(
hashlib.sha256(acme_header_jwk_json.encode("utf8")).digest()
)
acme_keyauthorization = "{0}.{1}".format(token, acme_thumbprint)
base64_of_acme_keyauthorization = self.calculateSafeBase64(
hashlib.sha256(acme_keyauthorization.encode("utf8")).digest()
)
return acme_keyauthorization, base64_of_acme_keyauthorization
# 写验证文件
def writeAuthFile(self, auth_to, token, acme_keyauthorization):
try:
self.D('writeAuthFile', auth_to)
acme_path = '{}/.well-known/acme-challenge'.format(auth_to)
if not os.path.exists(acme_path):
os.makedirs(acme_path)
mw.setOwn(acme_path, 'www')
wellknown_path = '{}/{}'.format(acme_path, token)
mw.writeFile(wellknown_path, acme_keyauthorization)
mw.setOwn(wellknown_path, 'www')
acme_path = mw.getServerDir() + '/stop/.well-known/acme-challenge'
if not os.path.exists(acme_path):
os.makedirs(acme_path)
mw.setOwn(acme_path, 'www')
wellknown_path = '{}/{}'.format(acme_path, token)
mw.writeFile(wellknown_path, acme_keyauthorization)
mw.setOwn(wellknown_path, 'www')
return True
except:
err = mw.getTracebackInfo()
print(err)
raise Exception("写入验证文件失败: {}".format(err))
# 设置验证信息
def setAuthInfo(self, identifier_auth):
# 是否手动验证DNS
if identifier_auth['auth_to'] == 'dns':
return None
# 是否文件验证
if identifier_auth['type'] in ['http', 'tls']:
self.writeAuthFile(identifier_auth['auth_to'], identifier_auth[
'token'], identifier_auth['acme_keyauthorization'])
# 获取验证信息
def getAuths(self, index):
if not index in self.__config['orders']:
raise Exception('指定订单不存在!')
# 检查是否已经获取过授权信息
if 'auths' in self.__config['orders'][index]:
# 检查授权信息是否过期
if time.time() < self.__config['orders'][index]['auths'][0]['expires']:
return self.__config['orders'][index]['auths']
if self.__config['orders'][index]['auth_type'] != 'dns':
site_run_path = self.getSiteRunPath(
self.__config['orders'][index]['domains'])
if site_run_path:
self.__config['orders'][index]['auth_to'] = site_run_path
# 清理旧验证
self.claerAuthFile(index)
auths = []
for auth_url in self.__config['orders'][index]['authorizations']:
res = self.acmeRequest(auth_url, "")
if res.status_code not in [200, 201]:
raise Exception("获取授权失败: {}".format(res.json()))
s_body = res.json()
if 'status' in s_body:
if s_body['status'] in ['invalid']:
raise Exception("无效订单,此订单当前为验证失败状态!")
if s_body['status'] in ['valid']: # 跳过无需验证的域名
continue
s_body['expires'] = self.utcToTime(s_body['expires'])
identifier_auth = self.getIdentifierAuth(index, auth_url, s_body)
if not identifier_auth:
raise Exception("验证信息构造失败!{}")
acme_keyauthorization, auth_value = self.getKeyauthorization(identifier_auth[
'token'])
identifier_auth['acme_keyauthorization'] = acme_keyauthorization
identifier_auth['auth_value'] = auth_value
identifier_auth['expires'] = s_body['expires']
identifier_auth['auth_to'] = self.__config[
'orders'][index]['auth_to']
identifier_auth['type'] = self.__config[
'orders'][index]['auth_type']
# print(identifier_auth)
# 设置验证信息
self.setAuthInfo(identifier_auth)
auths.append(identifier_auth)
self.__config['orders'][index]['auths'] = auths
self.saveConfig()
return auths
# 更新随机数
def updateReplayNonce(self, res):
replay_nonce = res.headers.get('Replay-Nonce')
if replay_nonce:
self.__replay_nonce = replay_nonce
# 请求到ACME接口
def acmeRequest(self, url, payload):
headers = {"User-Agent": self.__user_agent}
payload = self.stringfyItems(payload)
if payload == "":
payload64 = payload
else:
payload64 = self.calculateSafeBase64(json.dumps(payload))
protected = self.getAcmeHeader(url)
protected64 = self.calculateSafeBase64(json.dumps(protected))
signature = self.signMessage(
message="{0}.{1}".format(protected64, payload64)) # bytes
signature64 = self.calculateSafeBase64(signature) # str
data = json.dumps(
{"protected": protected64, "payload": payload64, "signature": signature64})
headers.update({"Content-Type": "application/jose+json"})
response = requests.post(
url, data=data.encode("utf8"), timeout=self.__acme_timeout, headers=headers, verify=self.__verify
)
# 更新随机数
self.updateReplayNonce(response)
return response
# UTC时间转时间戳
def utcToTime(self, utc_string):
try:
utc_string = utc_string.split('.')[0]
utc_date = datetime.datetime.strptime(
utc_string, "%Y-%m-%dT%H:%M:%S")
# 按北京时间返回
return int(time.mktime(utc_date.timetuple())) + (3600 * 8)
except:
return int(time.time() + 86400 * 7)
# 保存订单
def saveOrder(self, order_object, index):
if not 'orders' in self.__config:
self.__config['orders'] = {}
renew = False
if not index:
index = mw.md5(json.dumps(order_object['identifiers']))
else:
renew = True
order_object['certificate_url'] = self.__config[
'orders'][index]['certificate_url']
order_object['save_path'] = self.__config[
'orders'][index]['save_path']
order_object['expires'] = self.utcToTime(order_object['expires'])
self.__config['orders'][index] = order_object
self.__config['orders'][index]['index'] = index
if not renew:
self.__config['orders'][index]['create_time'] = int(time.time())
self.__config['orders'][index]['renew_time'] = 0
self.saveConfig()
return index
def getError(self, error):
# 格式化错误输出
if error.find("Max checks allowed") >= 0:
return "CA无法验证您的域名,请检查域名解析是否正确,或等待5-10分钟后重试."
elif error.find("Max retries exceeded with") >= 0 or error.find('status_code=0 ') != -1:
return "CA服务器连接超时,请稍候重试."
elif error.find("The domain name belongs") >= 0:
return "域名不属于此DNS服务商,请确保域名填写正确."
elif error.find('login token ID is invalid') >= 0:
return 'DNS服务器连接失败,请检查密钥是否正确.'
elif error.find('Error getting validation data') != -1:
return '数据验证失败,CA无法从验证连接中获到正确的验证码.'
elif "too many certificates already issued for exact set of domains" in error:
return '签发失败,该域名%s超出了每周的重复签发次数限制!' % re.findall("exact set of domains: (.+):", error)
elif "Error creating new account :: too many registrations for this IP" in error:
return '签发失败,当前服务器IP已达到每3小时最多创建10个帐户的限制.'
elif "DNS problem: NXDOMAIN looking up A for" in error:
return '验证失败,没有解析域名,或解析未生效!'
elif "Invalid response from" in error:
return '验证失败,域名解析错误或验证URL无法被访问!'
elif error.find('TLS Web Server Authentication') != -1:
return "连接CA服务器失败,请稍候重试."
elif error.find('Name does not end in a public suffix') != -1:
return "不支持的域名%s,请检查域名是否正确!" % re.findall("Cannot issue for \"(.+)\":", error)
elif error.find('No valid IP addresses found for') != -1:
return "域名%s没有找到解析记录,请检查域名是否解析生效!" % re.findall("No valid IP addresses found for (.+)", error)
elif error.find('No TXT record found at') != -1:
return "没有在域名%s中找到有效的TXT解析记录,请检查是否正确解析TXT记录,如果是DNSAPI方式申请的,请10分钟后重试!" % re.findall("No TXT record found at (.+)", error)
elif error.find('Incorrect TXT record') != -1:
return "%s上发现错误的TXT记录:%s,请检查TXT解析是否正确,如果是DNSAPI方式申请的,请10分钟后重试!" % (re.findall("found at (.+)", error), re.findall("Incorrect TXT record \"(.+)\"", error))
elif error.find('Domain not under you or your user') != -1:
return "这个dnspod账户下面不存在这个域名,添加解析失败!"
elif error.find('SERVFAIL looking up TXT for') != -1:
return "没有在域名%s中找到有效的TXT解析记录,请检查是否正确解析TXT记录,如果是DNSAPI方式申请的,请10分钟后重试!" % re.findall("looking up TXT for (.+)", error)
elif error.find('Timeout during connect') != -1:
return "连接超时,CA服务器无法访问您的网站!"
elif error.find("DNS problem: SERVFAIL looking up CAA for") != -1:
return "域名%s当前被要求验证CAA记录,请手动解析CAA记录,或1小时后重新尝试申请!" % re.findall("looking up CAA for (.+)", error)
elif error.find("Read timed out.") != -1:
return "验证超时,请检查域名是否正确解析,若已正确解析,可能服务器与Let'sEncrypt连接异常,请稍候再重试!"
elif error.find('Cannot issue for') != -1:
return "无法为{}颁发证书,不能直接用域名后缀申请通配符证书!".format(re.findall(r'for\s+"(.+)"', error))
elif error.find('too many failed authorizations recently'):
return '该帐户1小时内失败的订单次数超过5次,请等待1小时再重试!'
elif error.find("Error creating new order") != -1:
return "订单创建失败,请稍候重试!"
elif error.find("Too Many Requests") != -1:
return "1小时内超过5次验证失败,暂时禁止申请,请稍候重试!"
elif error.find('HTTP Error 400: Bad Request') != -1:
return "CA服务器拒绝访问,请稍候重试!"
elif error.find('Temporary failure in name resolution') != -1:
return '服务器DNS故障,无法解析域名,请使用Linux工具箱检查dns配置'
elif error.find('Too Many Requests') != -1:
return '该域名请求申请次数过多,请3小时后重试'
else:
return error
# 创建订单
def createOrder(self, domains, auth_type, auth_to, index=None):
domains = self.formatDomains(domains)
if not domains:
raise Exception("至少需要有一个域名")
# 构造标识
identifiers = []
for domain_name in domains:
identifiers.append({"type": 'dns', "value": domain_name})
payload = {"identifiers": identifiers}
# 请求创建订单
res = self.acmeRequest(self.__apis['newOrder'], payload)
if not res.status_code in [201]: # 如果创建失败
e_body = res.json()
if 'type' in e_body:
# 如果随机数失效
if e_body['type'].find('error:badNonce') != -1:
self.getNonce(force=True)
res = self.acmeRequest(self.__apis['newOrder'], payload)
# 如果帐户失效
if e_body['detail'].find('KeyID header contained an invalid account URL') != -1:
k = self._mod_index[self.__debug]
del(self.__config['account'][k])
self.getKid()
self.getNonce(force=True)
res = self.acmeRequest(self.__apis['newOrder'], payload)
if not res.status_code in [201]:
a_auth = res.json()
ret_title = self.getError(str(a_auth))
raise StopIteration("{0} >>>> {1}".format(
ret_title, json.dumps(a_auth)))
# 返回验证地址和验证
s_json = res.json()
s_json['auth_type'] = auth_type
s_json['domains'] = domains
s_json['auth_to'] = auth_to
index = self.saveOrder(s_json, index)
return index
# 检查验证状态
def checkAuthStatus(self, url, desired_status=None):
desired_status = desired_status or ["pending", "valid", "invalid"]
number_of_checks = 0
while True:
if desired_status == ['valid', 'invalid']:
writeLog("|-第{}次查询验证结果..".format(number_of_checks + 1))
time.sleep(self.__wait_time)
check_authorization_status_response = self.acmeRequest(url, "")
a_auth = check_authorization_status_response.json()
authorization_status = a_auth["status"]
number_of_checks += 1
if authorization_status in desired_status:
if authorization_status == "invalid":
writeLog("|-验证失败!")
try:
if 'error' in a_auth['challenges'][0]:
ret_title = a_auth['challenges'][
0]['error']['detail']
elif 'error' in a_auth['challenges'][1]:
ret_title = a_auth['challenges'][
1]['error']['detail']
elif 'error' in a_auth['challenges'][2]:
ret_title = a_auth['challenges'][
2]['error']['detail']
else:
ret_title = str(a_auth)
ret_title = self.getError(ret_title)
except:
ret_title = str(a_auth)
raise StopIteration("{0} >>>> {1}".format(
ret_title, json.dumps(a_auth)))
break
if number_of_checks == self._max_check_num:
raise StopIteration(
"错误:已尝试验证{0}次. 最大验证次数为{1}. 验证时间间隔为{2}秒.".format(
number_of_checks,
self.__max_check_num,
self.__wait_time
)
)
if desired_status == ['valid', 'invalid']:
writeLog("|-验证成功!")
return check_authorization_status_response
# 发送验证请求
def respondToChallenge(self, auth):
payload = {"keyAuthorization": "{0}".format(
auth['acme_keyauthorization'])}
respond_to_challenge_response = self.acmeRequest(
auth['dns_challenge_url'], payload)
return respond_to_challenge_response
def authDomain(self, index):
if not index in self.__config['orders']:
raise Exception('指定订单不存在!')
# 开始验证
for auth in self.__config['orders'][index]['auths']:
res = self.checkAuthStatus(auth['url']) # 检查是否需要验证
if res.json()['status'] == 'pending':
if auth['type'] == 'dns': # 尝试提前验证dns解析
self.check_dns(
"_acme-challenge.{}".format(
auth['domain'].replace('*.', '')),
auth['auth_value'],
"TXT"
)
self.respondToChallenge(auth)
# 检查验证结果
for i in range(len(self.__config['orders'][index]['auths'])):
self.checkAuthStatus(self.__config['orders'][index]['auths'][i]['url'], [
'valid', 'invalid'])
self.__config['orders'][index]['status'] = 'valid'
# 构造可选名称
def getAltNames(self, index):
domain_name = self.__config['orders'][index]['domains'][0]
domain_alt_names = []
if len(self.__config['orders'][index]['domains']) > 1:
domain_alt_names = self.__config['orders'][index]['domains'][1:]
return domain_name, domain_alt_names
# 创建CSR
def createCsr(self, index):
if 'csr' in self.__config['orders'][index]:
return self.__config['orders']['csr']
domain_name, domain_alt_names = self.getAltNames(index)
X509Req = OpenSSL.crypto.X509Req()
X509Req.get_subject().CN = domain_name
if domain_alt_names:
SAN = "DNS:{0}, ".format(domain_name).encode("utf8") + ", ".join(
"DNS:" + i for i in domain_alt_names
).encode("utf8")
else:
SAN = "DNS:{0}".format(domain_name).encode("utf8")
X509Req.add_extensions(
[
OpenSSL.crypto.X509Extension(
"subjectAltName".encode("utf8"), critical=False, value=SAN
)
]
)
pk = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, self.createCertificateKey(
index).encode()
)
X509Req.set_pubkey(pk)
X509Req.set_version(2)
X509Req.sign(pk, self.__digest)
return OpenSSL.crypto.dump_certificate_request(OpenSSL.crypto.FILETYPE_ASN1, X509Req)
# 获取证书密钥对
def createCertificateKey(self, index):
# 判断是否已经创建private_key
if 'private_key' in self.__config['orders'][index]:
return self.__config['orders'][index]['private_key']
# 创建新的私钥
private_key = self.createKey()
if type(private_key) == bytes:
private_key = private_key.decode()
# 保存私钥到订单配置文件
self.__config['orders'][index]['private_key'] = private_key
self.saveConfig()
return private_key
# 发送CSR
def sendCsr(self, index):
csr = self.createCsr(index)
payload = {"csr": self.calculateSafeBase64(csr)}
send_csr_response = self.acmeRequest(
url=self.__config['orders'][index]['finalize'], payload=payload)
if send_csr_response.status_code not in [200, 201]:
raise ValueError(
"错误: 发送CSR: 响应状态{status_code} 响应值:{response}".format(
status_code=send_csr_response.status_code,
response=send_csr_response.json(),
)
)
send_csr_response_json = send_csr_response.json()
certificate_url = send_csr_response_json["certificate"]
self.__config['orders'][index]['certificate_url'] = certificate_url
self.saveConfig()
return certificate_url
# 转换时间
def strfDate(self, sdate):
return time.strftime('%Y-%m-%d', time.strptime(sdate, '%Y%m%d%H%M%S'))
# 证书转为DER
def dumpDer(self, cert_path):
cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, public.readFile(cert_path + '/cert.csr'))
return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
# 证书转为pkcs12
def dumpPkcs12(self, key_pem=None, cert_pem=None, ca_pem=None, friendly_name=None):
p12 = OpenSSL.crypto.PKCS12()
if cert_pem:
p12.set_certificate(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert_pem.encode()))
if key_pem:
p12.set_privatekey(OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key_pem.encode()))
if ca_pem:
p12.set_ca_certificates((OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, ca_pem.encode()),))
if friendly_name:
p12.set_friendlyname(friendly_name.encode())
return p12.export()
# 拆分根证书
def splitCaData(self, cert):
sp_key = '-----END CERTIFICATE-----\n'
datas = cert.split(sp_key)
return {"cert": datas[0] + sp_key, "root": sp_key.join(datas[1:])}
# 获取指定证书基本信息
def getCertInit(self, pem_file):
if not os.path.exists(pem_file):
return None
try:
result = {}
x509 = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, public.readFile(pem_file))
# 取产品名称
issuer = x509.get_issuer()
result['issuer'] = ''
if hasattr(issuer, 'CN'):
result['issuer'] = issuer.CN
if not result['issuer']:
is_key = [b'0', '0']
issue_comp = issuer.get_components()
if len(issue_comp) == 1:
is_key = [b'CN', 'CN']
for iss in issue_comp:
if iss[0] in is_key:
result['issuer'] = iss[1].decode()
break
# 取到期时间
result['notAfter'] = self.strfDate(
bytes.decode(x509.get_notAfter())[:-1])
# 取申请时间
result['notBefore'] = self.strfDate(
bytes.decode(x509.get_notBefore())[:-1])
# 取可选名称
result['dns'] = []
for i in range(x509.get_extension_count()):
s_name = x509.get_extension(i)
if s_name.get_short_name() in [b'subjectAltName', 'subjectAltName']:
s_dns = str(s_name).split(',')
for d in s_dns:
result['dns'].append(d.split(':')[1])
subject = x509.get_subject().get_components()
# 取主要认证名称
if len(subject) == 1:
result['subject'] = subject[0][1].decode()
else:
result['subject'] = result['dns'][0]
return result
except:
return None
# 替换服务器上的同域名同品牌证书
def subAllCert(self, key_file, pem_file):
cert_init = self.get_cert_init(pem_file) # 获取新证书的基本信息
paths = ['/www/server/mdserver-web/vhost/cert',
'/www/server/mdserver-web/vhost/ssl',
'/www/server/mdserver-web']
is_panel = False
for path in paths:
if not os.path.exists(path):
continue
for p_name in os.listdir(path):
to_path = path + '/' + p_name
to_pem_file = to_path + '/fullchain.pem'
to_key_file = to_path + '/privkey.pem'
to_info = to_path + '/info.json'
# 判断目标证书是否存在
if not os.path.exists(to_pem_file):
if not p_name in ['ssl']:
continue
to_pem_file = to_path + '/certificate.pem'
to_key_file = to_path + '/privateKey.pem'
if not os.path.exists(to_pem_file):
continue
if path == paths[-1]:
is_panel = True
# 获取目标证书的基本信息
to_cert_init = self.getCertInit(to_pem_file)
# 判断证书品牌是否一致
try:
if to_cert_init['issuer'] != cert_init['issuer'] and to_cert_init['issuer'].find("Let's Encrypt") == -1 and to_cert_init['issuer'] != 'R3':
continue
except:
continue
# 判断目标证书的到期时间是否较早
if to_cert_init['notAfter'] > cert_init['notAfter']:
continue
# 判断认识名称是否一致
if len(to_cert_init['dns']) != len(cert_init['dns']):
continue
is_copy = True
for domain in to_cert_init['dns']:
if not domain in cert_init['dns']:
is_copy = False
if not is_copy:
continue
# 替换新的证书文件和基本信息
mw.writeFile(to_pem_file, mw.readFile(pem_file, 'rb'), 'wb')
mw.writeFile(to_key_file, mw.readFile(key_file, 'rb'), 'wb')
mw.writeFile(to_info, json.dumps(cert_init))
writeLog("|-检测到{}下的证书与本次申请的证书重叠,且到期时间较早,已替换为新证书!".format(to_path))
# 重载web服务
mw.restartWeb()
# 保存证书到文件
def saveCert(self, cert, index):
try:
domain_name = self.__config['orders'][index]['domains'][0]
path = self.__config['orders'][index]['save_path']
if not os.path.exists(path):
os.makedirs(path, 384)
# 存储证书
key_file = path + "/privkey.pem"
pem_file = path + "/fullchain.pem"
mw.writeFile(key_file, cert['private_key'])
mw.writeFile(pem_file, cert['cert'] + cert['root'])
mw.writeFile(path + "/cert.csr", cert['cert'])
mw.writeFile(path + "/root_cert.csr", cert['root'])
# 转为IIS证书
pfx_buffer = self.dumpPkcs12(
cert['private_key'], cert['cert'] + cert['root'], cert['root'], domain_name)
mw.writeFile(path + "/fullchain.pfx", pfx_buffer, 'wb+')
ps = '''文件说明:
privkey.pem 证书私钥
fullchain.pem 包含证书链的PEM格式证书(nginx/apache)
root_cert.csr 根证书
cert.csr 域名证书
fullchain.pfx 用于IIS的证书格式
如何在MW面板使用
privkey.pem 粘贴到密钥输入框
fullchain.pem 粘贴到证书输入框
'''
mw.writeFile(path + '/readme.txt', ps)
self.subAllCert(key_file, pem_file)
except:
writeLog(public.getTracebackInfo())
# 获取证书到期时间
def getCertTimeout(self, cret_data):
try:
x509 = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, cret_data)
cert_timeout = bytes.decode(x509.get_notAfter())[:-1]
return int(time.mktime(time.strptime(cert_timeout, '%Y%m%d%H%M%S')))
except:
return int(time.time() + (86400 * 90))
# 下载证书
def downloadCert(self, index):
res = self.acmeRequest(
self.__config['orders'][index]['certificate_url'], "")
if res.status_code not in [200, 201]:
raise Exception("下载证书失败: {}".format(res.json()))
pem_certificate = res.content
if type(pem_certificate) == bytes:
pem_certificate = pem_certificate.decode('utf-8')
cert = self.splitCaData(pem_certificate)
cert['cert_timeout'] = self.getCertTimeout(cert['cert'])
cert['private_key'] = self.__config['orders'][index]['private_key']
cert['domains'] = self.__config['orders'][index]['domains']
del(self.__config['orders'][index]['private_key'])
del(self.__config['orders'][index]['auths'])
del(self.__config['orders'][index]['expires'])
del(self.__config['orders'][index]['authorizations'])
del(self.__config['orders'][index]['finalize'])
del(self.__config['orders'][index]['identifiers'])
if 'cert' in self.__config['orders'][index]:
del(self.__config['orders'][index]['cert'])
self.__config['orders'][index]['status'] = 'valid'
self.__config['orders'][index]['cert_timeout'] = cert['cert_timeout']
domain_name = self.__config['orders'][index]['domains'][0]
self.__config['orders'][index]['save_path'] = '{}/{}'.format(
self.__save_path, domain_name)
cert['save_path'] = self.__config['orders'][index]['save_path']
self.saveConfig()
self.saveCert(cert, index)
return cert
def applyCert(self, domains, auth_type='dns', auth_to='Dns_com|None|None', args={}):
writeLog("", "wb+")
try:
self.getApis()
index = None
if args.index:
index = args.index
if not index:
# 判断是否只想验证域名
writeLog("|-正在创建订单..")
index = self.createOrder(domains, auth_type, auth_to)
writeLog("|-正在获取验证信息..")
self.getAuths(index)
if auth_to == 'dns' and len(self.__config['orders'][index]['auths']) > 0:
return self.__config['orders'][index]
writeLog("|-正在验证域名..")
self.authDomain(index)
writeLog("|-正在发送CSR..")
self.sendCsr(index)
writeLog("|-正在下载证书..")
cert = self.downloadCert(index)
# 保存证书配置
self.saveConfig()
cert['status'] = True
cert['msg'] = '申请成功!'
writeLog("|-申请成功,正在部署到站点..")
return cert
except Exception as ex:
ex = str(ex)
if ex.find(">>>>") != -1:
msg = ex.split(">>>>")
msg[1] = json.loads(msg[1])
else:
msg = ex
writeLog(mw.getTracebackInfo())
return mw.returnJson(False, msg)
def renewCert(self, index):
writeLog("", "wb+")
self.D('renew_cert', index)
try:
order_index = []
if index:
if type(index) != str:
index = index.index
if not index in self.__config['orders']:
raise Exception("指定订单号不存在,无法续签!")
order_index.append(index)
self.D('renew_cert', order_index)
except Exception as e:
start_time = time.time() + (30 * 86400)
if not 'orders' in self.__config:
self.__config['orders'] = {}
self.D('renew_cert', e)
def do(self, args):
cert = None
try:
if not args.index:
if not args.domains:
echoErr("请在--domain参数中指定要申请证书的域名,多个以逗号(,)隔开")
if not args.auth_type in ['http', 'tls', 'dns']:
echoErr("请在--type参数中指定正确的验证类型,支持dns和http")
auth_to = ''
if args.auth_type in ['http', 'tls']:
if not args.path:
echo_err("请在--path参数中指定网站根目录!")
if not os.path.exists(args.path):
echo_err("指定网站根目录不存在,请检查:{}".format(args.path))
auth_to = args.path
else:
if args.dnsapi == '0':
auth_to = 'dns'
else:
if not args.key:
echo_err("使用dnsapi申请时请在--dns_key参数中指定dnsapi的key!")
if not args.secret:
echo_err(
"使用dnsapi申请时请在--dns_secret参数中指定dnsapi的secret!")
auth_to = "{}|{}|{}".format(
args.dnsapi, args.key, args.secret)
domains = args.domains.strip().split(',')
cert = self.applyCert(
domains, auth_type=args.auth_type, auth_to=auth_to, args=args)
else:
# 重新验证
cert = self.applyCert([], auth_type='dns',
auth_to='dns', index=args.index)
except Exception as e:
writeLog("|-{}".format(mw.getTracebackInfo()))
exit()
if not cert:
exit()
# print(cert)
writeLog("=" * 65)
writeLog("|-证书获取成功!")
writeLog("=" * 65)
writeLog("证书到期时间: {}".format(
mw.formatDate(times=cert['cert_timeout'])))
writeLog("证书已保存在: {}/".format(cert['save_path']))
# exp:
'''
python3 class/core/cert_request.py --domain=dev38.cachecha.com --type=http --path=/www/wwwroot/dev38.cachecha.com
python3 class/core/cert_request.py --domain=dev38.cachecha.com --type=http --path=/Users/midoks/Desktop/mwdev/wwwroot/test
'''
if __name__ == "__main__":
p = argparse.ArgumentParser(usage="必要的参数:--domain 域名列表,多个以逗号隔开!")
p.add_argument('--domain', default=None,
help="请指定要申请证书的域名", dest="domains")
p.add_argument('--type', default=None, help="请指定验证类型", dest="auth_type")
p.add_argument('--path', default=None, help="请指定网站根目录", dest="path")
p.add_argument('--dnsapi', default=None, help="请指定DNSAPI", dest="dnsapi")
p.add_argument('--dns_key', default=None, help="请指定DNSAPI的key", dest="key")
p.add_argument('--dns_secret', default=None,
help="请指定DNSAPI的secret", dest="secret")
p.add_argument('--index', default=None, help="指定订单索引", dest="index")
p.add_argument('--renew', default=None, help="续签证书", dest="renew")
p.add_argument('--revoke', default=None, help="吊销证书", dest="revoke")
args = p.parse_args()
cr = cert_request()
if args.revoke:
if not args.index:
echoErr("请在--index参数中传入要被吊销的订单索引")
result = cr.revokeOrder(args.index)
writeLog(result)
exit()
if args.renew:
cr.renewCert(args.index)
exit()
cr.do(args)