diff --git a/class/core/cert_request.py b/class/core/cert_request.py new file mode 100644 index 000000000..d8df0b90f --- /dev/null +++ b/class/core/cert_request.py @@ -0,0 +1,1177 @@ +# coding:utf-8 + +# --------------------------------------------------------------------------------- +# MW-Linux面板 +# --------------------------------------------------------------------------------- +# copyright (c) 2018-∞(https://github.com/midoks/mdserver-web) All rights reserved. +# --------------------------------------------------------------------------------- +# Author: midoks +# --------------------------------------------------------------------------------- + +# --------------------------------------------------------------------------------- +# 证书申请 +# --------------------------------------------------------------------------------- + +import re +import fcntl +import datetime +import binascii +import hashlib +import base64 +import json +import time +import os +import sys +import argparse + +import mw + +try: + import OpenSSL +except: + mw.execShell("pip install pyopenssl") + import OpenSSL +try: + import dns.resolver +except: + mw.execShell("pip install dnspython") + import dns.resolver + +# import http_requests as requests +# requests.DEFAULT_TYPE = 'curl' +import requests +requests.packages.urllib3.disable_warnings() + + +def echoErr(msg): + writeLog("\033[31m=" * 65) + writeLog("|-错误:{}\033[0m".format(msg)) + exit() + + +def writeLog(log_str, mode="ab+"): + # 写日志 + if __name__ == "__main__": + print(log_str) + return + _log_file = 'logs/letsencrypt.log' + f = open(_log_file, mode) + log_str += "\n" + f.write(log_str.encode('utf-8')) + f.close() + return True + + +class cert_request: + + __debug = True + __user_agent = "MW-Panel" + __apis = None + __url = None + __replay_nonce = None + __acme_timeout = 30 + __max_check_num = 5 + __wait_time = 5 + __bits = 2048 + __digest = "sha256" + __verify = False + __config = {} + __dns_class = None + __auto_wildcard = False + __mod_index = {True: "Staging", False: "Production"} + __cfg_file = 'data/letsencrypt.json' + + def __init__(self): + if self.__debug: + self.__url = 'https://acme-staging-v02.api.letsencrypt.org/directory' + else: + self.__url = 'https://acme-v02.api.letsencrypt.org/directory' + self.__config = self.readConfig() + + def D(self, name, val): + if self.__debug: + print('---------{} start--------'.format(name)) + print(val) + print('---------{} end--------'.format(name)) + + # 读配置文件 + def readConfig(self): + if not os.path.exists(self.__cfg_file): + self.__config['orders'] = {} + self.__config['account'] = {} + self.__config['apis'] = {} + self.__config['email'] = mw.M('users').where( + 'id=?', (1,)).getField('email') + if self.__config['email'] in ['midoks@163.com']: + self.__config['email'] = None + self.saveConfig() + return self.__config + tmp_config = mw.readFile(self.__cfg_file) + if not tmp_config: + return self.__config + try: + self.__config = json.loads(tmp_config) + except: + self.saveConfig() + return self.__config + return self.__config + + # 写配置文件 + def saveConfig(self): + fp = open(self.__cfg_file, 'w+') + fcntl.flock(fp, fcntl.LOCK_EX) # 加锁 + fp.write(json.dumps(self.__config)) + fcntl.flock(fp, fcntl.LOCK_UN) # 解锁 + fp.close() + return True + + def getApis(self): + if not self.__apis: + # 尝试从配置文件中获取 + api_index = self.__mod_index[self.__debug] + if not 'apis' in self.__config: + self.__config['apis'] = {} + + if api_index in self.__config['apis']: + if 'expires' in self.__config['apis'][api_index] and 'directory' in self.__config['apis'][api_index]: + if time.time() < self.__config['apis'][api_index]['expires']: + self.__apis = self.__config['apis'][ + api_index]['directory'] + return self.__apis + + # 尝试从官方获取获取 + try: + res = mw.httpGet(self.__url) + result = json.loads(res) + self.__apis = {} + self.__apis['newAccount'] = result['newAccount'] + self.__apis['newNonce'] = result['newNonce'] + self.__apis['newOrder'] = result['newOrder'] + self.__apis['revokeCert'] = result['revokeCert'] + self.__apis['keyChange'] = result['keyChange'] + + # 保存到配置文件 + self.__config['apis'][api_index] = {} + self.__config['apis'][api_index]['directory'] = self.__apis + self.__config['apis'][api_index]['expires'] = time.time() + \ + 86400 # 24小时后过期 + self.saveConfig() + except Exception as e: + raise Exception( + '服务因维护而关闭或发生内部错误,查看 https://letsencrypt.status.io/ 了解更多详细信息。') + return self.__apis + + # 系列化payload + def stringfyItems(self, payload): + if isinstance(payload, str): + return payload + + for k, v in payload.items(): + if isinstance(k, bytes): + k = k.decode("utf-8") + if isinstance(v, bytes): + v = v.decode("utf-8") + payload[k] = v + return payload + + # 构造域名列表 + def formatDomains(self, domains): + if type(domains) != list: + return [] + # 是否自动构造通配符 + if self.__auto_wildcard: + domains = self.autoWildcard(domains) + wildcard = [] + tmp_domains = [] + for domain in domains: + domain = domain.strip() + if domain in tmp_domains: + continue + # 将通配符域名转为验证正则表达式 + f_index = domain.find("*.") + if f_index not in [-1, 0]: + continue + if f_index == 0: + wildcard.append(domain.replace( + "*", r"^[\w-]+").replace(".", r"\.")) + # 添加到申请列表 + tmp_domains.append(domain) + + # 处理通配符包含 + apply_domains = tmp_domains[:] + for domain in tmp_domains: + for w in wildcard: + if re.match(w, domain): + apply_domains.pop(domain) + + return apply_domains + + # 转为无填充的Base64 + def calculateSafeBase64(self, un_encoded_data): + if sys.version_info[0] == 3: + if isinstance(un_encoded_data, str): + un_encoded_data = un_encoded_data.encode("utf8") + r = base64.urlsafe_b64encode(un_encoded_data).rstrip(b"=") + return r.decode("utf8") + + # 创建Key + def createKey(self, key_type=OpenSSL.crypto.TYPE_RSA): + key = OpenSSL.crypto.PKey() + key.generate_key(key_type, self.__bits) + private_key = OpenSSL.crypto.dump_privatekey( + OpenSSL.crypto.FILETYPE_PEM, key) + return private_key + + # 获用户取密钥对 + def getAccountKey(self): + if not 'account' in self.__config: + self.__config['account'] = {} + k = self.__mod_index[self.__debug] + if not k in self.__config['account']: + self.__config['account'][k] = {} + + if not 'key' in self.__config['account'][k]: + self.__config['account'][k]['key'] = self.createKey() + if type(self.__config['account'][k]['key']) == bytes: + self.__config['account'][k]['key'] = self.__config[ + 'account'][k]['key'].decode() + self.saveConfig() + return self.__config['account'][k]['key'] + + # 注册acme帐户 + def register(self, existing=False): + if not 'email' in self.__config: + self.__config['email'] = 'demo@bt.cn' + if existing: + payload = {"onlyReturnExisting": True} + elif self.__config['email']: + payload = { + "termsOfServiceAgreed": True, + "contact": ["mailto:{0}".format(self.__config['email'])], + } + else: + payload = {"termsOfServiceAgreed": True} + + res = self.acmeRequest(url=self.__apis['newAccount'], payload=payload) + + if res.status_code not in [201, 200, 409]: + raise Exception("注册ACME帐户失败: {}".format(res.json())) + kid = res.headers["Location"] + return kid + + # 获取kid + def getKid(self, force=False): + # 如果配置文件中不存在kid或force = True时则重新注册新的acme帐户 + if not 'account' in self.__config: + self.__config['account'] = {} + k = self.__mod_index[self.__debug] + if not k in self.__config['account']: + self.__config['account'][k] = {} + + if not 'kid' in self.__config['account'][k]: + self.__config['account'][k]['kid'] = self.register() + self.saveConfig() + time.sleep(3) + self.__config = self.readConfig() + return self.__config['account'][k]['kid'] + + # 获取随机数 + def getNonce(self, force=False): + # 如果没有保存上一次的随机数或force=True时则重新获取新的随机数 + if not self.__replay_nonce or force: + headers = {"User-Agent": self.__user_agent} + response = requests.get( + self.__apis['newNonce'], + timeout=self.__acme_timeout, + headers=headers, + verify=self.__verify + ) + self.__replay_nonce = response.headers["Replay-Nonce"] + return self.__replay_nonce + + # 获请ACME请求头 + def getAcmeHeader(self, url): + header = {"alg": "RS256", "nonce": self.getNonce(), "url": url} + if url in [self.__apis['newAccount'], 'GET_THUMBPRINT']: + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import serialization + private_key = serialization.load_pem_private_key( + self.getAccountKey().encode(), + password=None, + backend=default_backend(), + ) + public_key_public_numbers = private_key.public_key().public_numbers() + + exponent = "{0:x}".format(public_key_public_numbers.e) + exponent = "0{0}".format(exponent) if len( + exponent) % 2 else exponent + modulus = "{0:x}".format(public_key_public_numbers.n) + jwk = { + "kty": "RSA", + "e": self.calculateSafeBase64(binascii.unhexlify(exponent)), + "n": self.calculateSafeBase64(binascii.unhexlify(modulus)), + } + header["jwk"] = jwk + else: + header["kid"] = self.getKid() + return header + + # 计算signature + def signMessage(self, message): + pk = OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, self.getAccountKey().encode()) + return OpenSSL.crypto.sign(pk, message.encode("utf8"), self.__digest) + + def getSiteRunPathByid(self, site_id): + ''' + @name 通过site_id获取网站运行目录 + @author hwliang + @param site_id 网站标识 + @return None or string + ''' + if mw.M('sites').where('id=? and project_type=?', (site_id, 'PHP')).count() >= 1: + site_path = public.M('sites').where( + 'id=?', site_id).getField('path') + if not site_path: + return None + if not os.path.exists(site_path): + return None + args = public.dict_obj() + args.id = site_id + import panelSite + run_path = panelSite.panelSite().GetRunPath(args) + if run_path in ['/']: + run_path = '' + if run_path: + if run_path[0] == '/': + run_path = run_path[1:] + site_run_path = os.path.join(site_path, run_path) + if not os.path.exists(site_run_path): + return site_path + return site_run_path + else: + return False + + def getSiteRunPath(self, domains): + ''' + @name 通过域名列表获取网站运行目录 + @author hwliang + @param domains 域名列表 + @return None or string + ''' + site_id = 0 + for domain in domains: + site_id = mw.M('domain').where( + "name=?", domain).getField('pid') + if site_id: + break + + if not site_id: + return None + return self.getSiteRunPathByid(site_id) + + # 清理验证文件 + def claerAuthFile(self, index): + if not self.__config['orders'][index]['auth_type'] in ['http', 'tls']: + return True + acme_path = '{}/.well-known/acme-challenge'.format( + self.__config['orders'][index]['auth_to']) + writeLog("|-验证目录:{}".format(acme_path)) + if os.path.exists(acme_path): + mw.execShell("rm -f {}/*".format(acme_path)) + + acme_path = mw.getServerDir() + '/stop/.well-known/acme-challenge' + if os.path.exists(acme_path): + mw.execShell("rm -f {}/*".format(acme_path)) + + # 获取域名验证方式 + def getAuthType(self, index): + if not index in self.__config['orders']: + raise Exception('指定订单不存在!') + s_type = 'http-01' + if 'auth_type' in self.__config['orders'][index]: + if self.__config['orders'][index]['auth_type'] == 'dns': + s_type = 'dns-01' + elif self.__config['orders'][index]['auth_type'] == 'tls': + s_type = 'tls-alpn-01' + else: + s_type = 'http-01' + return s_type + + # 构造验证信息 + def getIdentifierAuth(self, index, url, auth_info): + s_type = self.getAuthType(index) + writeLog("|-验证类型:{}".format(s_type)) + domain = auth_info['identifier']['value'] + wildcard = False + # 处理通配符 + if 'wildcard' in auth_info: + wildcard = auth_info['wildcard'] + if wildcard: + domain = "*." + domain + + for auth in auth_info['challenges']: + if auth['type'] != s_type: + continue + identifier_auth = { + "domain": domain, + "url": url, + "wildcard": wildcard, + "token": auth['token'], + "dns_challenge_url": auth['url'], + } + return identifier_auth + return None + + # 构造域名验证头和验证值 + def getKeyauthorization(self, token): + acme_header_jwk_json = json.dumps( + self.getAcmeHeader("GET_THUMBPRINT")["jwk"], sort_keys=True, separators=(",", ":") + ) + acme_thumbprint = self.calculateSafeBase64( + hashlib.sha256(acme_header_jwk_json.encode("utf8")).digest() + ) + acme_keyauthorization = "{0}.{1}".format(token, acme_thumbprint) + base64_of_acme_keyauthorization = self.calculateSafeBase64( + hashlib.sha256(acme_keyauthorization.encode("utf8")).digest() + ) + + return acme_keyauthorization, base64_of_acme_keyauthorization + + # 写验证文件 + def writeAuthFile(self, auth_to, token, acme_keyauthorization): + try: + self.D('writeAuthFile', auth_to) + acme_path = '{}/.well-known/acme-challenge'.format(auth_to) + if not os.path.exists(acme_path): + os.makedirs(acme_path) + mw.setOwn(acme_path, 'www') + wellknown_path = '{}/{}'.format(acme_path, token) + mw.writeFile(wellknown_path, acme_keyauthorization) + mw.setOwn(wellknown_path, 'www') + + acme_path = mw.getServerDir() + '/stop/.well-known/acme-challenge' + if not os.path.exists(acme_path): + os.makedirs(acme_path) + mw.setOwn(acme_path, 'www') + wellknown_path = '{}/{}'.format(acme_path, token) + mw.writeFile(wellknown_path, acme_keyauthorization) + mw.setOwn(wellknown_path, 'www') + return True + except: + err = mw.getTracebackInfo() + print(err) + raise Exception("写入验证文件失败: {}".format(err)) + + # 设置验证信息 + def setAuthInfo(self, identifier_auth): + + # 是否手动验证DNS + if identifier_auth['auth_to'] == 'dns': + return None + + # 是否文件验证 + if identifier_auth['type'] in ['http', 'tls']: + self.writeAuthFile(identifier_auth['auth_to'], identifier_auth[ + 'token'], identifier_auth['acme_keyauthorization']) + + # 获取验证信息 + def getAuths(self, index): + if not index in self.__config['orders']: + raise Exception('指定订单不存在!') + + # 检查是否已经获取过授权信息 + if 'auths' in self.__config['orders'][index]: + # 检查授权信息是否过期 + if time.time() < self.__config['orders'][index]['auths'][0]['expires']: + return self.__config['orders'][index]['auths'] + if self.__config['orders'][index]['auth_type'] != 'dns': + site_run_path = self.getSiteRunPath( + self.__config['orders'][index]['domains']) + if site_run_path: + self.__config['orders'][index]['auth_to'] = site_run_path + + # 清理旧验证 + self.claerAuthFile(index) + + auths = [] + for auth_url in self.__config['orders'][index]['authorizations']: + res = self.acmeRequest(auth_url, "") + if res.status_code not in [200, 201]: + raise Exception("获取授权失败: {}".format(res.json())) + + s_body = res.json() + if 'status' in s_body: + if s_body['status'] in ['invalid']: + raise Exception("无效订单,此订单当前为验证失败状态!") + if s_body['status'] in ['valid']: # 跳过无需验证的域名 + continue + + s_body['expires'] = self.utcToTime(s_body['expires']) + identifier_auth = self.getIdentifierAuth(index, auth_url, s_body) + if not identifier_auth: + raise Exception("验证信息构造失败!{}") + + acme_keyauthorization, auth_value = self.getKeyauthorization(identifier_auth[ + 'token']) + identifier_auth['acme_keyauthorization'] = acme_keyauthorization + identifier_auth['auth_value'] = auth_value + identifier_auth['expires'] = s_body['expires'] + identifier_auth['auth_to'] = self.__config[ + 'orders'][index]['auth_to'] + identifier_auth['type'] = self.__config[ + 'orders'][index]['auth_type'] + + # print(identifier_auth) + # 设置验证信息 + self.setAuthInfo(identifier_auth) + auths.append(identifier_auth) + self.__config['orders'][index]['auths'] = auths + self.saveConfig() + return auths + + # 更新随机数 + def updateReplayNonce(self, res): + replay_nonce = res.headers.get('Replay-Nonce') + if replay_nonce: + self.__replay_nonce = replay_nonce + + # 请求到ACME接口 + def acmeRequest(self, url, payload): + headers = {"User-Agent": self.__user_agent} + payload = self.stringfyItems(payload) + + if payload == "": + payload64 = payload + else: + payload64 = self.calculateSafeBase64(json.dumps(payload)) + protected = self.getAcmeHeader(url) + protected64 = self.calculateSafeBase64(json.dumps(protected)) + signature = self.signMessage( + message="{0}.{1}".format(protected64, payload64)) # bytes + signature64 = self.calculateSafeBase64(signature) # str + data = json.dumps( + {"protected": protected64, "payload": payload64, "signature": signature64}) + headers.update({"Content-Type": "application/jose+json"}) + response = requests.post( + url, data=data.encode("utf8"), timeout=self.__acme_timeout, headers=headers, verify=self.__verify + ) + # 更新随机数 + self.updateReplayNonce(response) + return response + + # UTC时间转时间戳 + def utcToTime(self, utc_string): + try: + utc_string = utc_string.split('.')[0] + utc_date = datetime.datetime.strptime( + utc_string, "%Y-%m-%dT%H:%M:%S") + # 按北京时间返回 + return int(time.mktime(utc_date.timetuple())) + (3600 * 8) + except: + return int(time.time() + 86400 * 7) + + # 保存订单 + def saveOrder(self, order_object, index): + if not 'orders' in self.__config: + self.__config['orders'] = {} + renew = False + if not index: + index = mw.md5(json.dumps(order_object['identifiers'])) + else: + renew = True + order_object['certificate_url'] = self.__config[ + 'orders'][index]['certificate_url'] + order_object['save_path'] = self.__config[ + 'orders'][index]['save_path'] + + order_object['expires'] = self.utcToTime(order_object['expires']) + self.__config['orders'][index] = order_object + self.__config['orders'][index]['index'] = index + if not renew: + self.__config['orders'][index]['create_time'] = int(time.time()) + self.__config['orders'][index]['renew_time'] = 0 + self.saveConfig() + return index + + def getError(self, error): + # 格式化错误输出 + if error.find("Max checks allowed") >= 0: + return "CA无法验证您的域名,请检查域名解析是否正确,或等待5-10分钟后重试." + elif error.find("Max retries exceeded with") >= 0 or error.find('status_code=0 ') != -1: + return "CA服务器连接超时,请稍候重试." + elif error.find("The domain name belongs") >= 0: + return "域名不属于此DNS服务商,请确保域名填写正确." + elif error.find('login token ID is invalid') >= 0: + return 'DNS服务器连接失败,请检查密钥是否正确.' + elif error.find('Error getting validation data') != -1: + return '数据验证失败,CA无法从验证连接中获到正确的验证码.' + elif "too many certificates already issued for exact set of domains" in error: + return '签发失败,该域名%s超出了每周的重复签发次数限制!' % re.findall("exact set of domains: (.+):", error) + elif "Error creating new account :: too many registrations for this IP" in error: + return '签发失败,当前服务器IP已达到每3小时最多创建10个帐户的限制.' + elif "DNS problem: NXDOMAIN looking up A for" in error: + return '验证失败,没有解析域名,或解析未生效!' + elif "Invalid response from" in error: + return '验证失败,域名解析错误或验证URL无法被访问!' + elif error.find('TLS Web Server Authentication') != -1: + return "连接CA服务器失败,请稍候重试." + elif error.find('Name does not end in a public suffix') != -1: + return "不支持的域名%s,请检查域名是否正确!" % re.findall("Cannot issue for \"(.+)\":", error) + elif error.find('No valid IP addresses found for') != -1: + return "域名%s没有找到解析记录,请检查域名是否解析生效!" % re.findall("No valid IP addresses found for (.+)", error) + elif error.find('No TXT record found at') != -1: + return "没有在域名%s中找到有效的TXT解析记录,请检查是否正确解析TXT记录,如果是DNSAPI方式申请的,请10分钟后重试!" % re.findall("No TXT record found at (.+)", error) + elif error.find('Incorrect TXT record') != -1: + return "在%s上发现错误的TXT记录:%s,请检查TXT解析是否正确,如果是DNSAPI方式申请的,请10分钟后重试!" % (re.findall("found at (.+)", error), re.findall("Incorrect TXT record \"(.+)\"", error)) + elif error.find('Domain not under you or your user') != -1: + return "这个dnspod账户下面不存在这个域名,添加解析失败!" + elif error.find('SERVFAIL looking up TXT for') != -1: + return "没有在域名%s中找到有效的TXT解析记录,请检查是否正确解析TXT记录,如果是DNSAPI方式申请的,请10分钟后重试!" % re.findall("looking up TXT for (.+)", error) + elif error.find('Timeout during connect') != -1: + return "连接超时,CA服务器无法访问您的网站!" + elif error.find("DNS problem: SERVFAIL looking up CAA for") != -1: + return "域名%s当前被要求验证CAA记录,请手动解析CAA记录,或1小时后重新尝试申请!" % re.findall("looking up CAA for (.+)", error) + elif error.find("Read timed out.") != -1: + return "验证超时,请检查域名是否正确解析,若已正确解析,可能服务器与Let'sEncrypt连接异常,请稍候再重试!" + elif error.find('Cannot issue for') != -1: + return "无法为{}颁发证书,不能直接用域名后缀申请通配符证书!".format(re.findall(r'for\s+"(.+)"', error)) + elif error.find('too many failed authorizations recently'): + return '该帐户1小时内失败的订单次数超过5次,请等待1小时再重试!' + elif error.find("Error creating new order") != -1: + return "订单创建失败,请稍候重试!" + elif error.find("Too Many Requests") != -1: + return "1小时内超过5次验证失败,暂时禁止申请,请稍候重试!" + elif error.find('HTTP Error 400: Bad Request') != -1: + return "CA服务器拒绝访问,请稍候重试!" + elif error.find('Temporary failure in name resolution') != -1: + return '服务器DNS故障,无法解析域名,请使用Linux工具箱检查dns配置' + elif error.find('Too Many Requests') != -1: + return '该域名请求申请次数过多,请3小时后重试' + else: + return error + + # 创建订单 + def createOrder(self, domains, auth_type, auth_to, index=None): + domains = self.formatDomains(domains) + if not domains: + raise Exception("至少需要有一个域名") + # 构造标识 + identifiers = [] + for domain_name in domains: + identifiers.append({"type": 'dns', "value": domain_name}) + payload = {"identifiers": identifiers} + + # 请求创建订单 + res = self.acmeRequest(self.__apis['newOrder'], payload) + if not res.status_code in [201]: # 如果创建失败 + e_body = res.json() + if 'type' in e_body: + # 如果随机数失效 + if e_body['type'].find('error:badNonce') != -1: + self.getNonce(force=True) + res = self.acmeRequest(self.__apis['newOrder'], payload) + + # 如果帐户失效 + if e_body['detail'].find('KeyID header contained an invalid account URL') != -1: + k = self._mod_index[self.__debug] + del(self.__config['account'][k]) + self.getKid() + self.getNonce(force=True) + res = self.acmeRequest(self.__apis['newOrder'], payload) + if not res.status_code in [201]: + a_auth = res.json() + + ret_title = self.getError(str(a_auth)) + raise StopIteration("{0} >>>> {1}".format( + ret_title, json.dumps(a_auth))) + + # 返回验证地址和验证 + s_json = res.json() + s_json['auth_type'] = auth_type + s_json['domains'] = domains + s_json['auth_to'] = auth_to + index = self.saveOrder(s_json, index) + return index + + # 检查验证状态 + def checkAuthStatus(self, url, desired_status=None): + desired_status = desired_status or ["pending", "valid", "invalid"] + number_of_checks = 0 + while True: + if desired_status == ['valid', 'invalid']: + writeLog("|-第{}次查询验证结果..".format(number_of_checks + 1)) + time.sleep(self.__wait_time) + check_authorization_status_response = self.acmeRequest(url, "") + a_auth = check_authorization_status_response.json() + authorization_status = a_auth["status"] + number_of_checks += 1 + if authorization_status in desired_status: + if authorization_status == "invalid": + writeLog("|-验证失败!") + try: + if 'error' in a_auth['challenges'][0]: + ret_title = a_auth['challenges'][ + 0]['error']['detail'] + elif 'error' in a_auth['challenges'][1]: + ret_title = a_auth['challenges'][ + 1]['error']['detail'] + elif 'error' in a_auth['challenges'][2]: + ret_title = a_auth['challenges'][ + 2]['error']['detail'] + else: + ret_title = str(a_auth) + ret_title = self.getError(ret_title) + except: + ret_title = str(a_auth) + raise StopIteration("{0} >>>> {1}".format( + ret_title, json.dumps(a_auth))) + break + + if number_of_checks == self._max_check_num: + raise StopIteration( + "错误:已尝试验证{0}次. 最大验证次数为{1}. 验证时间间隔为{2}秒.".format( + number_of_checks, + self.__max_check_num, + self.__wait_time + ) + ) + if desired_status == ['valid', 'invalid']: + writeLog("|-验证成功!") + return check_authorization_status_response + + # 发送验证请求 + def respondToChallenge(self, auth): + payload = {"keyAuthorization": "{0}".format( + auth['acme_keyauthorization'])} + respond_to_challenge_response = self.acmeRequest( + auth['dns_challenge_url'], payload) + return respond_to_challenge_response + + def authDomain(self, index): + if not index in self.__config['orders']: + raise Exception('指定订单不存在!') + + # 开始验证 + for auth in self.__config['orders'][index]['auths']: + res = self.checkAuthStatus(auth['url']) # 检查是否需要验证 + if res.json()['status'] == 'pending': + if auth['type'] == 'dns': # 尝试提前验证dns解析 + self.check_dns( + "_acme-challenge.{}".format( + auth['domain'].replace('*.', '')), + auth['auth_value'], + "TXT" + ) + self.respondToChallenge(auth) + + # 检查验证结果 + for i in range(len(self.__config['orders'][index]['auths'])): + self.checkAuthStatus(self.__config['orders'][index]['auths'][i]['url'], [ + 'valid', 'invalid']) + self.__config['orders'][index]['status'] = 'valid' + + # 构造可选名称 + def getAltNames(self, index): + domain_name = self.__config['orders'][index]['domains'][0] + domain_alt_names = [] + if len(self.__config['orders'][index]['domains']) > 1: + domain_alt_names = self.__config['orders'][index]['domains'][1:] + return domain_name, domain_alt_names + + # 创建CSR + def createCsr(self, index): + if 'csr' in self.__config['orders'][index]: + return self.__config['orders']['csr'] + domain_name, domain_alt_names = self.getAltNames(index) + X509Req = OpenSSL.crypto.X509Req() + X509Req.get_subject().CN = domain_name + if domain_alt_names: + SAN = "DNS:{0}, ".format(domain_name).encode("utf8") + ", ".join( + "DNS:" + i for i in domain_alt_names + ).encode("utf8") + else: + SAN = "DNS:{0}".format(domain_name).encode("utf8") + + X509Req.add_extensions( + [ + OpenSSL.crypto.X509Extension( + "subjectAltName".encode("utf8"), critical=False, value=SAN + ) + ] + ) + pk = OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, self.create_certificate_key( + index).encode() + ) + X509Req.set_pubkey(pk) + X509Req.set_version(2) + X509Req.sign(pk, self._digest) + return OpenSSL.crypto.dump_certificate_request(OpenSSL.crypto.FILETYPE_ASN1, X509Req) + + # 发送CSR + def sendCsr(self, index): + csr = self.createCsr(index) + payload = {"csr": self.calculateSafeBase64(csr)} + send_csr_response = self.acmeRequest( + url=self.__config['orders'][index]['finalize'], payload=payload) + if send_csr_response.status_code not in [200, 201]: + raise ValueError( + "错误: 发送CSR: 响应状态{status_code} 响应值:{response}".format( + status_code=send_csr_response.status_code, + response=send_csr_response.json(), + ) + ) + send_csr_response_json = send_csr_response.json() + certificate_url = send_csr_response_json["certificate"] + self.__config['orders'][index]['certificate_url'] = certificate_url + self.saveConfig() + return certificate_url + + # 转换时间 + def strfDate(self, sdate): + return time.strftime('%Y-%m-%d', time.strptime(sdate, '%Y%m%d%H%M%S')) + + # 证书转为DER + def dumpDer(self, cert_path): + cert = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, public.readFile(cert_path + '/cert.csr')) + return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert) + + # 证书转为pkcs12 + def dumpPkcs12(self, key_pem=None, cert_pem=None, ca_pem=None, friendly_name=None): + p12 = OpenSSL.crypto.PKCS12() + if cert_pem: + p12.set_certificate(OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, cert_pem.encode())) + if key_pem: + p12.set_privatekey(OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, key_pem.encode())) + if ca_pem: + p12.set_ca_certificates((OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, ca_pem.encode()),)) + if friendly_name: + p12.set_friendlyname(friendly_name.encode()) + return p12.export() + + # 拆分根证书 + def splitCaData(self, cert): + sp_key = '-----END CERTIFICATE-----\n' + datas = cert.split(sp_key) + return {"cert": datas[0] + sp_key, "root": sp_key.join(datas[1:])} + + # 获取指定证书基本信息 + def getCertInit(self, pem_file): + if not os.path.exists(pem_file): + return None + try: + result = {} + x509 = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, public.readFile(pem_file)) + # 取产品名称 + issuer = x509.get_issuer() + result['issuer'] = '' + if hasattr(issuer, 'CN'): + result['issuer'] = issuer.CN + if not result['issuer']: + is_key = [b'0', '0'] + issue_comp = issuer.get_components() + if len(issue_comp) == 1: + is_key = [b'CN', 'CN'] + for iss in issue_comp: + if iss[0] in is_key: + result['issuer'] = iss[1].decode() + break + # 取到期时间 + result['notAfter'] = self.strfDate( + bytes.decode(x509.get_notAfter())[:-1]) + # 取申请时间 + result['notBefore'] = self.strfDate( + bytes.decode(x509.get_notBefore())[:-1]) + # 取可选名称 + result['dns'] = [] + for i in range(x509.get_extension_count()): + s_name = x509.get_extension(i) + if s_name.get_short_name() in [b'subjectAltName', 'subjectAltName']: + s_dns = str(s_name).split(',') + for d in s_dns: + result['dns'].append(d.split(':')[1]) + subject = x509.get_subject().get_components() + # 取主要认证名称 + if len(subject) == 1: + result['subject'] = subject[0][1].decode() + else: + result['subject'] = result['dns'][0] + return result + except: + return None + + # 替换服务器上的同域名同品牌证书 + def subAllCert(self, key_file, pem_file): + cert_init = self.get_cert_init(pem_file) # 获取新证书的基本信息 + paths = ['/www/server/mdserver-web/vhost/cert', + '/www/server/mdserver-web/vhost/ssl', + '/www/server/mdserver-web'] + is_panel = False + for path in paths: + if not os.path.exists(path): + continue + for p_name in os.listdir(path): + to_path = path + '/' + p_name + to_pem_file = to_path + '/fullchain.pem' + to_key_file = to_path + '/privkey.pem' + to_info = to_path + '/info.json' + # 判断目标证书是否存在 + if not os.path.exists(to_pem_file): + if not p_name in ['ssl']: + continue + to_pem_file = to_path + '/certificate.pem' + to_key_file = to_path + '/privateKey.pem' + if not os.path.exists(to_pem_file): + continue + if path == paths[-1]: + is_panel = True + # 获取目标证书的基本信息 + to_cert_init = self.getCertInit(to_pem_file) + # 判断证书品牌是否一致 + try: + if to_cert_init['issuer'] != cert_init['issuer'] and to_cert_init['issuer'].find("Let's Encrypt") == -1 and to_cert_init['issuer'] != 'R3': + continue + except: + continue + # 判断目标证书的到期时间是否较早 + if to_cert_init['notAfter'] > cert_init['notAfter']: + continue + # 判断认识名称是否一致 + if len(to_cert_init['dns']) != len(cert_init['dns']): + continue + is_copy = True + for domain in to_cert_init['dns']: + if not domain in cert_init['dns']: + is_copy = False + if not is_copy: + continue + + # 替换新的证书文件和基本信息 + mw.writeFile(to_pem_file, mw.readFile(pem_file, 'rb'), 'wb') + mw.writeFile(to_key_file, mw.readFile(key_file, 'rb'), 'wb') + mw.writeFile(to_info, json.dumps(cert_init)) + writeLog("|-检测到{}下的证书与本次申请的证书重叠,且到期时间较早,已替换为新证书!".format(to_path)) + # 重载web服务 + mw.restartWeb() + + # 保存证书到文件 + def saveCert(self, cert, index): + try: + domain_name = self.__config['orders'][index]['domains'][0] + path = self.__config['orders'][index]['save_path'] + if not os.path.exists(path): + os.makedirs(path, 384) + + # 存储证书 + key_file = path + "/privkey.pem" + pem_file = path + "/fullchain.pem" + mw.writeFile(key_file, cert['private_key']) + mw.writeFile(pem_file, cert['cert'] + cert['root']) + mw.writeFile(path + "/cert.csr", cert['cert']) + mw.writeFile(path + "/root_cert.csr", cert['root']) + + # 转为IIS证书 + pfx_buffer = self.dumpPkcs12( + cert['private_key'], cert['cert'] + cert['root'], cert['root'], domain_name) + mw.writeFile(path + "/fullchain.pfx", pfx_buffer, 'wb+') + + ps = '''文件说明: +privkey.pem 证书私钥 +fullchain.pem 包含证书链的PEM格式证书(nginx/apache) +root_cert.csr 根证书 +cert.csr 域名证书 +fullchain.pfx 用于IIS的证书格式 + +如何在宝塔面板使用: +privkey.pem 粘贴到密钥输入框 +fullchain.pem 粘贴到证书输入框 +''' + mw.writeFile(path + '/说明.txt', ps) + self.subAllCert(key_file, pem_file) + except: + writeLog(public.getTracebackInfo()) + + # 下载证书 + def downloadCert(self, index): + res = self.acmeRequest( + self.__config['orders'][index]['certificate_url'], "") + if res.status_code not in [200, 201]: + raise Exception("下载证书失败: {}".format(res.json())) + + pem_certificate = res.content + if type(pem_certificate) == bytes: + pem_certificate = pem_certificate.decode('utf-8') + cert = self.split_ca_data(pem_certificate) + cert['cert_timeout'] = self.get_cert_timeout(cert['cert']) + cert['private_key'] = self.__config['orders'][index]['private_key'] + cert['domains'] = self.__config['orders'][index]['domains'] + del(self.__config['orders'][index]['private_key']) + del(self.__config['orders'][index]['auths']) + del(self.__config['orders'][index]['expires']) + del(self.__config['orders'][index]['authorizations']) + del(self.__config['orders'][index]['finalize']) + del(self.__config['orders'][index]['identifiers']) + if 'cert' in self.__config['orders'][index]: + del(self.__config['orders'][index]['cert']) + self.__config['orders'][index]['status'] = 'valid' + self.__config['orders'][index]['cert_timeout'] = cert['cert_timeout'] + domain_name = self.__config['orders'][index]['domains'][0] + self.__config['orders'][index]['save_path'] = '{}/{}'.format( + self._save_path, domain_name) + cert['save_path'] = self.__config['orders'][index]['save_path'] + self.saveConfig() + self.saveCert(cert, index) + return cert + + def applyCert(self, domains, auth_type='dns', auth_to='Dns_com|None|None', args={}): + writeLog("", "wb+") + try: + self.getApis() + index = None + if args.index: + index = args.index + if not index: + # 判断是否只想验证域名 + writeLog("|-正在创建订单..") + index = self.createOrder(domains, auth_type, auth_to) + writeLog("|-正在获取验证信息..") + self.getAuths(index) + if auth_to == 'dns' and len(self.__config['orders'][index]['auths']) > 0: + return self.__config['orders'][index] + writeLog("|-正在验证域名..") + self.authDomain(index) + + writeLog("|-正在发送CSR..") + self.sendCsr(index) + + writeLog("|-正在下载证书..") + cert = self.downloadCert(index) + + # 保存证书配置 + self.saveConfig() + cert['status'] = True + cert['msg'] = '申请成功!' + writeLog("|-申请成功,正在部署到站点..") + return cert + except Exception as ex: + ex = str(ex) + if ex.find(">>>>") != -1: + msg = ex.split(">>>>") + msg[1] = json.loads(msg[1]) + else: + msg = ex + writeLog(mw.getTracebackInfo()) + return mw.returnJson(False, msg) + + def renewCert(self, index): + writeLog("", "wb+") + self.D('renew_cert', index) + try: + order_index = [] + if index: + if type(index) != str: + index = index.index + if not index in self.__config['orders']: + raise Exception("指定订单号不存在,无法续签!") + order_index.append(index) + self.D('renew_cert', order_index) + except Exception as e: + start_time = time.time() + (30 * 86400) + if not 'orders' in self.__config: + self.__config['orders'] = {} + self.D('renew_cert', e) + + def do(self, args): + cert = None + try: + if not args.index: + if not args.domains: + echoErr("请在--domain参数中指定要申请证书的域名,多个以逗号(,)隔开") + if not args.auth_type in ['http', 'tls', 'dns']: + echoErr("请在--type参数中指定正确的验证类型,支持dns和http") + + auth_to = '' + if args.auth_type in ['http', 'tls']: + if not args.path: + echo_err("请在--path参数中指定网站根目录!") + if not os.path.exists(args.path): + echo_err("指定网站根目录不存在,请检查:{}".format(args.path)) + auth_to = args.path + else: + if args.dnsapi == '0': + auth_to = 'dns' + else: + if not args.key: + echo_err("使用dnsapi申请时请在--dns_key参数中指定dnsapi的key!") + if not args.secret: + echo_err( + "使用dnsapi申请时请在--dns_secret参数中指定dnsapi的secret!") + auth_to = "{}|{}|{}".format( + args.dnsapi, args.key, args.secret) + + domains = args.domains.strip().split(',') + cert = self.applyCert( + domains, auth_type=args.auth_type, auth_to=auth_to, args=args) + else: + # 重新验证 + cert = self.applyCert([], auth_type='dns', + auth_to='dns', index=args.index) + except Exception as e: + writeLog("|-{}".format(mw.getTracebackInfo())) + exit() + + if not cert: + exit() + + # print(cert) + writeLog("=" * 65) + writeLog("|-证书获取成功!") + writeLog("=" * 65) + writeLog("证书到期时间: {}".format( + mw.formatDate(times=cert['cert_timeout']))) + writeLog("证书已保存在: {}/".format(cert['save_path'])) + + +# exp: +''' + +python3 class/core/cert_request.py --domain=dev38.cachecha.com --type=http --path=/www/wwwroot/ + +python3 class/core/cert_request.py --domain=dev38.cachecha.com --type=http --path=/Users/midoks/Desktop/mwdev/wwwroot/test + +''' +if __name__ == "__main__": + p = argparse.ArgumentParser(usage="必要的参数:--domain 域名列表,多个以逗号隔开!") + p.add_argument('--domain', default=None, + help="请指定要申请证书的域名", dest="domains") + p.add_argument('--type', default=None, help="请指定验证类型", dest="auth_type") + p.add_argument('--path', default=None, help="请指定网站根目录", dest="path") + p.add_argument('--dnsapi', default=None, help="请指定DNSAPI", dest="dnsapi") + p.add_argument('--dns_key', default=None, help="请指定DNSAPI的key", dest="key") + p.add_argument('--dns_secret', default=None, + help="请指定DNSAPI的secret", dest="secret") + p.add_argument('--index', default=None, help="指定订单索引", dest="index") + p.add_argument('--renew', default=None, help="续签证书", dest="renew") + p.add_argument('--revoke', default=None, help="吊销证书", dest="revoke") + + args = p.parse_args() + cr = cert_request() + + if args.revoke: + if not args.index: + echoErr("请在--index参数中传入要被吊销的订单索引") + result = cr.revokeOrder(args.index) + writeLog(result) + exit() + + if args.renew: + cr.renewCert(args.index) + exit() + + cr.do(args)