pull/632/head
Mr Chen 6 months ago
parent 43420b7c38
commit d5a8efff0e
  1. 8
      web/admin/setting/notify_email.py
  2. 168
      web/core/mw.py
  3. 43
      web/utils/email.py

@ -56,8 +56,14 @@ def set_notify_email():
@blueprint.route('/set_notify_email_test', endpoint='set_notify_email_test', methods=['POST'])
@panel_login_required
def set_notify_email_test():
return mw.returnData(True,'设置成功')
tag = request.form.get('tag', '').strip()
tag_data = request.form.get('data', '').strip()
data = json.loads(tag_data)
test_pass = mw.emailNotifyTest(data)
if test_pass:
return mw.returnData(True, '验证成功')
return mw.returnData(False, '验证失败')
# 切换邮件开关
@blueprint.route('/set_notify_email_enable', endpoint='set_notify_email_enable', methods=['POST'])

@ -1106,6 +1106,174 @@ def getMyORM():
# 数据库 START
# ---------------------------------------------------------------------------------
##################### notify start #########################################
def initNotifyConfig():
p = getNotifyPath()
if not os.path.exists(p):
writeFile(p, '{}')
return True
def getNotifyPath():
path = 'data/notify.json'
return path
def getNotifyData(is_parse=False):
initNotifyConfig()
notify_file = getNotifyPath()
notify_data = readFile(notify_file)
data = json.loads(notify_data)
if is_parse:
tag_list = ['tgbot', 'email']
for t in tag_list:
if t in data and 'cfg' in data[t]:
data[t]['data'] = json.loads(deDoubleCrypt(t, data[t]['cfg']))
return data
def writeNotify(data):
p = getNotifyPath()
return writeFile(p, json.dumps(data))
def tgbotNotifyChatID():
data = getNotifyData(True)
if 'tgbot' in data and 'enable' in data['tgbot']:
if data['tgbot']['enable']:
t = data['tgbot']['data']
return t['chat_id']
return ''
def tgbotNotifyObject():
data = getNotifyData(True)
if 'tgbot' in data and 'enable' in data['tgbot']:
if data['tgbot']['enable']:
t = data['tgbot']['data']
import telebot
bot = telebot.TeleBot(app_token)
return True, bot
return False, None
def tgbotNotifyMessage(app_token, chat_id, msg):
import telebot
bot = telebot.TeleBot(app_token)
try:
data = bot.send_message(chat_id, msg)
return True
except Exception as e:
writeFileLog(str(e))
return False
def tgbotNotifyHttpPost(app_token, chat_id, msg):
try:
url = 'https://api.telegram.org/bot' + app_token + '/sendMessage'
post_data = {
'chat_id': chat_id,
'text': msg,
}
rdata = httpPost(url, post_data)
return True
except Exception as e:
writeFileLog(str(e))
return False
def tgbotNotifyTest(app_token, chat_id):
msg = 'MW-通知验证测试OK'
return tgbotNotifyHttpPost(app_token, chat_id, msg)
def emailNotifyMessage(data):
'''
邮件通知
'''
import utils.email as email
try:
if data['smtp_ssl'] == 'ssl':
email.sendSSL(data['smtp_host'], data['smtp_port'],
data['username'], data['password'],
data['to_mail_addr'], data['subject'], data['content'])
else:
email.send(data['smtp_host'], data['smtp_port'],
data['username'], data['password'],
data['to_mail_addr'], data['subject'], data['content'])
return True
except Exception as e:
print(getTracebackInfo())
return False
def emailNotifyTest(data):
# print(data)
data['subject'] = 'MW通知测试'
data['content'] = data['mail_test']
return emailNotifyMessage(data)
def notifyMessageTry(msg, stype='common', trigger_time=300, is_write_log=True):
lock_file = getPanelTmp() + '/notify_lock.json'
if not os.path.exists(lock_file):
writeFile(lock_file, '{}')
lock_data = json.loads(readFile(lock_file))
if stype in lock_data:
diff_time = time.time() - lock_data[stype]['do_time']
if diff_time >= trigger_time:
lock_data[stype]['do_time'] = time.time()
else:
return False
else:
lock_data[stype] = {'do_time': time.time()}
writeFile(lock_file, json.dumps(lock_data))
if is_write_log:
writeLog("通知管理[" + stype + "]", msg)
data = getNotifyData(True)
# tag_list = ['tgbot', 'email']
# tagbot
do_notify = False
if 'tgbot' in data and 'enable' in data['tgbot']:
if data['tgbot']['enable']:
t = data['tgbot']['data']
i = sys.version_info
# telebot 在python小于3.7无法使用
if i[0] < 3 or i[1] < 7:
do_notify = tgbotNotifyHttpPost(
t['app_token'], t['chat_id'], msg)
else:
do_notify = tgbotNotifyMessage(
t['app_token'], t['chat_id'], msg)
if 'email' in data and 'enable' in data['email']:
if data['email']['enable']:
t = data['email']['data']
t['subject'] = 'MW通知'
t['content'] = msg
do_notify = emailNotifyMessage(t)
return do_notify
def notifyMessage(msg, stype='common', trigger_time=300, is_write_log=True):
try:
return notifyMessageTry(msg, stype, trigger_time, is_write_log)
except Exception as e:
writeFileLog(getTracebackInfo())
return False
##################### notify end #########################################
# ---------------------------------------------------------------------------------
# 打印相关 START

@ -0,0 +1,43 @@
# coding: utf-8
import smtplib
from email import encoders
from email.header import Header
from email.mime.text import MIMEText
from email.utils import parseaddr, formataddr
def _format_addr(s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
def send(smtp_host, smtp_port, username, password, to_mail, subject, content):
smtp = smtplib.SMTP()
smtp.connect(smtp_host, port=smtp_port)
smtp.login(user=username, password=password)
msg = MIMEText(content, 'plain', 'utf-8')
msg['From'] = _format_addr(username)
msg['To'] = _format_addr(to_mail)
msg['Subject'] = Header(subject, 'utf-8').encode()
smtp.sendmail(from_addr=username, to_addrs=to_mail, msg=msg.as_string())
return True
def sendSSL(smtp_host, smtp_port, username, password, to_mail, subject, content):
smtp = smtplib.SMTP_SSL(smtp_host, port=smtp_port)
smtp.login(user=username, password=password)
msg = MIMEText(content, 'plain', 'utf-8')
msg['From'] = _format_addr(username)
msg['To'] = _format_addr(to_mail)
msg['Subject'] = Header(subject, 'utf-8').encode()
smtp.sendmail(from_addr=username, to_addrs=to_mail, msg=msg.as_string())
smtp.quit()
return True
Loading…
Cancel
Save