mirror of https://github.com/midoks/mdserver-web
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1596 lines
67 KiB
1596 lines
67 KiB
# coding:utf-8
|
|
|
|
# ---------------------------------------------------------------------------------
|
|
# MW-Linux面板
|
|
# ---------------------------------------------------------------------------------
|
|
# copyright (c) 2018-∞(https://github.com/midoks/mdserver-web) All rights reserved.
|
|
# ---------------------------------------------------------------------------------
|
|
# Author: midoks <midoks@163.com>
|
|
# ---------------------------------------------------------------------------------
|
|
|
|
# ---------------------------------------------------------------------------------
|
|
# 证书申请
|
|
# ---------------------------------------------------------------------------------
|
|
|
|
import re
|
|
import fcntl
|
|
import datetime
|
|
import binascii
|
|
import hashlib
|
|
import base64
|
|
import json
|
|
import time
|
|
import os
|
|
import sys
|
|
import argparse
|
|
|
|
if os.path.exists('/www/server/mdserver-web'):
|
|
os.chdir('/www/server/mdserver-web')
|
|
|
|
import mw
|
|
|
|
try:
|
|
import OpenSSL
|
|
except:
|
|
mw.execShell("pip install pyopenssl")
|
|
import OpenSSL
|
|
|
|
|
|
def echoErr(msg):
|
|
writeLog("\033[31m=" * 65)
|
|
writeLog("|-错误:{}\033[0m".format(msg))
|
|
exit()
|
|
|
|
|
|
def writeLog(log_str, mode="ab+"):
|
|
# 写日志
|
|
if __name__ == "__main__":
|
|
print(log_str)
|
|
return
|
|
_log_file = 'logs/letsencrypt.log'
|
|
f = open(_log_file, mode)
|
|
log_str += "\n"
|
|
f.write(log_str.encode('utf-8'))
|
|
f.close()
|
|
return True
|
|
|
|
|
|
class cert_api:
|
|
__debug = False
|
|
__user_agent = "MW-Panel"
|
|
__apis = None
|
|
__url = None
|
|
__replay_nonce = None
|
|
__acme_timeout = 30
|
|
__max_check_num = 5
|
|
__wait_time = 5
|
|
__bits = 2048
|
|
__digest = "sha256"
|
|
__verify = False
|
|
__config = {}
|
|
__dns_class = None
|
|
__auto_wildcard = False
|
|
__mod_index = {True: "Staging", False: "Production"}
|
|
__save_path = 'data/letsencrypt'
|
|
__cfg_file = 'data/letsencrypt.json'
|
|
|
|
def __init__(self):
|
|
self.__user_agent = 'MW-Panel:' + mw.getRandomString(8)
|
|
self.__save_path = mw.getServerDir() + '/web_conf/letsencrypt'
|
|
if not os.path.exists(self.__save_path):
|
|
os.makedirs(self.__save_path)
|
|
if self.__debug:
|
|
self.__url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
|
|
else:
|
|
self.__url = 'https://acme-v02.api.letsencrypt.org/directory'
|
|
self.__config = self.readConfig()
|
|
|
|
def D(self, name, val):
|
|
if self.__debug:
|
|
print('---------{} start--------'.format(name))
|
|
print(val)
|
|
print('---------{} end--------'.format(name))
|
|
|
|
# 读配置文件
|
|
def readConfig(self):
|
|
if not os.path.exists(self.__cfg_file):
|
|
self.__config['orders'] = {}
|
|
self.__config['account'] = {}
|
|
self.__config['apis'] = {}
|
|
self.__config['email'] = mw.M('users').where(
|
|
'id=?', (1,)).getField('email')
|
|
if self.__config['email'] in ['midoks@163.com']:
|
|
self.__config['email'] = None
|
|
self.saveConfig()
|
|
return self.__config
|
|
tmp_config = mw.readFile(self.__cfg_file)
|
|
if not tmp_config:
|
|
return self.__config
|
|
try:
|
|
self.__config = json.loads(tmp_config)
|
|
except:
|
|
self.saveConfig()
|
|
return self.__config
|
|
return self.__config
|
|
|
|
def saveConfig(self):
|
|
# 写配置文件
|
|
fp = open(self.__cfg_file, 'w+')
|
|
fcntl.flock(fp, fcntl.LOCK_EX) # 加锁
|
|
fp.write(json.dumps(self.__config))
|
|
fcntl.flock(fp, fcntl.LOCK_UN) # 解锁
|
|
fp.close()
|
|
return True
|
|
|
|
def createCertCron(self):
|
|
|
|
if mw.isAppleSystem():
|
|
print('在Macos上不支持创建证书自动续签任务!')
|
|
return ''
|
|
|
|
# 创建证书自动续签任务
|
|
try:
|
|
import crontab_api
|
|
api = crontab_api.crontab_api()
|
|
|
|
echo = mw.md5(mw.md5('panel_renew_lets_cron'))
|
|
cron_id = mw.M('crontab').where('echo=?', (echo,)).getField('id')
|
|
|
|
cron_path = mw.getServerDir() + '/cron'
|
|
if not os.path.exists(cron_path):
|
|
mw.execShell('mkdir -p ' + cron_path)
|
|
|
|
shell = 'python3 -u {}/class/core/cert_api.py --renew=1'.format(
|
|
mw.getRunDir())
|
|
|
|
logs_file = cron_path + '/' + echo + '.log'
|
|
|
|
cmd = '''#!/bin/bash
|
|
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
|
|
export PATH
|
|
|
|
dst_dir=%s
|
|
logs_file=%s
|
|
cd $dst_dir
|
|
|
|
if [ -f bin/activate ];then
|
|
source bin/activate
|
|
fi
|
|
|
|
''' % (mw.getRunDir(), logs_file)
|
|
cmd += 'echo "★【`date +"%Y-%m-%d %H:%M:%S"`】 STSRT★" >> $logs_file' + "\n"
|
|
cmd += 'echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" >> $logs_file' + "\n"
|
|
cmd += 'cd $dst_dir && ' + shell + ' >> $logs_file 2>&1' + "\n"
|
|
cmd += 'echo "【`date +"%Y-%m-%d %H:%M:%S"`】 END★" >> $logs_file' + "\n"
|
|
cmd += 'echo "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" >> $logs_file' + "\n"
|
|
|
|
file = cron_path + '/' + echo
|
|
|
|
if type(cron_id) != int:
|
|
|
|
mw.writeFile(file, cmd)
|
|
mw.execShell('chmod 750 ' + file)
|
|
|
|
info = {}
|
|
info['type'] = 'day'
|
|
info['minute'] = '10'
|
|
info['hour'] = '0'
|
|
shell_cron, rinfo, name = api.getCrondCycle(info)
|
|
shell_cron += ' ' + cron_path + '/' + echo + \
|
|
' >> ' + logs_file + ' 2>&1'
|
|
|
|
api.writeShell(shell_cron)
|
|
|
|
insert_id = mw.M('crontab').add('name,type,where1,where_hour,where_minute,echo,addtime,status,save,backup_to,stype,sname,sbody,urladdress', (
|
|
"[勿删]续签Let's Encrypt证书", 'day', '', '0', '10', echo, time.strftime('%Y-%m-%d %X', time.localtime()), '1', '', 'localhost', 'toShell', '', cmd, ''))
|
|
|
|
if insert_id > 0:
|
|
print('创建证书自动续签任务成功!')
|
|
else:
|
|
mw.writeFile(file, cmd)
|
|
mw.execShell('chmod 750 ' + file)
|
|
mw.M('crontab').where('id=?', (cron_id)).save('sbody', (cmd,))
|
|
except Exception as e:
|
|
print(mw.getTracebackInfo())
|
|
|
|
def getApis(self):
|
|
if not self.__apis:
|
|
# 尝试从配置文件中获取
|
|
api_index = self.__mod_index[self.__debug]
|
|
if not 'apis' in self.__config:
|
|
self.__config['apis'] = {}
|
|
|
|
if api_index in self.__config['apis']:
|
|
if 'expires' in self.__config['apis'][api_index] and 'directory' in self.__config['apis'][api_index]:
|
|
if time.time() < self.__config['apis'][api_index]['expires']:
|
|
self.__apis = self.__config[
|
|
'apis'][api_index]['directory']
|
|
return self.__apis
|
|
|
|
# 尝试从官方获取获取
|
|
try:
|
|
res = mw.httpGet(self.__url)
|
|
result = json.loads(res)
|
|
self.__apis = {}
|
|
self.__apis['newAccount'] = result['newAccount']
|
|
self.__apis['newNonce'] = result['newNonce']
|
|
self.__apis['newOrder'] = result['newOrder']
|
|
self.__apis['revokeCert'] = result['revokeCert']
|
|
self.__apis['keyChange'] = result['keyChange']
|
|
|
|
# 保存到配置文件
|
|
self.__config['apis'][api_index] = {}
|
|
self.__config['apis'][api_index]['directory'] = self.__apis
|
|
self.__config['apis'][api_index]['expires'] = time.time() + \
|
|
86400 # 24小时后过期
|
|
self.saveConfig()
|
|
except Exception as e:
|
|
raise Exception(
|
|
'服务因维护而关闭或发生内部错误,查看 <a href="https://letsencrypt.status.io/" target="_blank" class="btlink">https://letsencrypt.status.io/</a> 了解更多详细信息。')
|
|
return self.__apis
|
|
|
|
# 系列化payload
|
|
def stringfyItems(self, payload):
|
|
if isinstance(payload, str):
|
|
return payload
|
|
|
|
for k, v in payload.items():
|
|
if isinstance(k, bytes):
|
|
k = k.decode("utf-8")
|
|
if isinstance(v, bytes):
|
|
v = v.decode("utf-8")
|
|
payload[k] = v
|
|
return payload
|
|
|
|
# 构造域名列表
|
|
def formatDomains(self, domains):
|
|
if type(domains) != list:
|
|
return []
|
|
# 是否自动构造通配符
|
|
if self.__auto_wildcard:
|
|
domains = self.autoWildcard(domains)
|
|
wildcard = []
|
|
tmp_domains = []
|
|
for domain in domains:
|
|
domain = domain.strip()
|
|
if domain in tmp_domains:
|
|
continue
|
|
# 将通配符域名转为验证正则表达式
|
|
f_index = domain.find("*.")
|
|
if f_index not in [-1, 0]:
|
|
continue
|
|
if f_index == 0:
|
|
wildcard.append(domain.replace(
|
|
"*", r"^[\w-]+").replace(".", r"\."))
|
|
# 添加到申请列表
|
|
tmp_domains.append(domain)
|
|
|
|
# 处理通配符包含
|
|
apply_domains = tmp_domains[:]
|
|
for domain in tmp_domains:
|
|
for w in wildcard:
|
|
if re.match(w, domain):
|
|
apply_domains.pop(domain)
|
|
|
|
return apply_domains
|
|
|
|
# 转为无填充的Base64
|
|
def calculateSafeBase64(self, un_encoded_data):
|
|
if sys.version_info[0] == 3:
|
|
if isinstance(un_encoded_data, str):
|
|
un_encoded_data = un_encoded_data.encode("utf8")
|
|
r = base64.urlsafe_b64encode(un_encoded_data).rstrip(b"=")
|
|
return r.decode("utf8")
|
|
|
|
# 创建Key
|
|
def createKey(self, key_type=OpenSSL.crypto.TYPE_RSA):
|
|
key = OpenSSL.crypto.PKey()
|
|
key.generate_key(key_type, self.__bits)
|
|
private_key = OpenSSL.crypto.dump_privatekey(
|
|
OpenSSL.crypto.FILETYPE_PEM, key)
|
|
return private_key
|
|
|
|
# 获用户取密钥对
|
|
def getAccountKey(self):
|
|
if not 'account' in self.__config:
|
|
self.__config['account'] = {}
|
|
k = self.__mod_index[self.__debug]
|
|
if not k in self.__config['account']:
|
|
self.__config['account'][k] = {}
|
|
|
|
if not 'key' in self.__config['account'][k]:
|
|
self.__config['account'][k]['key'] = self.createKey()
|
|
if type(self.__config['account'][k]['key']) == bytes:
|
|
self.__config['account'][k]['key'] = self.__config[
|
|
'account'][k]['key'].decode()
|
|
self.saveConfig()
|
|
return self.__config['account'][k]['key']
|
|
|
|
# 注册acme帐户
|
|
def register(self, existing=False):
|
|
if not 'email' in self.__config:
|
|
self.__config['email'] = 'mdioks@163.com'
|
|
if existing:
|
|
payload = {"onlyReturnExisting": True}
|
|
elif self.__config['email']:
|
|
payload = {
|
|
"termsOfServiceAgreed": True,
|
|
"contact": ["mailto:{0}".format(self.__config['email'])],
|
|
}
|
|
else:
|
|
payload = {"termsOfServiceAgreed": True}
|
|
|
|
res = self.acmeRequest(url=self.__apis['newAccount'], payload=payload)
|
|
|
|
if res.status not in [201, 200, 409]:
|
|
raise Exception("注册ACME帐户失败: {}".format(res.json()))
|
|
kid = res.headers["Location"]
|
|
return kid
|
|
|
|
# 获取kid
|
|
def getKid(self, force=False):
|
|
# 如果配置文件中不存在kid或force = True时则重新注册新的acme帐户
|
|
if not 'account' in self.__config:
|
|
self.__config['account'] = {}
|
|
k = self.__mod_index[self.__debug]
|
|
if not k in self.__config['account']:
|
|
self.__config['account'][k] = {}
|
|
|
|
if not 'kid' in self.__config['account'][k]:
|
|
self.__config['account'][k]['kid'] = self.register()
|
|
self.saveConfig()
|
|
time.sleep(3)
|
|
self.__config = self.readConfig()
|
|
return self.__config['account'][k]['kid']
|
|
|
|
def requestsGet(self, url, timeout):
|
|
try:
|
|
import urllib.request
|
|
import ssl
|
|
try:
|
|
ssl._create_default_https_context = ssl._create_unverified_context
|
|
except:
|
|
pass
|
|
|
|
headers = {"User-Agent": self.__user_agent}
|
|
req = urllib.request.Request(url=url, headers=headers)
|
|
req = urllib.request.urlopen(url, timeout=timeout)
|
|
return req
|
|
except Exception as ex:
|
|
raise Exception("requestsGet: {}".format(str(ex)))
|
|
|
|
def requestsPost(self, url, data, timeout):
|
|
try:
|
|
import urllib.request
|
|
import ssl
|
|
try:
|
|
ssl._create_default_https_context = ssl._create_unverified_context
|
|
except:
|
|
pass
|
|
|
|
headers = {"User-Agent": self.__user_agent}
|
|
headers.update({"Content-Type": "application/jose+json"})
|
|
data = bytes(data, encoding="utf8")
|
|
req = urllib.request.Request(
|
|
url, data, headers=headers, method='POST')
|
|
response = urllib.request.urlopen(req, timeout=timeout)
|
|
return response
|
|
except Exception as ex:
|
|
# self.getError()
|
|
raise Exception("异常: {}".format(self.getError(str(ex))))
|
|
|
|
def getRequestJson(self, response):
|
|
try:
|
|
data = response.read().decode('utf-8')
|
|
return json.loads(data)
|
|
except Exception as ex:
|
|
raise Exception("getRequestJson: {}".format(str(ex)))
|
|
|
|
def getNonce(self, force=False):
|
|
# 获取随机数
|
|
# 如果没有保存上一次的随机数或force=True时则重新获取新的随机数
|
|
if not self.__replay_nonce or force:
|
|
try:
|
|
response = self.requestsGet(
|
|
self.__apis['newNonce'], timeout=self.__acme_timeout)
|
|
self.__replay_nonce = response.headers["replay-nonce"]
|
|
except Exception as e:
|
|
raise Exception("获取随机数失败: {}".format(str(e)))
|
|
|
|
return self.__replay_nonce
|
|
|
|
# 获请ACME请求头
|
|
def getAcmeHeader(self, url):
|
|
nonce = self.getNonce()
|
|
|
|
header = {"alg": "RS256", "nonce": nonce, "url": url}
|
|
if url in [self.__apis['newAccount'], 'GET_THUMBPRINT']:
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import serialization
|
|
private_key = serialization.load_pem_private_key(
|
|
self.getAccountKey().encode(),
|
|
password=None,
|
|
backend=default_backend(),
|
|
)
|
|
public_key_public_numbers = private_key.public_key().public_numbers()
|
|
|
|
exponent = "{0:x}".format(public_key_public_numbers.e)
|
|
exponent = "0{0}".format(exponent) if len(
|
|
exponent) % 2 else exponent
|
|
modulus = "{0:x}".format(public_key_public_numbers.n)
|
|
jwk = {
|
|
"kty": "RSA",
|
|
"e": self.calculateSafeBase64(binascii.unhexlify(exponent)),
|
|
"n": self.calculateSafeBase64(binascii.unhexlify(modulus)),
|
|
}
|
|
header["jwk"] = jwk
|
|
else:
|
|
header["kid"] = self.getKid()
|
|
return header
|
|
|
|
# 计算signature
|
|
def signMessage(self, message):
|
|
pk = OpenSSL.crypto.load_privatekey(
|
|
OpenSSL.crypto.FILETYPE_PEM, self.getAccountKey().encode())
|
|
return OpenSSL.crypto.sign(pk, message.encode("utf8"), self.__digest)
|
|
|
|
def getSiteRunPathByid(self, site_id):
|
|
if mw.M('sites').where('id=?', (site_id,)).count() >= 1:
|
|
site_path = mw.M('sites').where('id=?', site_id).getField('path')
|
|
if not site_path:
|
|
return None
|
|
if not os.path.exists(site_path):
|
|
return None
|
|
return site_path
|
|
else:
|
|
return False
|
|
|
|
def getSiteRunPath(self, domains):
|
|
site_id = 0
|
|
for domain in domains:
|
|
site_id = mw.M('domain').where("name=?", domain).getField('pid')
|
|
if site_id:
|
|
break
|
|
|
|
if not site_id:
|
|
return None
|
|
return self.getSiteRunPathByid(site_id)
|
|
|
|
# 清理验证文件
|
|
def clearAuthFile(self, index):
|
|
if not self.__config['orders'][index]['auth_type'] in ['http', 'tls']:
|
|
return True
|
|
acme_path = '{}/.well-known/acme-challenge'.format(
|
|
self.__config['orders'][index]['auth_to'])
|
|
writeLog("|-验证目录:{}".format(acme_path))
|
|
if os.path.exists(acme_path):
|
|
mw.execShell("rm -f {}/*".format(acme_path))
|
|
|
|
acme_path = mw.getServerDir() + '/stop/.well-known/acme-challenge'
|
|
if os.path.exists(acme_path):
|
|
mw.execShell("rm -f {}/*".format(acme_path))
|
|
|
|
# 获取域名验证方式
|
|
def getAuthType(self, index):
|
|
if not index in self.__config['orders']:
|
|
raise Exception('指定订单不存在!')
|
|
s_type = 'http-01'
|
|
if 'auth_type' in self.__config['orders'][index]:
|
|
if self.__config['orders'][index]['auth_type'] == 'dns':
|
|
s_type = 'dns-01'
|
|
elif self.__config['orders'][index]['auth_type'] == 'tls':
|
|
s_type = 'tls-alpn-01'
|
|
else:
|
|
s_type = 'http-01'
|
|
return s_type
|
|
|
|
# 构造验证信息
|
|
def getIdentifierAuth(self, index, url, auth_info):
|
|
s_type = self.getAuthType(index)
|
|
writeLog("|-验证类型:{}".format(s_type))
|
|
domain = auth_info['identifier']['value']
|
|
wildcard = False
|
|
# 处理通配符
|
|
if 'wildcard' in auth_info:
|
|
wildcard = auth_info['wildcard']
|
|
if wildcard:
|
|
domain = "*." + domain
|
|
|
|
for auth in auth_info['challenges']:
|
|
if auth['type'] != s_type:
|
|
continue
|
|
identifier_auth = {
|
|
"domain": domain,
|
|
"url": url,
|
|
"wildcard": wildcard,
|
|
"token": auth['token'],
|
|
"dns_challenge_url": auth['url'],
|
|
}
|
|
return identifier_auth
|
|
return None
|
|
|
|
# 构造域名验证头和验证值
|
|
def getKeyauthorization(self, token):
|
|
acme_header_jwk_json = json.dumps(
|
|
self.getAcmeHeader("GET_THUMBPRINT")["jwk"], sort_keys=True, separators=(",", ":")
|
|
)
|
|
acme_thumbprint = self.calculateSafeBase64(
|
|
hashlib.sha256(acme_header_jwk_json.encode("utf8")).digest()
|
|
)
|
|
acme_keyauthorization = "{0}.{1}".format(token, acme_thumbprint)
|
|
base64_of_acme_keyauthorization = self.calculateSafeBase64(
|
|
hashlib.sha256(acme_keyauthorization.encode("utf8")).digest()
|
|
)
|
|
|
|
return acme_keyauthorization, base64_of_acme_keyauthorization
|
|
|
|
# 写验证文件
|
|
def writeAuthFile(self, auth_to, token, acme_keyauthorization):
|
|
try:
|
|
# self.D('writeAuthFile', auth_to)
|
|
acme_path = '{}/.well-known/acme-challenge'.format(auth_to)
|
|
if not os.path.exists(acme_path):
|
|
os.makedirs(acme_path)
|
|
mw.setOwn(acme_path, 'www')
|
|
wellknown_path = '{}/{}'.format(acme_path, token)
|
|
mw.writeFile(wellknown_path, acme_keyauthorization)
|
|
mw.setOwn(wellknown_path, 'www')
|
|
|
|
acme_path = mw.getServerDir() + '/stop/.well-known/acme-challenge'
|
|
if not os.path.exists(acme_path):
|
|
os.makedirs(acme_path)
|
|
mw.setOwn(acme_path, 'www')
|
|
wellknown_path = '{}/{}'.format(acme_path, token)
|
|
mw.writeFile(wellknown_path, acme_keyauthorization)
|
|
mw.setOwn(wellknown_path, 'www')
|
|
return True
|
|
except:
|
|
err = mw.getTracebackInfo()
|
|
raise Exception("写入验证文件失败: {}".format(err))
|
|
|
|
# 设置验证信息
|
|
def setAuthInfo(self, identifier_auth):
|
|
|
|
# 是否手动验证DNS
|
|
if identifier_auth['auth_to'] == 'dns':
|
|
return None
|
|
|
|
# 是否文件验证
|
|
if identifier_auth['type'] in ['http', 'tls']:
|
|
self.writeAuthFile(identifier_auth['auth_to'], identifier_auth[
|
|
'token'], identifier_auth['acme_keyauthorization'])
|
|
|
|
# 获取验证信息
|
|
def getAuths(self, index):
|
|
if not index in self.__config['orders']:
|
|
raise Exception('指定订单不存在!')
|
|
|
|
# 检查是否已经获取过授权信息
|
|
if 'auths' in self.__config['orders'][index]:
|
|
# 检查授权信息是否过期
|
|
if time.time() < self.__config['orders'][index]['auths'][0]['expires']:
|
|
return self.__config['orders'][index]['auths']
|
|
if self.__config['orders'][index]['auth_type'] != 'dns':
|
|
site_run_path = self.getSiteRunPath(
|
|
self.__config['orders'][index]['domains'])
|
|
if site_run_path:
|
|
self.__config['orders'][index]['auth_to'] = site_run_path
|
|
|
|
# 清理旧验证
|
|
self.clearAuthFile(index)
|
|
|
|
auths = []
|
|
for auth_url in self.__config['orders'][index]['authorizations']:
|
|
res = self.acmeRequest(auth_url, "")
|
|
if res.status not in [200, 201]:
|
|
raise Exception("获取授权失败: {}".format(res.json()))
|
|
|
|
s_body = self.getRequestJson(res)
|
|
if 'status' in s_body:
|
|
if s_body['status'] in ['invalid']:
|
|
raise Exception("无效订单,此订单当前为验证失败状态!")
|
|
if s_body['status'] in ['valid']: # 跳过无需验证的域名
|
|
continue
|
|
|
|
s_body['expires'] = self.utcToTime(s_body['expires'])
|
|
identifier_auth = self.getIdentifierAuth(index, auth_url, s_body)
|
|
if not identifier_auth:
|
|
raise Exception("验证信息构造失败!{}")
|
|
|
|
acme_keyauthorization, auth_value = self.getKeyauthorization(identifier_auth[
|
|
'token'])
|
|
identifier_auth['acme_keyauthorization'] = acme_keyauthorization
|
|
identifier_auth['auth_value'] = auth_value
|
|
identifier_auth['expires'] = s_body['expires']
|
|
identifier_auth['auth_to'] = self.__config[
|
|
'orders'][index]['auth_to']
|
|
identifier_auth['type'] = self.__config[
|
|
'orders'][index]['auth_type']
|
|
|
|
# print(identifier_auth)
|
|
# 设置验证信息
|
|
self.setAuthInfo(identifier_auth)
|
|
auths.append(identifier_auth)
|
|
self.__config['orders'][index]['auths'] = auths
|
|
self.saveConfig()
|
|
return auths
|
|
|
|
# 更新随机数
|
|
def updateReplayNonce(self, res):
|
|
replay_nonce = res.headers.get('replay-nonce')
|
|
if replay_nonce:
|
|
self.__replay_nonce = replay_nonce
|
|
|
|
# 请求到ACME接口
|
|
def acmeRequest(self, url, payload):
|
|
headers = {"User-Agent": self.__user_agent}
|
|
payload = self.stringfyItems(payload)
|
|
|
|
if payload == "":
|
|
payload64 = payload
|
|
else:
|
|
payload64 = self.calculateSafeBase64(json.dumps(payload))
|
|
protected = self.getAcmeHeader(url)
|
|
protected64 = self.calculateSafeBase64(json.dumps(protected))
|
|
signature = self.signMessage(
|
|
message="{0}.{1}".format(protected64, payload64)) # bytes
|
|
signature64 = self.calculateSafeBase64(signature) # str
|
|
data = json.dumps({
|
|
"protected": protected64,
|
|
"payload": payload64,
|
|
"signature": signature64
|
|
})
|
|
|
|
headers.update({"Content-Type": "application/jose+json"})
|
|
response = self.requestsPost(
|
|
url, data=data, timeout=self.__acme_timeout)
|
|
# 更新随机数
|
|
self.updateReplayNonce(response)
|
|
return response
|
|
|
|
# UTC时间转时间戳
|
|
def utcToTime(self, utc_string):
|
|
try:
|
|
utc_string = utc_string.split('.')[0]
|
|
utc_date = datetime.datetime.strptime(
|
|
utc_string, "%Y-%m-%dT%H:%M:%S")
|
|
# 按北京时间返回
|
|
return int(time.mktime(utc_date.timetuple())) + (3600 * 8)
|
|
except:
|
|
return int(time.time() + 86400 * 7)
|
|
|
|
# 保存订单
|
|
def saveOrder(self, order_object, index):
|
|
if not 'orders' in self.__config:
|
|
self.__config['orders'] = {}
|
|
renew = False
|
|
if not index:
|
|
index = mw.md5(json.dumps(order_object['identifiers']))
|
|
else:
|
|
renew = True
|
|
order_object['certificate_url'] = self.__config[
|
|
'orders'][index]['certificate_url']
|
|
order_object['save_path'] = self.__config[
|
|
'orders'][index]['save_path']
|
|
|
|
order_object['expires'] = self.utcToTime(order_object['expires'])
|
|
self.__config['orders'][index] = order_object
|
|
self.__config['orders'][index]['index'] = index
|
|
if not renew:
|
|
self.__config['orders'][index]['create_time'] = int(time.time())
|
|
self.__config['orders'][index]['renew_time'] = 0
|
|
self.saveConfig()
|
|
return index
|
|
|
|
def getError(self, error):
|
|
# 格式化错误输出
|
|
if error.find("Max checks allowed") >= 0:
|
|
return "CA无法验证您的域名,请检查域名解析是否正确,或等待5-10分钟后重试."
|
|
elif error.find("Max retries exceeded with") >= 0 or error.find('status_code=0 ') != -1:
|
|
return "CA服务器连接超时,请稍候重试."
|
|
elif error.find("The domain name belongs") >= 0:
|
|
return "域名不属于此DNS服务商,请确保域名填写正确."
|
|
elif error.find('login token ID is invalid') >= 0:
|
|
return 'DNS服务器连接失败,请检查密钥是否正确.'
|
|
elif error.find('Error getting validation data') != -1:
|
|
return '数据验证失败,CA无法从验证连接中获到正确的验证码.'
|
|
elif "too many certificates already issued for exact set of domains" in error:
|
|
return '签发失败,该域名%s超出了每周的重复签发次数限制!' % re.findall("exact set of domains: (.+):", error)
|
|
elif "Error creating new account :: too many registrations for this IP" in error:
|
|
return '签发失败,当前服务器IP已达到每3小时最多创建10个帐户的限制.'
|
|
elif "DNS problem: NXDOMAIN looking up A for" in error:
|
|
return '验证失败,没有解析域名,或解析未生效!'
|
|
elif "Invalid response from" in error:
|
|
return '验证失败,域名解析错误或验证URL无法被访问!'
|
|
elif error.find('TLS Web Server Authentication') != -1:
|
|
return "连接CA服务器失败,请稍候重试."
|
|
elif error.find('Name does not end in a public suffix') != -1:
|
|
return "不支持的域名%s,请检查域名是否正确!" % re.findall("Cannot issue for \"(.+)\":", error)
|
|
elif error.find('No valid IP addresses found for') != -1:
|
|
return "域名%s没有找到解析记录,请检查域名是否解析生效!" % re.findall("No valid IP addresses found for (.+)", error)
|
|
elif error.find('No TXT record found at') != -1:
|
|
return "没有在域名%s中找到有效的TXT解析记录,请检查是否正确解析TXT记录,如果是DNSAPI方式申请的,请10分钟后重试!" % re.findall("No TXT record found at (.+)", error)
|
|
elif error.find('Incorrect TXT record') != -1:
|
|
return "在%s上发现错误的TXT记录:%s,请检查TXT解析是否正确,如果是DNSAPI方式申请的,请10分钟后重试!" % (re.findall("found at (.+)", error), re.findall("Incorrect TXT record \"(.+)\"", error))
|
|
elif error.find('Domain not under you or your user') != -1:
|
|
return "这个dnspod账户下面不存在这个域名,添加解析失败!"
|
|
elif error.find('SERVFAIL looking up TXT for') != -1:
|
|
return "没有在域名%s中找到有效的TXT解析记录,请检查是否正确解析TXT记录,如果是DNSAPI方式申请的,请10分钟后重试!" % re.findall("looking up TXT for (.+)", error)
|
|
elif error.find('Timeout during connect') != -1:
|
|
return "连接超时,CA服务器无法访问您的网站!"
|
|
elif error.find("DNS problem: SERVFAIL looking up CAA for") != -1:
|
|
return "域名%s当前被要求验证CAA记录,请手动解析CAA记录,或1小时后重新尝试申请!" % re.findall("looking up CAA for (.+)", error)
|
|
elif error.find("Read timed out.") != -1:
|
|
return "验证超时,请检查域名是否正确解析,若已正确解析,可能服务器与Let'sEncrypt连接异常,请稍候再重试!"
|
|
elif error.find('Cannot issue for') != -1:
|
|
return "无法为{}颁发证书,不能直接用域名后缀申请通配符证书!".format(re.findall(r'for\s+"(.+)"', error))
|
|
elif error.find('too many failed authorizations recently'):
|
|
return '该帐户1小时内失败的订单次数超过5次,请等待1小时再重试!'
|
|
elif error.find("Error creating new order") != -1:
|
|
return "订单创建失败,请稍候重试!"
|
|
elif error.find("Too Many Requests") != -1:
|
|
return "1小时内超过5次验证失败,暂时禁止申请,请稍候重试!"
|
|
elif error.find('HTTP Error 400: Bad Request') != -1:
|
|
return "CA服务器拒绝访问,请稍候重试!"
|
|
elif error.find('Temporary failure in name resolution') != -1:
|
|
return '服务器DNS故障,无法解析域名,请使用Linux工具箱检查dns配置'
|
|
elif error.find('Too Many Requests') != -1:
|
|
return '该域名请求申请次数过多,请3小时后重试'
|
|
else:
|
|
return error
|
|
|
|
# 创建订单
|
|
def createOrder(self, domains, auth_type, auth_to, index=None):
|
|
domains = self.formatDomains(domains)
|
|
if not domains:
|
|
raise Exception("至少需要有一个域名")
|
|
# 构造标识
|
|
identifiers = []
|
|
for domain_name in domains:
|
|
identifiers.append({"type": 'dns', "value": domain_name})
|
|
payload = {"identifiers": identifiers}
|
|
|
|
# 请求创建订单
|
|
res = self.acmeRequest(self.__apis['newOrder'], payload)
|
|
if not res.status in [201]: # 如果创建失败
|
|
e_body = self.getRequestJson(res)
|
|
if 'type' in e_body:
|
|
# 如果随机数失效
|
|
if e_body['type'].find('error:badNonce') != -1:
|
|
self.getNonce(force=True)
|
|
res = self.acmeRequest(self.__apis['newOrder'], payload)
|
|
|
|
# 如果帐户失效
|
|
if e_body['detail'].find('KeyID header contained an invalid account URL') != -1:
|
|
k = self._mod_index[self.__debug]
|
|
del(self.__config['account'][k])
|
|
self.getKid()
|
|
self.getNonce(force=True)
|
|
res = self.acmeRequest(self.__apis['newOrder'], payload)
|
|
if not res.status in [201]:
|
|
a_auth = e_body
|
|
|
|
ret_title = self.getError(str(a_auth))
|
|
raise StopIteration("{0} >>>> {1}".format(
|
|
ret_title, json.dumps(a_auth)))
|
|
|
|
# 返回验证地址和验证
|
|
s_json = self.getRequestJson(res)
|
|
s_json['auth_type'] = auth_type
|
|
s_json['domains'] = domains
|
|
s_json['auth_to'] = auth_to
|
|
index = self.saveOrder(s_json, index)
|
|
return index
|
|
|
|
# 检查验证状态
|
|
def checkAuthStatus(self, url, desired_status=None):
|
|
|
|
desired_status = desired_status or ["pending", "valid", "invalid"]
|
|
number_of_checks = 0
|
|
rdata = None
|
|
while True:
|
|
if desired_status == ['valid', 'invalid']:
|
|
writeLog("|-第{}次查询验证结果..".format(number_of_checks + 1))
|
|
time.sleep(self.__wait_time)
|
|
check_authorization_status_response = self.acmeRequest(url, "")
|
|
a_auth = rdata = self.getRequestJson(
|
|
check_authorization_status_response)
|
|
authorization_status = a_auth["status"]
|
|
number_of_checks += 1
|
|
if authorization_status in desired_status:
|
|
if authorization_status == "invalid":
|
|
writeLog("|-验证失败!")
|
|
try:
|
|
if 'error' in a_auth['challenges'][0]:
|
|
ret_title = a_auth['challenges'][
|
|
0]['error']['detail']
|
|
elif 'error' in a_auth['challenges'][1]:
|
|
ret_title = a_auth['challenges'][
|
|
1]['error']['detail']
|
|
elif 'error' in a_auth['challenges'][2]:
|
|
ret_title = a_auth['challenges'][
|
|
2]['error']['detail']
|
|
else:
|
|
ret_title = str(a_auth)
|
|
ret_title = self.getError(ret_title)
|
|
except:
|
|
ret_title = str(a_auth)
|
|
raise StopIteration("{0} >>>> {1}".format(
|
|
ret_title, json.dumps(a_auth)))
|
|
break
|
|
|
|
if number_of_checks == self.__max_check_num:
|
|
raise StopIteration(
|
|
"错误:已尝试验证{0}次. 最大验证次数为{1}. 验证时间间隔为{2}秒.".format(
|
|
number_of_checks,
|
|
self.__max_check_num,
|
|
self.__wait_time
|
|
)
|
|
)
|
|
if desired_status == ['valid', 'invalid']:
|
|
writeLog("|-验证成功!")
|
|
return rdata
|
|
|
|
# 发送验证请求
|
|
def respondToChallenge(self, auth):
|
|
payload = {"keyAuthorization": "{0}".format(
|
|
auth['acme_keyauthorization'])}
|
|
respond_to_challenge_response = self.acmeRequest(
|
|
auth['dns_challenge_url'], payload)
|
|
return respond_to_challenge_response
|
|
|
|
# 检查DNS记录
|
|
def checkDns(self, domain, value, s_type='TXT'):
|
|
writeLog(
|
|
"|-尝试本地验证DNS记录,域名: {} , 类型: {} 记录值: {}".format(domain, s_type, value))
|
|
time.sleep(10)
|
|
n = 0
|
|
while n < 20:
|
|
n += 1
|
|
try:
|
|
import dns.resolver
|
|
ns = dns.resolver.query(domain, s_type)
|
|
for j in ns.response.answer:
|
|
for i in j.items:
|
|
txt_value = i.to_text().replace('"', '').strip()
|
|
writeLog("|-第 {} 次验证值: {}".format(n, txt_value))
|
|
if txt_value == value:
|
|
write_log("|-本地验证成功!")
|
|
return True
|
|
except:
|
|
try:
|
|
import dns.resolver
|
|
except:
|
|
return False
|
|
time.sleep(3)
|
|
writeLog("|-本地验证失败!")
|
|
return True
|
|
|
|
def authDomain(self, index):
|
|
if not index in self.__config['orders']:
|
|
raise Exception('指定订单不存在!')
|
|
|
|
# 开始验证
|
|
for auth in self.__config['orders'][index]['auths']:
|
|
res = self.checkAuthStatus(auth['url']) # 检查是否需要验证
|
|
if res['status'] == 'pending':
|
|
if auth['type'] == 'dns': # 尝试提前验证dns解析
|
|
self.checkDns(
|
|
"_acme-challenge.{}".format(
|
|
auth['domain'].replace('*.', '')),
|
|
auth['auth_value'],
|
|
"TXT"
|
|
)
|
|
self.respondToChallenge(auth)
|
|
|
|
# 检查验证结果
|
|
for i in range(len(self.__config['orders'][index]['auths'])):
|
|
self.checkAuthStatus(self.__config['orders'][index]['auths'][i]['url'], [
|
|
'valid', 'invalid'])
|
|
self.__config['orders'][index]['status'] = 'valid'
|
|
|
|
# 构造可选名称
|
|
def getAltNames(self, index):
|
|
domain_name = self.__config['orders'][index]['domains'][0]
|
|
domain_alt_names = []
|
|
if len(self.__config['orders'][index]['domains']) > 1:
|
|
domain_alt_names = self.__config['orders'][index]['domains'][1:]
|
|
return domain_name, domain_alt_names
|
|
|
|
# 创建CSR
|
|
def createCsr(self, index):
|
|
if 'csr' in self.__config['orders'][index]:
|
|
return self.__config['orders']['csr']
|
|
domain_name, domain_alt_names = self.getAltNames(index)
|
|
X509Req = OpenSSL.crypto.X509Req()
|
|
X509Req.get_subject().CN = domain_name
|
|
if domain_alt_names:
|
|
SAN = "DNS:{0}, ".format(domain_name).encode("utf8") + ", ".join(
|
|
"DNS:" + i for i in domain_alt_names
|
|
).encode("utf8")
|
|
else:
|
|
SAN = "DNS:{0}".format(domain_name).encode("utf8")
|
|
|
|
X509Req.add_extensions(
|
|
[
|
|
OpenSSL.crypto.X509Extension(
|
|
"subjectAltName".encode("utf8"), critical=False, value=SAN
|
|
)
|
|
]
|
|
)
|
|
pk = OpenSSL.crypto.load_privatekey(
|
|
OpenSSL.crypto.FILETYPE_PEM, self.createCertificateKey(
|
|
index).encode()
|
|
)
|
|
X509Req.set_pubkey(pk)
|
|
# X509Req.set_version(2)
|
|
X509Req.set_version(0)
|
|
X509Req.sign(pk, self.__digest)
|
|
return OpenSSL.crypto.dump_certificate_request(OpenSSL.crypto.FILETYPE_ASN1, X509Req)
|
|
|
|
# 获取证书密钥对
|
|
def createCertificateKey(self, index):
|
|
# 判断是否已经创建private_key
|
|
if 'private_key' in self.__config['orders'][index]:
|
|
return self.__config['orders'][index]['private_key']
|
|
# 创建新的私钥
|
|
private_key = self.createKey()
|
|
if type(private_key) == bytes:
|
|
private_key = private_key.decode()
|
|
# 保存私钥到订单配置文件
|
|
self.__config['orders'][index]['private_key'] = private_key
|
|
self.saveConfig()
|
|
return private_key
|
|
|
|
# 发送CSR
|
|
def sendCsr(self, index):
|
|
csr = self.createCsr(index)
|
|
payload = {"csr": self.calculateSafeBase64(csr)}
|
|
send_csr_response = self.acmeRequest(
|
|
url=self.__config['orders'][index]['finalize'], payload=payload)
|
|
send_csr_response_json = self.getRequestJson(send_csr_response)
|
|
if send_csr_response.status not in [200, 201]:
|
|
raise ValueError(
|
|
"错误: 发送CSR: 响应状态{status_code} 响应值:{response}".format(
|
|
status_code=send_csr_response.status,
|
|
response=send_csr_response_json,
|
|
)
|
|
)
|
|
certificate_url = send_csr_response_json["certificate"]
|
|
self.__config['orders'][index]['certificate_url'] = certificate_url
|
|
self.saveConfig()
|
|
return certificate_url
|
|
|
|
# 转换时间
|
|
def strfDate(self, sdate):
|
|
return time.strftime('%Y-%m-%d', time.strptime(sdate, '%Y%m%d%H%M%S'))
|
|
|
|
# 证书转为DER
|
|
def dumpDer(self, cert_path):
|
|
cert = OpenSSL.crypto.load_certificate(
|
|
OpenSSL.crypto.FILETYPE_PEM, mw.readFile(cert_path + '/cert.csr'))
|
|
return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
|
|
|
|
# 证书转为pkcs12
|
|
def dumpPkcs12(self, key_pem=None, cert_pem=None, ca_pem=None, friendly_name=None):
|
|
p12 = OpenSSL.crypto.PKCS12()
|
|
if cert_pem:
|
|
p12.set_certificate(OpenSSL.crypto.load_certificate(
|
|
OpenSSL.crypto.FILETYPE_PEM, cert_pem.encode()))
|
|
if key_pem:
|
|
p12.set_privatekey(OpenSSL.crypto.load_privatekey(
|
|
OpenSSL.crypto.FILETYPE_PEM, key_pem.encode()))
|
|
if ca_pem:
|
|
p12.set_ca_certificates((OpenSSL.crypto.load_certificate(
|
|
OpenSSL.crypto.FILETYPE_PEM, ca_pem.encode()),))
|
|
if friendly_name:
|
|
p12.set_friendlyname(friendly_name.encode())
|
|
return p12.export()
|
|
|
|
# 拆分根证书
|
|
def splitCaData(self, cert):
|
|
sp_key = '-----END CERTIFICATE-----\n'
|
|
datas = cert.split(sp_key)
|
|
return {"cert": datas[0] + sp_key, "root": sp_key.join(datas[1:])}
|
|
|
|
# 获取指定证书基本信息
|
|
def getCertInit(self, pem_file):
|
|
if not os.path.exists(pem_file):
|
|
return None
|
|
try:
|
|
result = {}
|
|
x509 = OpenSSL.crypto.load_certificate(
|
|
OpenSSL.crypto.FILETYPE_PEM, mw.readFile(pem_file))
|
|
# 取产品名称
|
|
issuer = x509.get_issuer()
|
|
result['issuer'] = ''
|
|
if hasattr(issuer, 'CN'):
|
|
result['issuer'] = issuer.CN
|
|
if not result['issuer']:
|
|
is_key = [b'0', '0']
|
|
issue_comp = issuer.get_components()
|
|
if len(issue_comp) == 1:
|
|
is_key = [b'CN', 'CN']
|
|
for iss in issue_comp:
|
|
if iss[0] in is_key:
|
|
result['issuer'] = iss[1].decode()
|
|
break
|
|
# 取到期时间
|
|
result['notAfter'] = self.strfDate(
|
|
bytes.decode(x509.get_notAfter())[:-1])
|
|
# 取申请时间
|
|
result['notBefore'] = self.strfDate(
|
|
bytes.decode(x509.get_notBefore())[:-1])
|
|
# 取可选名称
|
|
result['dns'] = []
|
|
for i in range(x509.get_extension_count()):
|
|
s_name = x509.get_extension(i)
|
|
if s_name.get_short_name() in [b'subjectAltName', 'subjectAltName']:
|
|
s_dns = str(s_name).split(',')
|
|
for d in s_dns:
|
|
result['dns'].append(d.split(':')[1])
|
|
subject = x509.get_subject().get_components()
|
|
# 取主要认证名称
|
|
if len(subject) == 1:
|
|
result['subject'] = subject[0][1].decode()
|
|
else:
|
|
result['subject'] = result['dns'][0]
|
|
return result
|
|
except:
|
|
return None
|
|
|
|
# 替换服务器上的同域名同品牌证书
|
|
def subAllCert(self, key_file, pem_file):
|
|
cert_init = self.getCertInit(pem_file) # 获取新证书的基本信息
|
|
paths = ['/www/server/mdserver-web/data/letsencrypt']
|
|
is_panel = False
|
|
for path in paths:
|
|
if not os.path.exists(path):
|
|
continue
|
|
for p_name in os.listdir(path):
|
|
to_path = path + '/' + p_name
|
|
to_pem_file = to_path + '/fullchain.pem'
|
|
to_key_file = to_path + '/privkey.pem'
|
|
to_info = to_path + '/info.json'
|
|
# 判断目标证书是否存在
|
|
if not os.path.exists(to_pem_file):
|
|
if not p_name in ['ssl']:
|
|
continue
|
|
to_pem_file = to_path + '/certificate.pem'
|
|
to_key_file = to_path + '/privateKey.pem'
|
|
if not os.path.exists(to_pem_file):
|
|
continue
|
|
if path == paths[-1]:
|
|
is_panel = True
|
|
# 获取目标证书的基本信息
|
|
to_cert_init = self.getCertInit(to_pem_file)
|
|
# 判断证书品牌是否一致
|
|
try:
|
|
if to_cert_init['issuer'] != cert_init['issuer'] and to_cert_init['issuer'].find("Let's Encrypt") == -1 and to_cert_init['issuer'] != 'R3':
|
|
continue
|
|
except:
|
|
continue
|
|
# 判断目标证书的到期时间是否较早
|
|
if to_cert_init['notAfter'] > cert_init['notAfter']:
|
|
continue
|
|
# 判断认识名称是否一致
|
|
if len(to_cert_init['dns']) != len(cert_init['dns']):
|
|
continue
|
|
is_copy = True
|
|
for domain in to_cert_init['dns']:
|
|
if not domain in cert_init['dns']:
|
|
is_copy = False
|
|
if not is_copy:
|
|
continue
|
|
|
|
# 替换新的证书文件和基本信息
|
|
mw.writeFile(to_pem_file, mw.readFile(pem_file, 'rb'), 'wb')
|
|
mw.writeFile(to_key_file, mw.readFile(key_file, 'rb'), 'wb')
|
|
mw.writeFile(to_info, json.dumps(cert_init))
|
|
writeLog("|-检测到{}下的证书与本次申请的证书重叠,且到期时间较早,已替换为新证书!".format(to_path))
|
|
# 重载web服务
|
|
mw.restartWeb()
|
|
|
|
# 保存证书到文件
|
|
def saveCert(self, cert, index):
|
|
try:
|
|
domain_name = self.__config['orders'][index]['domains'][0]
|
|
path = self.__config['orders'][index]['save_path']
|
|
if not os.path.exists(path):
|
|
os.makedirs(path, 384)
|
|
|
|
# 存储证书
|
|
key_file = path + "/privkey.pem"
|
|
pem_file = path + "/fullchain.pem"
|
|
mw.writeFile(key_file, cert['private_key'])
|
|
mw.writeFile(pem_file, cert['cert'] + cert['root'])
|
|
mw.writeFile(path + "/cert.csr", cert['cert'])
|
|
mw.writeFile(path + "/root_cert.csr", cert['root'])
|
|
|
|
# 转为IIS证书
|
|
pfx_buffer = self.dumpPkcs12(
|
|
cert['private_key'], cert['cert'] + cert['root'], cert['root'], domain_name)
|
|
mw.writeFile(path + "/fullchain.pfx", pfx_buffer, 'wb+')
|
|
|
|
ps = '''文件说明:
|
|
privkey.pem 证书私钥
|
|
fullchain.pem 包含证书链的PEM格式证书(nginx/apache)
|
|
root_cert.csr 根证书
|
|
cert.csr 域名证书
|
|
fullchain.pfx 用于IIS的证书格式
|
|
|
|
如何在MW面板使用:
|
|
privkey.pem 粘贴到密钥输入框
|
|
fullchain.pem 粘贴到证书输入框
|
|
'''
|
|
mw.writeFile(path + '/readme.txt', ps)
|
|
self.subAllCert(key_file, pem_file)
|
|
except:
|
|
writeLog(mw.getTracebackInfo())
|
|
|
|
# 获取证书到期时间
|
|
def getCertTimeout(self, cret_data):
|
|
try:
|
|
x509 = OpenSSL.crypto.load_certificate(
|
|
OpenSSL.crypto.FILETYPE_PEM, cret_data)
|
|
cert_timeout = bytes.decode(x509.get_notAfter())[:-1]
|
|
return int(time.mktime(time.strptime(cert_timeout, '%Y%m%d%H%M%S')))
|
|
except:
|
|
return int(time.time() + (86400 * 90))
|
|
|
|
# 下载证书
|
|
def downloadCert(self, index):
|
|
res = self.acmeRequest(
|
|
self.__config['orders'][index]['certificate_url'], "")
|
|
|
|
pem_certificate = res.read().decode('utf-8')
|
|
if res.status not in [200, 201]:
|
|
raise Exception("下载证书失败: {}".format(json.loads(pem_certificate)))
|
|
|
|
cert = self.splitCaData(pem_certificate)
|
|
cert['cert_timeout'] = self.getCertTimeout(cert['cert'])
|
|
cert['private_key'] = self.__config['orders'][index]['private_key']
|
|
cert['domains'] = self.__config['orders'][index]['domains']
|
|
del(self.__config['orders'][index]['private_key'])
|
|
del(self.__config['orders'][index]['auths'])
|
|
del(self.__config['orders'][index]['expires'])
|
|
del(self.__config['orders'][index]['authorizations'])
|
|
del(self.__config['orders'][index]['finalize'])
|
|
del(self.__config['orders'][index]['identifiers'])
|
|
if 'cert' in self.__config['orders'][index]:
|
|
del(self.__config['orders'][index]['cert'])
|
|
self.__config['orders'][index]['status'] = 'valid'
|
|
self.__config['orders'][index]['cert_timeout'] = cert['cert_timeout']
|
|
domain_name = self.__config['orders'][index]['domains'][0]
|
|
self.__config['orders'][index]['save_path'] = '{}/{}'.format(
|
|
self.__save_path, domain_name)
|
|
cert['save_path'] = self.__config['orders'][index]['save_path']
|
|
self.saveConfig()
|
|
self.saveCert(cert, index)
|
|
return cert
|
|
|
|
def applyCert(self, domains, auth_type='http', auth_to='Dns_com|None|None', args={}):
|
|
writeLog("", "wb+")
|
|
try:
|
|
self.getApis()
|
|
index = None
|
|
if 'index' in args and args.index:
|
|
index = args.index
|
|
if not index:
|
|
# 判断是否只想验证域名
|
|
writeLog("|-正在创建订单..")
|
|
index = self.createOrder(domains, auth_type, auth_to)
|
|
writeLog("|-正在获取验证信息..")
|
|
self.getAuths(index)
|
|
if auth_to == 'dns' and len(self.__config['orders'][index]['auths']) > 0:
|
|
return self.__config['orders'][index]
|
|
writeLog("|-正在验证域名..")
|
|
self.authDomain(index)
|
|
|
|
writeLog("|-正在发送CSR..")
|
|
self.sendCsr(index)
|
|
|
|
writeLog("|-正在下载证书..")
|
|
cert = self.downloadCert(index)
|
|
|
|
# 保存证书配置
|
|
self.saveConfig()
|
|
cert['status'] = True
|
|
cert['msg'] = '申请成功!'
|
|
writeLog("|-申请成功,正在部署到站点..")
|
|
|
|
self.clearAuthFile(index)
|
|
|
|
if os.path.exists(auth_to):
|
|
mw.execShell("rm -rf {}/.well-known".format(auth_to))
|
|
return cert
|
|
except Exception as ex:
|
|
ex = str(ex)
|
|
if ex.find(">>>>") != -1:
|
|
msg = ex.split(">>>>")
|
|
msg[1] = json.loads(msg[1])
|
|
else:
|
|
msg = ex
|
|
writeLog(mw.getTracebackInfo())
|
|
|
|
cert = {}
|
|
cert['status'] = False
|
|
cert['msg'] = msg
|
|
return cert
|
|
|
|
# 取根域名和记录值
|
|
def extractZone(self, domain_name):
|
|
top_domain_list = ['.ac.cn', '.ah.cn', '.bj.cn', '.com.cn', '.cq.cn', '.fj.cn', '.gd.cn', '.gov.cn', '.gs.cn',
|
|
'.gx.cn', '.gz.cn', '.ha.cn', '.hb.cn', '.he.cn', '.hi.cn', '.hk.cn', '.hl.cn', '.hn.cn',
|
|
'.jl.cn', '.js.cn', '.jx.cn', '.ln.cn', '.mo.cn', '.net.cn', '.nm.cn', '.nx.cn', '.org.cn',
|
|
'.my.id', '.com.ac', '.com.ad', '.com.ae', '.com.af', '.com.ag', '.com.ai', '.com.al', '.com.am',
|
|
'.com.an', '.com.ao', '.com.aq', '.com.ar', '.com.as', '.com.as', '.com.at', '.com.au', '.com.aw',
|
|
'.com.az', '.com.ba', '.com.bb', '.com.bd', '.com.be', '.com.bf', '.com.bg', '.com.bh', '.com.bi',
|
|
'.com.bj', '.com.bm', '.com.bn', '.com.bo', '.com.br', '.com.bs', '.com.bt', '.com.bv', '.com.bw',
|
|
'.com.by', '.com.bz', '.com.ca', '.com.ca', '.com.cc', '.com.cd', '.com.cf', '.com.cg', '.com.ch',
|
|
'.com.ci', '.com.ck', '.com.cl', '.com.cm', '.com.cn', '.com.co', '.com.cq', '.com.cr', '.com.cu',
|
|
'.com.cv', '.com.cx', '.com.cy', '.com.cz', '.com.de', '.com.dj', '.com.dk', '.com.dm', '.com.do',
|
|
'.com.dz', '.com.ec', '.com.ee', '.com.eg', '.com.eh', '.com.es', '.com.et', '.com.eu', '.com.ev',
|
|
'.com.fi', '.com.fj', '.com.fk', '.com.fm', '.com.fo', '.com.fr', '.com.ga', '.com.gb', '.com.gd',
|
|
'.com.ge', '.com.gf', '.com.gh', '.com.gi', '.com.gl', '.com.gm', '.com.gn', '.com.gp', '.com.gr',
|
|
'.com.gt', '.com.gu', '.com.gw', '.com.gy', '.com.hm', '.com.hn', '.com.hr', '.com.ht', '.com.hu',
|
|
'.com.id', '.com.id', '.com.ie', '.com.il', '.com.il', '.com.in', '.com.io', '.com.iq', '.com.ir',
|
|
'.com.is', '.com.it', '.com.jm', '.com.jo', '.com.jp', '.com.ke', '.com.kg', '.com.kh', '.com.ki',
|
|
'.com.km', '.com.kn', '.com.kp', '.com.kr', '.com.kw', '.com.ky', '.com.kz', '.com.la', '.com.lb',
|
|
'.com.lc', '.com.li', '.com.lk', '.com.lr', '.com.ls', '.com.lt', '.com.lu', '.com.lv', '.com.ly',
|
|
'.com.ma', '.com.mc', '.com.md', '.com.me', '.com.mg', '.com.mh', '.com.ml', '.com.mm', '.com.mn',
|
|
'.com.mo', '.com.mp', '.com.mq', '.com.mr', '.com.ms', '.com.mt', '.com.mv', '.com.mw', '.com.mx',
|
|
'.com.my', '.com.mz', '.com.na', '.com.nc', '.com.ne', '.com.nf', '.com.ng', '.com.ni', '.com.nl',
|
|
'.com.no', '.com.np', '.com.nr', '.com.nr', '.com.nt', '.com.nu', '.com.nz', '.com.om', '.com.pa',
|
|
'.com.pe', '.com.pf', '.com.pg', '.com.ph', '.com.pk', '.com.pl', '.com.pm', '.com.pn', '.com.pr',
|
|
'.com.pt', '.com.pw', '.com.py', '.com.qa', '.com.re', '.com.ro', '.com.rs', '.com.ru', '.com.rw',
|
|
'.com.sa', '.com.sb', '.com.sc', '.com.sd', '.com.se', '.com.sg', '.com.sh', '.com.si', '.com.sj',
|
|
'.com.sk', '.com.sl', '.com.sm', '.com.sn', '.com.so', '.com.sr', '.com.st', '.com.su', '.com.sy',
|
|
'.com.sz', '.com.tc', '.com.td', '.com.tf', '.com.tg', '.com.th', '.com.tj', '.com.tk', '.com.tl',
|
|
'.com.tm', '.com.tn', '.com.to', '.com.tp', '.com.tr', '.com.tt', '.com.tv', '.com.tw', '.com.tz',
|
|
'.com.ua', '.com.ug', '.com.uk', '.com.uk', '.com.us', '.com.uy', '.com.uz', '.com.va', '.com.vc',
|
|
'.com.ve', '.com.vg', '.com.vn', '.com.vu', '.com.wf', '.com.ws', '.com.ye', '.com.za', '.com.zm',
|
|
'.com.zw', '.mil.cn', '.qh.cn', '.sc.cn', '.sd.cn', '.sh.cn', '.sx.cn', '.tj.cn', '.tw.cn', '.tw.cn',
|
|
'.xj.cn', '.xz.cn', '.yn.cn', '.zj.cn', '.bj.cn', '.edu.kg'
|
|
]
|
|
old_domain_name = domain_name
|
|
top_domain = "." + ".".join(domain_name.rsplit('.')[-2:])
|
|
new_top_domain = "." + top_domain.replace(".", "")
|
|
is_tow_top = False
|
|
if top_domain in top_domain_list:
|
|
is_tow_top = True
|
|
domain_name = domain_name[:-len(top_domain)] + new_top_domain
|
|
|
|
if domain_name.count(".") > 1:
|
|
zone, middle, last = domain_name.rsplit(".", 2)
|
|
if is_tow_top:
|
|
last = top_domain[1:]
|
|
root = ".".join([middle, last])
|
|
else:
|
|
zone = ""
|
|
root = old_domain_name
|
|
return root, zone
|
|
|
|
# 外部API - START ----------------------------------------------------------
|
|
def getHostConf(self, siteName):
|
|
return mw.getServerDir() + '/web_conf/nginx/vhost/' + siteName + '.conf'
|
|
|
|
def getSitePath(self, siteName):
|
|
file = self.getHostConf(siteName)
|
|
if os.path.exists(file):
|
|
conf = mw.readFile(file)
|
|
rep = r'\s*root\s*(.+);'
|
|
path = re.search(rep, conf).groups()[0]
|
|
return path
|
|
return ''
|
|
|
|
def applyCertApi(self, args):
|
|
'''
|
|
申请证书 - api
|
|
'''
|
|
return self.applyCert(args['domains'], args['auth_type'], args['auth_to'])
|
|
# 外部API - END ----------------------------------------------------------
|
|
|
|
def getSiteNameByDomains(self, domains):
|
|
# 通过域名获取网站名称
|
|
sql = mw.M('domain')
|
|
site_sql = mw.M('sites')
|
|
siteName = None
|
|
for domain in domains:
|
|
pid = sql.where('name=?', domain).getField('pid')
|
|
if pid:
|
|
siteName = site_sql.where('id=?', pid).getField('name')
|
|
break
|
|
return siteName
|
|
|
|
def renewCertTo(self, domains, auth_type, auth_to, index=None):
|
|
site_name = None
|
|
cert = {}
|
|
|
|
import site_api
|
|
api = site_api.site_api()
|
|
if os.path.exists(auth_to):
|
|
if mw.M('sites').where('path=?', auth_to).count() == 1:
|
|
site_id = m.M('sites').where('path=?', auth_to).getField('id')
|
|
site_name = m.M('sites').where(
|
|
'path=?', auth_to).getField('name')
|
|
|
|
rdata = api.getSiteRunPath(site_id)
|
|
runPath = rdata['runPath']
|
|
if runPath and not runPath in ['/']:
|
|
path = auth_to + '/' + runPath
|
|
if os.path.exists(path):
|
|
auth_to = path.replace('//', '/')
|
|
|
|
else:
|
|
site_name = self.getSiteNameByDomains(domains)
|
|
is_rep = api.httpToHttps(site_name)
|
|
try:
|
|
index = self.createOrder(
|
|
domains,
|
|
auth_type,
|
|
auth_to.replace('//', '/'),
|
|
index
|
|
)
|
|
|
|
writeLog("|-正在获取验证信息..")
|
|
self.getAuths(index)
|
|
writeLog("|-正在验证域名..")
|
|
self.authDomain(index)
|
|
writeLog("|-正在发送CSR..")
|
|
self.sendCsr(index)
|
|
writeLog("|-正在下载证书..")
|
|
cert = self.downloadCert(index)
|
|
self.__config['orders'][index]['renew_time'] = int(time.time())
|
|
|
|
# 清理失败重试记录
|
|
self.__config['orders'][index]['retry_count'] = 0
|
|
self.__config['orders'][index]['next_retry_time'] = 0
|
|
|
|
# 保存证书配置
|
|
self.saveConfig()
|
|
cert['status'] = True
|
|
cert['msg'] = '续签成功!'
|
|
writeLog("|-续签成功!")
|
|
except Exception as e:
|
|
if str(e).find('429') > -1:
|
|
msg = '至少超过7天,才能续期!'
|
|
writeLog("|-" + msg)
|
|
return mw.returnJson(False, msg)
|
|
|
|
if str(e).find('请稍候重试') == -1: # 受其它证书影响和连接CA失败的的不记录重试次数
|
|
if index:
|
|
# 设置下次重试时间
|
|
self.__config['orders'][index][
|
|
'next_retry_time'] = int(time.time() + (86400 * 2))
|
|
# 记录重试次数
|
|
if not 'retry_count' in self.__config['orders'][index].keys():
|
|
self.__config['orders'][index]['retry_count'] = 1
|
|
self.__config['orders'][index]['retry_count'] += 1
|
|
# 保存证书配置
|
|
self.saveConfig()
|
|
msg = str(e).split('>>>>')[0]
|
|
writeLog("|-" + msg)
|
|
return mw.returnJson(False, msg)
|
|
finally:
|
|
is_rep_decode = json.loads(is_rep)
|
|
if is_rep_decode['status']:
|
|
api.closeToHttps(site_name)
|
|
writeLog("-" * 70)
|
|
return cert
|
|
|
|
def renewCert(self, index):
|
|
writeLog("", "wb+")
|
|
# self.D('renew_cert', index)
|
|
try:
|
|
order_index = []
|
|
if index:
|
|
if type(index) != str:
|
|
index = index.index
|
|
if not index in self.__config['orders']:
|
|
raise Exception("指定订单号不存在,无法续签!")
|
|
order_index.append(index)
|
|
else:
|
|
# 测试一天过期
|
|
now_time = time.time()
|
|
# print(self.__config)
|
|
if not 'orders' in self.__config:
|
|
self.__config['orders'] = {}
|
|
|
|
for i in self.__config['orders'].keys():
|
|
# print(self.__config['orders'][i])
|
|
if not 'save_path' in self.__config['orders'][i]:
|
|
continue
|
|
|
|
if 'cert' in self.__config['orders'][i]:
|
|
self.__config['orders'][i]['cert_timeout'] = self.__config['orders'][i]['cert']['cert_timeout']
|
|
|
|
if not 'cert_timeout' in self.__config['orders'][i]:
|
|
self.__config['orders'][i]['cert_timeout'] = int(time.time())
|
|
|
|
# print(self.__config['orders'][i]['domains'][0], (self.__config['orders'][i]['cert_timeout'] - now_time)/86400, now_time)
|
|
if self.__config['orders'][i]['cert_timeout'] - now_time > 83*86400:
|
|
msg = "|-本次跳过域名: {},未过期!".format(self.__config['orders'][i]['domains'][0])
|
|
writeLog(msg)
|
|
continue
|
|
|
|
# 已删除的网站直接跳过续签
|
|
if self.__config['orders'][i]['auth_to'].find('|') == -1 and self.__config['orders'][i]['auth_to'].find('/') != -1:
|
|
if not os.path.exists(self.__config['orders'][i]['auth_to']):
|
|
auth_to = self.__config['orders'][i]['auth_to']
|
|
if not auth_to:
|
|
continue
|
|
|
|
# 域名不存在?
|
|
for domain in self.__config['orders'][i]['domains']:
|
|
if domain.find('*') != -1:
|
|
break
|
|
if not mw.M('domain').where("name=?", (domain,)).count() and not mw.M('binding').where("domain=?", domain).count():
|
|
auth_to = None
|
|
msg = "|-跳过被删除的域名: {}".format(
|
|
self.__config['orders'][i]['domains'])
|
|
writeLog(msg)
|
|
if not auth_to:
|
|
continue
|
|
|
|
self.__config['orders'][i]['auth_to'] = auth_to
|
|
|
|
# 是否到了允许重试的时间
|
|
if 'next_retry_time' in self.__config['orders'][i]:
|
|
timeout = self.__config['orders'][i][
|
|
'next_retry_time'] - int(time.time())
|
|
if timeout > 0:
|
|
msg = '|-本次跳过域名:{},因第上次续签失败,还需要等待{}小时后再重试'.format(
|
|
self.__config['orders'][i]['domains'], int(timeout / 60 / 60))
|
|
writeLog(msg)
|
|
continue
|
|
|
|
# 是否到了最大重试次数
|
|
if 'retry_count' in self.__config['orders'][i]:
|
|
if self.__config['orders'][i]['retry_count'] >= 5:
|
|
msg = '|-本次跳过域名:{},因连续5次续签失败,不再续签此证书(可尝试手动续签此证书,成功后错误次数将被重置)'.format(
|
|
self.__config['orders'][i]['domains'])
|
|
writeLog(msg)
|
|
continue
|
|
|
|
# 加入到续签订单
|
|
order_index.append(i)
|
|
if not order_index:
|
|
writeLog("|-所有任务已处理完成!")
|
|
return
|
|
writeLog("|-共需要续签 {} 张证书".format(len(order_index)))
|
|
|
|
n = 0
|
|
self.getApis()
|
|
cert = None
|
|
for index in order_index:
|
|
n += 1
|
|
writeLog("|-正在续签第 {} 张,域名: {}..".format(n,
|
|
self.__config['orders'][index]['domains']))
|
|
writeLog("|-正在创建订单..")
|
|
cert = self.renewCertTo(self.__config['orders'][index]['domains'], self.__config[
|
|
'orders'][index]['auth_type'], self.__config['orders'][index]['auth_to'], index)
|
|
self.clearAuthFile(index)
|
|
return cert
|
|
except Exception as ex:
|
|
ex = str(ex)
|
|
if ex.find(">>>>") != -1:
|
|
msg = ex.split(">>>>")
|
|
msg[1] = json.loads(msg[1])
|
|
else:
|
|
msg = ex
|
|
writeLog(mw.getTracebackInfo())
|
|
return mw.returnJson(False, msg)
|
|
|
|
def revokeOrder(self, index):
|
|
if not index in self.__config['orders']:
|
|
raise Exception("指定订单不存在!")
|
|
cert_path = self.__config['orders'][index]['save_path']
|
|
if not os.path.exists(cert_path):
|
|
raise Exception("指定订单没有找到可用的证书!")
|
|
cert = self.dumpDer(cert_path)
|
|
if not cert:
|
|
raise Exception("证书读取失败!")
|
|
payload = {
|
|
"certificate": self.calculateSafeBase64(cert),
|
|
"reason": 4
|
|
}
|
|
|
|
self.getApis()
|
|
res = self.acmeRequest(self.__apis['revokeCert'], payload)
|
|
if res.status in [200, 201]:
|
|
if os.path.exists(cert_path):
|
|
mw.execShell("rm -rf {}".format(cert_path))
|
|
del(self.__config['orders'][index])
|
|
self.saveConfig()
|
|
return mw.returnJson(True, "证书吊销成功!")
|
|
return res.json()
|
|
|
|
def do(self, args):
|
|
cert = None
|
|
try:
|
|
if not args.index:
|
|
if not args.domains:
|
|
echoErr("请在--domain参数中指定要申请证书的域名,多个以逗号(,)隔开")
|
|
if not args.auth_type in ['http', 'tls']:
|
|
echoErr("请在--type参数中指定正确的验证类型,http")
|
|
|
|
auth_to = ''
|
|
if args.auth_type in ['http', 'tls']:
|
|
if not args.path:
|
|
echoErr("请在--path参数中指定网站根目录!")
|
|
if not os.path.exists(args.path):
|
|
echoErr("指定网站根目录不存在,请检查:{}".format(args.path))
|
|
auth_to = args.path
|
|
else:
|
|
echoErr("仅支持文件验证!")
|
|
exit()
|
|
|
|
domains = args.domains.strip().split(',')
|
|
cert = self.applyCert(
|
|
domains, auth_type=args.auth_type, auth_to=auth_to, args=args)
|
|
else:
|
|
# 重新验证
|
|
cert = self.applyCert([], auth_type='dns',
|
|
auth_to='dns', index=args.index)
|
|
except Exception as e:
|
|
writeLog("|-{}".format(mw.getTracebackInfo()))
|
|
exit()
|
|
|
|
if not cert:
|
|
exit()
|
|
|
|
# print(cert)
|
|
if not cert['status']:
|
|
writeLog('|-' + cert['msg'][0])
|
|
exit()
|
|
writeLog("=" * 65)
|
|
writeLog("|-证书获取成功!")
|
|
writeLog("=" * 65)
|
|
writeLog("证书到期时间: {}".format(
|
|
mw.formatDate(times=cert['cert_timeout'])))
|
|
writeLog("证书已保存在: {}/".format(cert['save_path']))
|
|
|
|
|
|
# exp:
|
|
'''
|
|
// create
|
|
python3 class/core/cert_api.py --domain=dev38.cachecha.com --type=http --path=/www/wwwroot/dev38.cachecha.com
|
|
// renew
|
|
cd /www/server/mdserver-web && python3 class/core/cert_api.py --renew=1 --index=46c558aa1fae96facf36ac7df8eb4750
|
|
// revoke
|
|
cd /www/server/mdserver-web && python3 class/core/cert_api.py --revoke=1 --index=370423ed29481b2caf22e36d90a6894a
|
|
|
|
|
|
|
|
python3 class/core/cert_request.py --domain=dev38.cachecha.com --type=http --path=/Users/midoks/Desktop/mwdev/wwwroot/test
|
|
|
|
python3 class/core/cert_api.py --domain=dev38.cachecha.com --type=http --path=/Users/midoks/Desktop/mwdev/wwwroot/test
|
|
python3 class/core/cert_api.py --renew=1
|
|
python3 class/core/cert_api.py --revoke=1 --index=370423ed29481b2caf22e36d90a6894a
|
|
'''
|
|
if __name__ == "__main__":
|
|
p = argparse.ArgumentParser(usage="必要的参数:--domain 域名列表,多个以逗号隔开!")
|
|
p.add_argument('--domain', default=None,
|
|
help="请指定要申请证书的域名", dest="domains")
|
|
p.add_argument('--type', default=None, help="请指定验证类型", dest="auth_type")
|
|
p.add_argument('--path', default=None, help="请指定网站根目录", dest="path")
|
|
p.add_argument('--index', default=None, help="指定订单索引", dest="index")
|
|
p.add_argument('--renew', default=None, help="续签证书", dest="renew")
|
|
p.add_argument('--revoke', default=None, help="吊销证书", dest="revoke")
|
|
|
|
args = p.parse_args()
|
|
cr = cert_api()
|
|
|
|
if args.revoke:
|
|
if not args.index:
|
|
echoErr("请在--index参数中传入要被吊销的订单索引")
|
|
result = cr.revokeOrder(args.index)
|
|
writeLog(result)
|
|
exit()
|
|
|
|
if args.renew:
|
|
cr.renewCert(args.index)
|
|
exit()
|
|
|
|
cr.do(args)
|
|
|