pull/597/head
Mr Chen 11 months ago
parent a542c95433
commit e7e160c8e1
  1. 400
      plugins/mysql-apt/index.py
  2. 1
      plugins/mysql/index.py

@ -2670,6 +2670,290 @@ def dumpMysqlData(version=''):
return 'ok'
return 'fail'
############### --- 重要 数据补足同步 ---- ###########
def getSyncMysqlDB(dbname,sign = ''):
conn = pSqliteDb('slave_sync_user')
if sign != '':
data = conn.field('ip,port,user,pass,mode,cmd').where('ip=?', (sign,)).find()
else:
data = conn.field('ip,port,user,pass,mode,cmd').find()
user = data['user']
apass = data['pass']
port = data['port']
ip = data['ip']
# 远程数据
sync_db = mw.getMyORM()
# MySQLdb |
sync_db.setPort(port)
sync_db.setHost(ip)
sync_db.setUser(user)
sync_db.setPwd(apass)
sync_db.setDbName(dbname)
sync_db.setTimeout(60)
return sync_db
def syncDatabaseRepairTempFile():
tmp_log = mw.getMWLogs()+ '/mysql-check.log'
return tmp_log
def syncDatabaseRepairLog(version=''):
import subprocess
args = getArgs()
data = checkArgs(args, ['db','sign','op'])
if not data[0]:
return data[1]
sync_args_db = args['db']
sync_args_sign = args['sign']
op = args['op']
tmp_log = syncDatabaseRepairTempFile()
cmd = 'cd '+mw.getServerDir()+'/mdserver-web && source bin/activate && python3 plugins/mysql/index.py sync_database_repair {"db":"'+sync_args_db+'","sign":"'+sync_args_sign+'"}'
# print(cmd)
if op == 'get':
log = mw.getLastLine(tmp_log, 15)
return mw.returnJson(True, log)
if op == 'cmd':
return mw.returnJson(True, 'ok', cmd)
if op == 'do':
os.system(' echo "开始执行" > '+ tmp_log)
os.system(cmd +' >> '+ tmp_log +' &')
return mw.returnJson(True, 'ok')
return mw.returnJson(False, '无效请求!')
def syncDatabaseRepair(version=''):
time_stats_s = time.time()
tmp_log = syncDatabaseRepairTempFile()
from pymysql.converters import escape_string
args = getArgs()
data = checkArgs(args, ['db','sign'])
if not data[0]:
return data[1]
sync_args_db = args['db']
sync_args_sign = args['sign']
# 本地数据
local_db = pMysqlDb()
# 远程数据
sync_db = getSyncMysqlDB(sync_args_db,sync_args_sign)
tables = local_db.query('show tables from `%s`' % sync_args_db)
table_key = "Tables_in_" + sync_args_db
inconsistent_table = []
tmp_dir = '/tmp/sync_db_repair'
mw.execShell('mkdir -p '+tmp_dir)
for tb in tables:
table_name = sync_args_db+'.'+tb[table_key]
table_check_file = tmp_dir+'/'+table_name+'.txt'
if os.path.exists(table_check_file):
# print(table_name+', 已检查OK')
continue
primary_key_sql = "SHOW INDEX FROM "+table_name+" WHERE Key_name = 'PRIMARY';";
primary_key_data = local_db.query(primary_key_sql)
# print(primary_key_sql,primary_key_data)
pkey_name = '*'
if len(primary_key_data) == 1:
pkey_name = primary_key_data[0]['Column_name']
# print(pkey_name)
if pkey_name != '*' :
# 智能校验(由于服务器同步可能会慢,比较总数总是对不上)
cmd_local_newpk_sql = 'select ' + pkey_name + ' from ' + table_name + " order by " + pkey_name + " desc limit 1"
cmd_local_newpk_data = local_db.query(cmd_local_newpk_sql)
# print(cmd_local_newpk_data)
if len(cmd_local_newpk_data) == 1:
# 比较总数
cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name + ' where '+pkey_name + ' <= '+ str(cmd_local_newpk_data[0][pkey_name])
local_count_data = local_db.query(cmd_count_sql)
sync_count_data = sync_db.query(cmd_count_sql)
if local_count_data != sync_count_data:
print(cmd_count_sql)
print("all data compare: ",local_count_data, sync_count_data)
else:
print(table_name+' smart compare check ok.')
mw.writeFile(tmp_log, table_name+' smart compare check ok.\n','a+')
mw.execShell("echo 'ok' > "+table_check_file)
continue
# 比较总数
cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name
local_count_data = local_db.query(cmd_count_sql)
sync_count_data = sync_db.query(cmd_count_sql)
if local_count_data != sync_count_data:
print("all data compare: ",local_count_data, sync_count_data)
inconsistent_table.append(table_name)
diff = sync_count_data[0]['num'] - local_count_data[0]['num']
print(table_name+', need sync. diff,'+str(diff))
mw.writeFile(tmp_log, table_name+', need sync. diff,'+str(diff)+'\n','a+')
else:
print(table_name+' check ok.')
mw.writeFile(tmp_log, table_name+' check ok.\n','a+')
mw.execShell("echo 'ok' > "+table_check_file)
# inconsistent_table = ['xx.xx']
# 数据对齐
for table_name in inconsistent_table:
is_break = False
while not is_break:
local_db.ping()
# 远程数据
sync_db.ping()
print("check table:"+table_name)
mw.writeFile(tmp_log, "check table:"+table_name+'\n','a+')
table_name_pos = 0
table_name_pos_file = tmp_dir+'/'+table_name+'.pos.txt'
primary_key_sql = "SHOW INDEX FROM "+table_name+" WHERE Key_name = 'PRIMARY';";
primary_key_data = local_db.query(primary_key_sql)
pkey_name = primary_key_data[0]['Column_name']
if os.path.exists(table_name_pos_file):
table_name_pos = mw.readFile(table_name_pos_file)
data_select_sql = 'select * from '+table_name + ' where '+pkey_name+' > '+str(table_name_pos)+' limit 10000'
print(data_select_sql)
local_select_data = local_db.query(data_select_sql)
time_s = time.time()
sync_select_data = sync_db.query(data_select_sql)
print(f'sync query cos:{time.time() - time_s:.4f}s')
mw.writeFile(tmp_log, f'sync query cos:{time.time() - time_s:.4f}s\n','a+')
# print(local_select_data)
# print(sync_select_data)
# print(len(local_select_data))
# print(len(sync_select_data))
print('pos:',str(table_name_pos),'local compare sync,',local_select_data == sync_select_data)
cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name
local_count_data = local_db.query(cmd_count_sql)
time_s = time.time()
sync_count_data = sync_db.query(cmd_count_sql)
print(f'sync count data cos:{time.time() - time_s:.4f}s')
print(local_count_data,sync_count_data)
# 数据同步有延迟,相等即任务数据补足完成
if local_count_data[0]['num'] == sync_count_data[0]['num']:
is_break = True
break
diff = sync_count_data[0]['num'] - local_count_data[0]['num']
print("diff," + str(diff)+' line data!')
if local_select_data == sync_select_data:
data_count = len(local_select_data)
if data_count == 0:
# mw.writeFile(table_name_pos_file, '0')
print(table_name+",data is equal ok..")
is_break = True
break
# print(table_name,data_count)
pos = local_select_data[data_count-1][pkey_name]
print('pos',pos)
progress = pos/sync_count_data[0]['num']
print('progress,%.2f' % progress+'%')
mw.writeFile(table_name_pos_file, str(pos))
else:
sync_select_data_len = len(sync_select_data)
skip_idx = 0
# 主库PK -> 查询本地 | 保证一致
if sync_select_data_len > 0:
for idx in range(sync_select_data_len):
sync_idx_data = sync_select_data[idx]
local_idx_data = None
if idx in local_select_data:
local_idx_data = local_select_data[idx]
if sync_select_data[idx] == local_idx_data:
skip_idx = idx
pos = local_select_data[idx][pkey_name]
mw.writeFile(table_name_pos_file, str(pos))
# print(insert_data)
local_inquery_sql = 'select * from ' + table_name+ ' where ' +pkey_name+' = '+ str(sync_idx_data[pkey_name])
# print(local_inquery_sql)
ldata = local_db.query(local_inquery_sql)
# print('ldata:',ldata)
if len(ldata) == 0:
print("id:"+ str(sync_idx_data[pkey_name])+ " no exists, insert")
insert_sql = 'insert into ' + table_name
field_str = ''
value_str = ''
for field in sync_idx_data:
field_str += '`'+field+'`,'
value_str += '\''+escape_string(str(sync_idx_data[field]))+'\','
field_str = '(' +field_str.strip(',')+')'
value_str = '(' +value_str.strip(',')+')'
insert_sql = insert_sql+' '+field_str+' values'+value_str+';'
print(insert_sql)
r = local_db.execute(insert_sql)
print(r)
else:
# print('compare sync->local:',sync_idx_data == ldata[0] )
if ldata[0] == sync_idx_data:
continue
print("id:"+ str(sync_idx_data[pkey_name])+ " data is not equal, update")
update_sql = 'update ' + table_name
field_str = ''
value_str = ''
for field in sync_idx_data:
if field == pkey_name:
continue
field_str += '`'+field+'`=\''+escape_string(str(sync_idx_data[field]))+'\','
field_str = field_str.strip(',')
update_sql = update_sql+' set '+field_str+' where '+pkey_name+'=\''+str(sync_idx_data[pkey_name])+'\';'
print(update_sql)
r = local_db.execute(update_sql)
print(r)
# 本地PK -> 查询主库 | 保证一致
# local_select_data_len = len(local_select_data)
# if local_select_data_len > 0:
# for idx in range(local_select_data_len):
# if idx < skip_idx:
# continue
# local_idx_data = local_select_data[idx]
# print('local idx check', idx, skip_idx)
# local_inquery_sql = 'select * from ' + table_name+ ' where ' +pkey_name+' = '+ str(local_idx_data[pkey_name])
# print(local_inquery_sql)
# sdata = sync_db.query(local_inquery_sql)
# sdata_len = len(sdata)
# print('sdata:',sdata,sdata_len)
# if sdata_len == 0:
# delete_sql = 'delete from ' + table_name + ' where ' +pkey_name+' = '+ str(local_idx_data[pkey_name])
# print(delete_sql)
# r = local_db.execute(delete_sql)
# print(r)
# break
if is_break:
print("break all")
break
time.sleep(3)
print(f'data check cos:{time.time() - time_stats_s:.4f}s')
print("data supplementation completed")
mw.execShell('rm -rf '+tmp_dir)
return 'ok'
############### --- 重要 同步---- ###########
@ -2694,6 +2978,42 @@ def doFullSync(version=''):
if mode == 'sync-user':
return doFullSyncUser(version)
def isSimpleSyncCmd(sql):
new_sql = sql.lower()
if new_sql.find('master_auto_position') > 0:
return False
return True
def getChannelNameForCmd(cmd):
cmd = cmd.lower()
cmd_arr = cmd.split('channel')
if len(cmd_arr) == 2:
cmd_channel_info = cmd_arr[1]
channel_name = cmd_channel_info.strip()
channel_name = channel_name.strip(';')
channel_name = channel_name.strip("'")
return channel_name
return ''
def doFullSyncUserImportContentForChannel(file, channel_name):
# print(file, channel_name)
content = mw.readFile(file)
content = content.replace('STOP SLAVE;', "STOP SLAVE for channel '{}';".format(channel_name))
content = content.replace('START SLAVE;', "START SLAVE for channel '{}';".format(channel_name))
find_head = "CHANGE MASTER TO "
find_re = find_head+"(.*?);"
find_r = re.search(find_re, content, re.I|re.M)
if find_r:
find_rg = find_r.groups()
if len(find_rg)>0:
find_str = find_head+find_rg[0]
if find_str.lower().find('channel')==-1:
content = content.replace(find_str+';', find_str+" for channel '{}';".format(channel_name))
mw.writeFile(file,content)
return True
def doFullSyncUser(version=''):
args = getArgs()
@ -2727,8 +3047,14 @@ def doFullSyncUser(version=''):
apass = data['pass']
port = data['port']
ip = data['ip']
cmd = data['cmd']
channel_name = getChannelNameForCmd(cmd)
sync_mdb = getSyncMysqlDB(sync_db,sync_sign)
bak_file = '/tmp/tmp.sql'
if os.path.exists(bak_file):
os.system("rm -rf " + bak_file)
writeDbSyncStatus({'code': 0, 'msg': '开始同步...', 'progress': 0})
dmp_option = ''
@ -2736,32 +3062,88 @@ def doFullSyncUser(version=''):
if mode == 'gtid':
dmp_option = ' --set-gtid-purged=off '
writeDbSyncStatus({'code': 1, 'msg': '远程导出数据...', 'progress': 20})
time.sleep(1)
writeDbSyncStatus({'code': 1, 'msg': '正在停止从库...', 'progress': 15})
mdb8 = ['8.0','8.1','8.2','8.3','8.4']
if mw.inArray(mdb8,version):
db.query("stop slave user='{}' password='{}';".format(user, apass))
else:
db.query("stop slave")
time.sleep(1)
writeDbSyncStatus({'code': 2, 'msg': '远程导出数据...', 'progress': 20})
find_run_dump = mw.execShell('ps -ef | grep mysqldump | grep -v grep')
if find_run_dump[0] != "":
print("正在远程导出数据中,别着急...")
writeDbSyncStatus({'code': 3.1, 'msg': '正在远程导出数据中,别着急...', 'progress': 19})
return False
time_s = time.time()
if not os.path.exists(bak_file):
dump_sql_data = getServerDir() + "/bin/usr/bin/mysqldump " + dmp_option + " --force --opt --default-character-set=utf8 --single-transaction -h" + ip + " -P" + \
port + " -u" + user + " -p\"" + apass + \
"\" --ssl-mode=DISABLED " + sync_db + " > " + bak_file
# print(dump_sql_data)
mw.execShell(dump_sql_data)
if isSimpleSyncCmd(cmd):
dmp_option += " --master-data=1 --apply-slave-statements --include-master-host-port "
else:
dmp_option += ' '
dump_sql_data = getServerDir() + "/bin/usr/bin/mysqldump --single-transaction --default-character-set=utf8mb4 --compress -q " + dmp_option + " -h" + \
ip + " -P" + port + " -u" + user + ' -p"' + apass + '" --ssl-mode=DISABLED ' + sync_db + " > " + bak_file
print(dump_sql_data)
time_s = time.time()
r = mw.execShell(dump_sql_data)
print(r)
time_e = time.time()
export_cos = time_e - time_s
print("export cos:", export_cos)
writeDbSyncStatus({'code': 3, 'msg': '导出耗时:'+str(int(export_cos))+'秒,正在到本地导入数据中...', 'progress': 40})
find_run_import = mw.execShell('ps -ef | grep mysql| grep '+ bak_file +' | grep -v grep')
if find_run_import[0] != "":
print("正在导入数据中,别着急...")
writeDbSyncStatus({'code': 4.1, 'msg': '正在导入数据中,别着急...', 'progress': 39})
return False
writeDbSyncStatus({'code': 2, 'msg': '本地导入数据...', 'progress': 40})
time_s = time.time()
if os.path.exists(bak_file):
# 重置
db.execute('reset master')
if channel_name != '':
doFullSyncUserImportContentForChannel(bak_file, channel_name)
pwd = pSqliteDb('config').where('id=?', (1,)).getField('mysql_root')
sock = getSocketFile()
if is_exist_pv:
my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + " -uroot -p'" + pwd + "' " + sync_db_import
my_import_cmd = "pv -t -p " + bak_file + '|' + my_import_cmd
print(my_import_cmd)
os.system(my_import_cmd)
else:
my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + " -uroot -p'" + pwd + "' " + sync_db_import + ' < ' + bak_file
print(my_import_cmd)
mw.execShell(my_import_cmd)
my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + ' -uroot -p' + pwd + \
' ' + sync_db_import + ' < ' + bak_file
mw.execShell(my_import_cmd)
if version == '8.0':
if mw.inArray(mdb8, version):
db.query("start slave user='{}' password='{}';".format(user, apass))
else:
db.query("start slave")
writeDbSyncStatus({'code': 6, 'msg': '从库重启完成...', 'progress': 100})
db.query("start all slaves")
time_all_e = time.time()
cos = time_all_e - time_all_s
writeDbSyncStatus({'code': 6, 'msg': '总耗时:'+str(int(cos))+'秒,从库重启完成...', 'progress': 100})
if os.path.exists(bak_file):
os.system("rm -rf " + bak_file)
return True

@ -3536,7 +3536,6 @@ def doFullSyncUser(version=''):
# r = db.query(cmd)
# print(r)
mdb8 = ['8.0','8.1','8.2','8.3','8.4']
if mw.inArray(mdb8,version):
db.query("start slave user='{}' password='{}';".format(user, apass))
else:

Loading…
Cancel
Save