Simple Linux Panel
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mdserver-web/plugins/msonedrive/class/msodclient.py

821 lines
29 KiB

2 years ago
# coding:utf-8
import sys
import io
import os
import time
import re
import json
import io
sys.path.append(os.getcwd() + "/class/core")
import mw
import oauthlib
import requests
import datetime
from requests_oauthlib import OAuth2Session
DEBUG = False


def setDebug(d=False):
    """Switch the module-wide debug output on or off.

    :param d: value stored in the module-level ``DEBUG`` flag; the
        ``if DEBUG:`` branches in this file print diagnostics when it is truthy.
    """
    # Without ``global`` the assignment only created a function-local name
    # and the module flag never changed.
    global DEBUG
    DEBUG = d
class UnauthorizedError(Exception):
    """Error that ``resumable_upload`` catches to stop without retrying.

    NOTE(review): nothing in the visible part of this file raises it.
    """
class ObjectNotFoundError(Exception):
    """Error type for a missing remote object.

    NOTE(review): meaning inferred from the name only; the visible code
    neither raises nor catches it.
    """
class msodclient:
    """OneDrive client built on Microsoft Graph HTTP requests."""

    # File names; they are joined to plugin_dir / server_dir where used.
    # Despite the ".pickle" suffix, store_token() writes text produced by
    # mw.enDoubleCrypt, not a pickle.
    credential_file = "credentials.json"
    token_file = "token.pickle"
    user_conf = "user.conf"

    # Placeholders, replaced per instance by __init__.
    plugin_dir = ""
    server_dir = ""
def __init__(self, plugin_dir, server_dir):
    """Remember the two working directories and load the credentials.

    :param plugin_dir: directory that holds the credential file.
    :param server_dir: directory where the token and user files are kept.
    """
    self.plugin_dir, self.server_dir = plugin_dir, server_dir
    # load() reads the credential file and derives the URLs and paths.
    self.load()
def setDebug(self, d=False):
    """Switch the module-wide debug output on or off.

    :param d: value stored in the module-level ``DEBUG`` flag.
    """
    # The flag lives at module level; without ``global`` this assignment
    # only bound a local name and had no effect.
    global DEBUG
    DEBUG = d
def load(self):
    """Read the credential file and derive the endpoints used later.

    Sets ``credential``, ``authorize_url``, ``token_url``, ``token_path``,
    ``root_uri`` and ``backup_path`` on the instance.
    """
    raw_json = mw.readFile(
        os.path.join(self.plugin_dir, self.credential_file))
    # Only the "onedrive-international" section of the file is used.
    self.credential = json.loads(raw_json)["onedrive-international"]

    join_url = '{0}{1}'.format
    self.authorize_url = join_url(self.credential['authority'],
                                  self.credential['authorize_endpoint'])
    self.token_url = join_url(self.credential['authority'],
                              self.credential['token_endpoint'])

    self.token_path = os.path.join(self.server_dir, self.token_file)
    self.root_uri = self.credential["api_uri"] + "/me/drive/root"
    # Remote folder that upload object names are placed under.
    self.backup_path = 'backup'
def store_token(self, token):
    """Persist the token to ``token_path``.

    The token is serialised to JSON and passed through
    ``mw.enDoubleCrypt`` before being written.

    :param token: JSON-serialisable token data.
    :return: always True.
    """
    serialized = json.dumps(token)
    mw.writeFile(self.token_path, mw.enDoubleCrypt('msodc', serialized))
    return True
def get_store_token(self):
    """Read ``token_path`` and return the stored token.

    :return: the object decoded from the JSON that ``mw.deDoubleCrypt``
        yields for the file content.
    """
    return json.loads(
        mw.deDoubleCrypt('msodc', mw.readFile(self.token_path)))
def clear_token(self):
    """Delete the stored token file if it exists (best effort).

    Ordinary failures are not raised; they are only reported when DEBUG
    is on.
    """
    try:
        if os.path.isfile(self.token_path):
            os.remove(self.token_path)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit.
        if DEBUG:
            print("清除token失败。")
def refresh_token(self, origin_token):
    """Exchange the refresh token of ``origin_token`` for a new token.

    :param origin_token: token dict that contains ``"refresh_token"``.
    :return: whatever ``OAuth2Session.refresh_token`` returns.
    """
    # oauthlib switches: do not fail when the returned scope differs from
    # the requested one.
    for flag in ('OAUTHLIB_RELAX_TOKEN_SCOPE', 'OAUTHLIB_IGNORE_SCOPE_CHANGE'):
        os.environ[flag] = '1'

    current_refresh = origin_token["refresh_token"]
    cred = self.credential
    oauth = OAuth2Session(
        cred["client_id"],
        scope=cred["scopes"],
        redirect_uri=cred["redirect_uri"])
    return oauth.refresh_token(
        self.token_url,
        refresh_token=current_refresh,
        client_id=cred["client_id"],
        client_secret=cred["client_secret"])
def get_token_from_authorized_url(self, authorized_url, expected_state=None):
    """Obtain an access token from the URL the user was redirected to.

    :param authorized_url: full redirect URL carrying the authorization
        response.
    :param expected_state: state value handed to ``OAuth2Session``.
    :return: whatever ``OAuth2Session.fetch_token`` returns.
    """
    # Ignore a mismatch between the token scope and the requested scope.
    for flag in ('OAUTHLIB_RELAX_TOKEN_SCOPE', 'OAUTHLIB_IGNORE_SCOPE_CHANGE'):
        os.environ[flag] = '1'

    cred = self.credential
    oauth = OAuth2Session(cred["client_id"],
                          state=expected_state,
                          scope=cred['scopes'],
                          redirect_uri=cred['redirect_uri'])
    return oauth.fetch_token(
        self.token_url,
        client_secret=cred["client_secret"],
        authorization_response=authorized_url)
def get_token(self):
    """Return the stored token, refreshing it when it is about to expire.

    A refreshed token is written back with ``store_token`` before it is
    returned.
    """
    token = self.get_store_token()
    # Refresh 300 seconds before the recorded expiry time.
    needs_refresh = time.time() >= token["expires_at"] - 300
    if not needs_refresh:
        return token

    refreshed = self.refresh_token(token)
    self.store_token(refreshed)
    return refreshed
def get_sign_in_url(self):
    """Build the sign-in URL the user must open to authorise the app.

    :return: ``(sign_in_url, state)`` as produced by
        ``OAuth2Session.authorization_url``.
    """
    cred = self.credential
    oauth = OAuth2Session(cred["client_id"],
                          scope=cred['scopes'],
                          redirect_uri=cred['redirect_uri'])
    url, state = oauth.authorization_url(self.authorize_url, prompt='login')
    return url, state
def get_authorized_header(self):
    """Return the HTTP headers that carry the current access token.

    :return: dict with ``Authorization`` (bearer token from
        ``get_token``) and a fixed ``User-Agent``.
    """
    access_token = self.get_token()["access_token"]
    user_agent = ('Mozilla/5.0 (X11; Linux x86_64) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/67.0.3396.99 Safari/537.36')
    return {
        "Authorization": "Bearer " + access_token,
        'User-Agent': user_agent,
    }
def get_user_from_ms(self):
    """Ask the ``/me`` endpoint for the signed-in user's principal name.

    :return: the ``userPrincipalName`` field on HTTP 200, otherwise None.
        When oauthlib raises ``InvalidGrantError`` the stored token and
        user files are cleared and None is returned.
    """
    principal = None
    try:
        auth_headers = self.get_authorized_header()
        me_uri = self.credential["api_uri"] + "/me"
        response = requests.get(me_uri, headers=auth_headers)
        if DEBUG:
            print("Debug get user:")
            print(response.status_code)
            print(response.text)
        if response.status_code == 200:
            principal = response.json()["userPrincipalName"]
    except oauthlib.oauth2.rfc6749.errors.InvalidGrantError:
        # The grant is no longer valid: forget the saved authorisation.
        self.clear_auth()
        if DEBUG:
            print("用户授权已过期。")
    return principal
def clear_auth(self):
    """Forget the saved authorisation: token file first, then user file."""
    self.clear_token()
    self.clear_user()
def clear_user(self):
    """Delete the stored user file if it exists (best effort).

    Ordinary failures are not raised; they are only reported when DEBUG
    is on.
    """
    try:
        path = os.path.join(self.server_dir, self.user_conf)
        if os.path.isfile(path):
            os.remove(path)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit.
        if DEBUG:
            print("清除user失败。")
def store_user(self):
    """Fetch the current user name and write it to the user file.

    :raises RuntimeError: when ``get_user_from_ms`` returns nothing.
    """
    user = self.get_user_from_ms()
    if not user:
        raise RuntimeError("无法获取用户信息。")
    mw.writeFile(os.path.join(self.server_dir, self.user_conf), user)
# --------------------- File operations ----------------------
# Get the directory path
def get_path(self, path):
    """Normalise a remote path for use in a request URI.

    The result is either the empty string (drive root) or a path without
    a trailing slash that starts with ``/`` (or with ``:`` when the
    caller already supplied that separator).

    :param path: remote path, with or without leading/trailing slashes.
    :return: the normalised path; it is not URL-quoted.
    """
    sep = ":"
    # Collapse every run of slashes. A single replace('//', '/') pass left
    # 'a///b' as '/a//b' and 'a//' as '/a/'.
    while '//' in path:
        path = path.replace('//', '/')
    if path[-1:] == '/':
        path = path[:-1]
    if path[:1] != "/" and path[:1] != sep:
        path = "/" + path
    if path == '/':
        path = ''
    return path
def build_uri(self, path="", operate=None, base=None):
    """Assemble a request URL from base, resource path and operation.

    Shape of the result, e.g.
        https://graph.microsoft.com/v1.0/me/drive/root:/bt_backup/:content
        |------------------ base ------------------| |-- path --| |operate|
    When a path is present the parts are joined with ``:``; without a
    path the operation is appended directly to the base.

    :param path: sub-resource path (normalised with ``get_path``).
    :param operate: operation on the resource, e.g. content, children.
    :param base: base URL; defaults to ``self.root_uri``.
    :return: the request URL.
    """
    prefix = self.root_uri if base is None else base
    resource = self.get_path(path)

    suffix = operate
    if suffix and suffix[:1] != "/":
        suffix = "/" + suffix

    pieces = [prefix]
    if resource:
        pieces.extend([":", resource])
        if suffix:
            pieces.extend([":", suffix])
    elif suffix:
        pieces.append(suffix)
    return "".join(pieces)
def get_list(self, path="/"):
    """List the children of a remote folder.

    :param path: remote folder path.
    :return: ``{'path': path, 'list': [...]}``; each list entry has the
        keys ``name``, ``size``, ``type`` ("File" for files, else None),
        ``download`` (download URL for files, else "") and ``time``
        (modification time as integer epoch seconds, 0 when it cannot be
        parsed). The list is empty when the response status is not 200.

    NOTE(review): only one response page is read; a ``@odata.nextLink``
    continuation, if the service sends one, is not followed.
    """
    import calendar

    list_uri = self.build_uri(path, operate="/children")
    if DEBUG:
        print("List uri:")
        print(list_uri)

    data = []
    response = requests.get(list_uri, headers=self.get_authorized_header())
    if response.status_code == 200:
        response_data = response.json()
        if DEBUG:
            print("DEBUG:")
            print(response_data)

        time_formats = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ")
        for item in response_data["value"]:
            # Defaults cover folders and items that are neither file nor
            # folder, which previously lacked the type/download keys.
            entry = {
                'name': item["name"],
                'size': item["size"],
                'type': None,
                'download': "",
            }
            if "file" in item:
                entry["type"] = "File"
                entry['download'] = item.get(
                    "@microsoft.graph.downloadUrl", "")

            modified = None
            for time_format in time_formats:
                try:
                    modified = datetime.datetime.strptime(
                        item.get("lastModifiedDateTime"), time_format)
                    break
                except (TypeError, ValueError):
                    continue
            if modified is None:
                # No format matched; the old code crashed on ``None += ...``.
                entry['time'] = 0
            else:
                # Both formats end in a literal "Z", so the value is read
                # as UTC. The former "+8 hours then mktime" conversion gave
                # the right epoch only on hosts whose local zone is UTC+8.
                entry['time'] = calendar.timegm(modified.timetuple())
            data.append(entry)

    return {'path': path, 'list': data}
def get_object(self, object_name):
    """Fetch the metadata of a remote object.

    :param object_name: remote path of the object.
    :return: the decoded JSON body on HTTP 200; None for any other
        status or when any exception occurs.
    """
    try:
        get_uri = self.build_uri(path=object_name)
        if DEBUG:
            print("Get uri:")
            print(get_uri)

        response = requests.get(get_uri,
                                headers=self.get_authorized_header())
        status = response.status_code
        if status == 200:
            info = response.json()
            if DEBUG:
                print("Object info:")
                print(info)
            return info

        if DEBUG:
            if status == 404:
                print("对象不存在。")
            print("Get Object debug:")
            print(response.status_code)
            print(response.text)
    except Exception as e:
        # Any failure is treated like "no such object".
        if DEBUG:
            print("Get object has excepiton:")
            print(e)
    return None
def is_folder(self, obj):
    """Return True when the object metadata contains a ``"folder"`` key."""
    return "folder" in obj
def delete_object_by_os(self, object_name):
    """Delete a remote object with a single DELETE request.

    :param object_name: remote path of the object.
    :return: True when the object is gone (it did not exist, or the
        server answered 204); False when it is a non-empty folder or the
        request returned another status.
    """
    obj = self.get_object(object_name)
    if obj is None:
        if DEBUG:
            print("对象不存在,删除操作未执行。")
        return True

    # Folders are only removed when they have no children.
    if self.is_folder(obj) and obj["folder"]["childCount"] > 0:
        if DEBUG:
            print("文件夹不是空文件夹无法删除。")
        return False

    headers = self.get_authorized_header()
    response = requests.delete(self.build_uri(object_name), headers=headers)
    deleted = response.status_code == 204
    if deleted and DEBUG:
        print("对象: {} 已被删除。".format(object_name))
    return deleted
def delete_object(self, object_name, retries=2):
    """Delete a remote object, retrying when the attempt raises.

    :param object_name: remote path of the object.
    :param retries: number of additional attempts after a failure
        (default 2).
    :return: the result of ``delete_object_by_os``, or False once every
        attempt has raised.
    """
    remaining = retries
    while True:
        try:
            return self.delete_object_by_os(object_name)
        except Exception as e:
            print("删除文件异常:")
            print(e)
        if remaining <= 0:
            return False
        # Try again.
        print("重新尝试删除文件{}...".format(object_name))
        remaining -= 1
def build_object_name(self, data_type, file_name):
    """Build the remote object name for a backup file.

    Layout: ``<backup_path>/<data_type>/<sub name>/<file_name>``. The sub
    name is taken from ``file_name`` when it looks like
    ``<prefix>_<name>_20<digits>_<digits>`` followed by ``.`` or ``_``,
    with prefix ``web``/``db``/``path`` for the data types
    ``site``/``database``/``path``. Parts that do not apply are left out.

    :param data_type: site/database/path; an empty value or an unknown
        type is accepted (no sub name is derived; an empty value also
        omits the data-type folder).
    :param file_name: base name of the backup file.
    :return: the object name without a leading slash.
    """
    import re
    prefix_dict = {
        "site": "web",
        "database": "db",
        "path": "path",
    }

    sub_path_name = ""
    prefix = prefix_dict.get(data_type)
    # Unknown or missing data types used to fail with ``None + str``
    # (delete_file passes data_type=None by default).
    if prefix is not None:
        # Raw string: "\d" and "\." are invalid escapes in a normal literal.
        file_regx = prefix + r"_(.+)_20\d+_\d+(?:\.|_)"
        sub_search = re.search(file_regx, file_name)
        if sub_search:
            sub_path_name = sub_search.group(1) + '/'

    # Build the remote storage path.
    folders = [self.backup_path]
    if data_type:
        folders.append(data_type)
    object_name = '/'.join(folders) + '/' + sub_path_name + file_name
    if object_name[:1] == "/":
        object_name = object_name[1:]
    return object_name
def delete_file(self, file_name, data_type=None):
    """Delete a backup file identified by its name and data type.

    The object name is derived with ``build_object_name`` and then passed
    to ``delete_object``.

    :param file_name: name of the backup file.
    :param data_type: data type, site/database/path.
    :return: the result of ``delete_object``.
    """
    return self.delete_object(self.build_object_name(data_type, file_name))
def create_dir_by_step(self, parent_folder, sub_folder):
    """Send one folder-creation request for ``sub_folder``.

    :param parent_folder: remote path of the parent folder.
    :param sub_folder: name of the folder to create inside it.
    :return: True on HTTP 201 or 409, False for any other status.
    """
    create_uri = self.build_uri(path=parent_folder, operate="/children")
    if DEBUG:
        print("Create dir uri:")
        print(create_uri)

    payload = {
        "name": sub_folder,
        "folder": {"@odata.type": "microsoft.graph.folder"},
        "@microsoft.graph.conflictBehavior": "fail"
    }
    request_headers = self.get_authorized_header()
    request_headers["Content-type"] = "application/json"
    response = requests.post(create_uri, headers=request_headers, json=payload)

    status = response.status_code
    if status not in (201, 409):
        if DEBUG:
            print("目录:{} 创建失败:".format(sub_folder))
            print(response.status_code)
            print(response.text)
        return False

    # 409 is treated as success: the folder is reported as already there.
    if DEBUG and status == 409:
        print("目录:{} 已经存在。".format(sub_folder))
    return True
def create_dir(self, dir_name):
    """Create a remote directory.

    Request shape used by ``create_dir_by_step``:
        POST /me/drive/root/children
        or
        POST /me/drive/root:/bt_backup/:/children
        Content-Type: application/json
        {
            "name": "New Folder",
            "folder": {},
            "@microsoft.graph.conflictBehavior": "rename"
        }
        Response status 201 = newly created, 409 = already exists
        @microsoft.graph.conflictBehavior: fail/rename/replace

    :param dir_name: remote directory path.
    :return: True when the directory exists afterwards; False when the
        name is rejected, the path is occupied by a non-folder object, or
        creation fails.
    """
    dir_name = self.get_path(dir_name.strip())
    if not dir_name:
        # The empty path is the drive root (build_uri maps it to the base
        # URI), so there is nothing to create. Previously this crashed on
        # ``dir_name[-1]`` with IndexError.
        return True

    onedrive_business_reserved = r"[\*<>?:|#%]"
    if re.search(onedrive_business_reserved, dir_name) \
            or dir_name[-1] == "." or dir_name[:1] == "~":
        if DEBUG:
            print("文件夹名称包含非法字符。")
        return False

    parent_folder, sub_folder = os.path.split(dir_name)
    parent_folder = self.get_path(parent_folder)

    # Check whether the object already exists.
    obj = self.get_object(dir_name)
    if obj is not None:
        if self.is_folder(obj):
            if DEBUG:
                print("文件夹已存在。")
            return True
        # Something that is not a folder occupies the path; the old code
        # fell off the end here and returned None.
        return False

    if self.create_dir_by_step(parent_folder, sub_folder):
        return True

    # Fallback for OneDrive for Business: create the path level by level.
    folder_array = dir_name.split("/")
    parent_folder = self.get_path(folder_array[0])
    for sub_folder in folder_array[1:]:
        if DEBUG:
            print("Parent folder: {}".format(parent_folder))
            print("Sub folder: {}".format(sub_folder))
        if not self.create_dir_by_step(parent_folder, sub_folder):
            return False
        parent_folder += "/" + sub_folder
    return True
def resumable_upload(self,
                     local_file_name,
                     object_name=None,
                     progress_callback=None,
                     progress_file_name=None,
                     multipart_threshold=1024 * 1024 * 2,
                     part_size=1024 * 1024 * 5,
                     store_dir="/tmp",
                     auto_cancel=True,
                     retries=5,
                     ):
    """Upload a local file through an upload session, fragment by fragment.

    On any error other than ``UnauthorizedError`` the upload session is
    deleted and the whole upload is started again, up to ``retries``
    times. Error texts are accumulated in ``self.error_msg``.

    :param local_file_name: path of the local file.
    :param object_name: remote object name; defaults to
        ``<backup_path>/<base name of local_file_name>``.
    :param part_size: size of each fragment; must be a positive multiple
        of 320*1024 bytes.
    :param multipart_threshold: accepted for compatibility; not consulted
        by this implementation (only forwarded on retry).
    :param progress_callback: accepted for compatibility; not consulted.
    :param progress_file_name: accepted for compatibility; not consulted.
    :param store_dir: accepted for compatibility; not consulted.
    :param auto_cancel: accepted for compatibility; not consulted (the
        session is always deleted after a failure).
    :param retries: number of additional attempts after a failure.
    :return: True on success, False on failure.
    """
    import math

    # upload_file()/upload_abs_file() reset error_msg; make sure it exists
    # when this method is called directly.
    if getattr(self, "error_msg", None) is None:
        self.error_msg = ""

    user_agent = ('Mozilla/5.0 (X11; Linux x86_64) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/67.0.3396.99 Safari/537.36')
    upload_url = None
    session = None
    try:
        # The check used to be ``% 320`` although the documented unit is
        # 320*1024 bytes; a non-positive size would also divide by zero.
        fragment_unit = 320 * 1024
        if part_size <= 0 or part_size % fragment_unit != 0:
            if DEBUG:
                print("Part size 必须是320*1024的整数倍。")
            return False

        if object_name is None:
            temp_file_name = os.path.split(local_file_name)[1]
            object_name = os.path.join(self.backup_path, temp_file_name)

        print("|-正在上传到 {}...".format(object_name))
        dir_name = os.path.split(object_name)[0]
        if not self.create_dir(dir_name):
            if DEBUG:
                print("目录创建失败!")
            return False

        local_file_size = os.path.getsize(local_file_name)

        if local_file_size == 0:
            # With zero bytes the fragment loop below would never run and
            # nothing would be uploaded, so send the (empty) content with
            # a single PUT instead.
            upload_uri = self.build_uri(path=object_name,
                                        operate="/content")
            if DEBUG:
                print("Upload uri:")
                print(upload_uri)
            response = requests.put(upload_uri,
                                    headers=self.get_authorized_header(),
                                    data=b"")
            if DEBUG:
                print("status code:")
                print(response.status_code)
            if response.status_code in (200, 201):
                if DEBUG:
                    print("文件上传成功!")
                return True
            raise RuntimeError("空文件上传失败。")

        # 1. Create the upload session.
        create_session_uri = self.build_uri(
            path=object_name,
            operate="createUploadSession")
        headers = self.get_authorized_header()
        response = requests.post(create_session_uri, headers=headers)
        if response.status_code != 200:
            raise RuntimeError("session创建失败。")

        response_data = response.json()
        upload_url = response_data["uploadUrl"]
        expiration_date_time = response_data["expirationDateTime"]
        if DEBUG:
            print("上传session已建立。")
            print("Upload url: {}".format(upload_url))
            print("Expiration datetime: {}".format(
                expiration_date_time))

        # 2. Upload the file in fragments.
        requests.adapters.DEFAULT_RETRIES = 1
        session = requests.session()
        session.keep_alive = False

        parts = int(math.ceil(local_file_size / float(part_size)))
        # One handle for the whole upload, closed by the ``with`` block;
        # the file used to be reopened for every fragment and was never
        # closed on the success path.
        with io.open(local_file_name, "rb") as f:
            for i in range(parts):
                if DEBUG:
                    if i == parts - 1:
                        num = "最后"
                    else:
                        num = "{}".format(i + 1)
                    print("正在上传{}部分...".format(num))

                upload_range_start = i * part_size
                upload_range_end = min(upload_range_start + part_size,
                                       local_file_size)
                content_length = upload_range_end - upload_range_start

                # Content-Range here is the fragment marker of the upload
                # session (inclusive byte range / total size); it is not
                # the Range header of an ordinary HTTP request.
                headers = {
                    'User-Agent': user_agent,
                    "Content-Length": str(content_length),
                    "Content-Range": "bytes {}-{}/{}".format(
                        upload_range_start,
                        upload_range_end - 1,
                        local_file_size),
                    "Content-Type": "application/octet-stream"
                }
                if DEBUG:
                    print("Headers:")
                    print(headers)

                f.seek(upload_range_start)
                upload_data = f.read(content_length)
                sub_response = session.put(upload_url,
                                           headers=headers,
                                           data=upload_data)

                status = sub_response.status_code
                if status in (200, 201, 202):
                    if DEBUG:
                        print("Response status code: {}, "
                              "bytes {}-{} 已上传成功。".format(
                                  sub_response.status_code,
                                  upload_range_start,
                                  upload_range_end - 1)
                              )
                        print(sub_response.text)
                    # 200/201 end the upload; 202 means more fragments
                    # are expected.
                    if status in (200, 201):
                        if DEBUG:
                            print("文件 {} 上传成功。".format(object_name))
                        return True
                else:
                    print(sub_response.status_code)
                    print(sub_response.text)
                    _error_msg = "Bytes {}-{} 分片上传失败。".format(
                        upload_range_start,
                        upload_range_end
                    )
                    if self.error_msg:
                        self.error_msg += r"\n"
                    self.error_msg += _error_msg
                    raise RuntimeError(_error_msg)

                time.sleep(0.5)

        # Every fragment was accepted but no final 200/201 arrived; the
        # old code fell off the end and returned None here.
        raise RuntimeError("文件上传未完成。")
    except UnauthorizedError as e:
        _error_msg = str(e)
        if self.error_msg:
            self.error_msg += r"\n"
        self.error_msg += _error_msg
        print(_error_msg)
        return False
    except Exception as e:
        print("文件上传出现错误:")
        print(e)
        if self.error_msg:
            self.error_msg += r"\n"
        self.error_msg += "文件{}上传出现错误:{}".format(object_name, str(e))
        if upload_url:
            # Best effort: drop the unfinished upload session.
            try:
                if DEBUG:
                    print("正在清理上传session.")
                if session is not None:
                    session.delete(upload_url)
                else:
                    requests.delete(upload_url)
            except Exception:
                pass
    finally:
        if session is not None:
            try:
                session.close()
            except Exception:
                pass

    # Only reached after a failed attempt: retry from the beginning.
    if retries > 0:
        print("重试上传文件....")
        return self.resumable_upload(
            local_file_name,
            object_name=object_name,
            store_dir=store_dir,
            part_size=part_size,
            multipart_threshold=multipart_threshold,
            progress_callback=progress_callback,
            progress_file_name=progress_file_name,
            auto_cancel=auto_cancel,
            retries=retries - 1,
        )

    if self.error_msg:
        self.error_msg += r"\n"
    self.error_msg += "文件{}上传失败。".format(object_name)
    return False
def upload_abs_file(self, file_name, remote_dir, *args, **kwargs):
    """Upload a file to ``backup/<base name>``.

    :param file_name: local file to upload.
    :param remote_dir: NOTE(review): not used by the body; the target
        folder is always ``backup/``. Confirm whether that is intended.
    :return: the result of ``resumable_upload``, or False when an
        exception is raised.
    """
    try:
        self.error_msg = ""
        file_name = os.path.abspath(file_name)
        object_name = 'backup/' + os.path.basename(file_name)
        print(file_name)
        print(object_name)
        return self.resumable_upload(file_name,
                                     object_name=object_name,
                                     *args,
                                     **kwargs)
    except Exception as e:
        if self.error_msg:
            self.error_msg += r"\n"
        self.error_msg += "文件上传出现错误:{}".format(str(e))
        return False
def upload_file(self, file_name, data_type, *args, **kwargs):
    """Upload a backup file to the location derived from its data type.

    The remote object name comes from ``build_object_name``, which
    extracts the sub-category (for example the database name when
    data_type is "database") from the file name.

    :param file_name: local file to upload.
    :param data_type: data type, site/database/path.
    :return: the result of ``resumable_upload``; False when an argument
        is empty or an exception is raised (``self.error_msg`` then holds
        the reason).
    """
    try:
        self.error_msg = ""
        if not file_name or not data_type:
            _error_msg = "文件参数错误。"
            print(_error_msg)
            self.error_msg = _error_msg
            return False

        file_name = os.path.abspath(file_name)
        temp_name = os.path.split(file_name)[1]
        object_name = self.build_object_name(data_type, temp_name)

        if DEBUG:
            print(file_name)
            print(object_name)
            # ``dir_name`` was printed here although its assignment is
            # commented out, so enabling DEBUG made every upload fail
            # with NameError. Derive the value for the debug output.
            print(os.path.dirname(object_name))

        return self.resumable_upload(file_name,
                                     object_name=object_name,
                                     *args,
                                     **kwargs)
    except Exception as e:
        if self.error_msg:
            self.error_msg += r"\n"
        self.error_msg += "文件上传出现错误:{}".format(str(e))
        return False