在上次阿里云盘出现泄露之后,我就不想再继续用它备份了。但是云盘里有接近三万张照片,而阿里云盘没有提供一键下载的功能。在简单分析了它的接口之后,以下脚本可以实现批量下载的需求。
需要获取三个数据:access_token、refresh_token 和 drive_id。
access_token 和 refresh_token 这两个参数的获取会稍微麻烦一点:用浏览器登录阿里云盘,按 F12 打开开发者工具,依次点开“应用”→“本地存储空间”,找到里面的 token 项并点开,即可看到这两个值。
还需要一个 drive_id:点开相册界面,当开发者工具的网络请求中出现 search 接口时,点开“负载”,就能看到这个 id,填入下面的主函数(或在运行时按提示输入)即可。
import hashlib
import mimetypes
import os
import re
import time
from urllib.parse import urlparse, unquote

import backoff
import requests

# Module-level credential state shared by every request helper below.
access_token = ""
refresh_token = ""
drive_id = ""  # Drive to search; prompted for at start-up when left empty.

# (connect, read) timeouts in seconds, so a stalled connection raises instead
# of hanging the whole run.
_REQUEST_TIMEOUT = (10, 60)


def update_headers():
    """Build the request headers from the current global ``access_token``."""
    return {
        "Authorization": f"Bearer {access_token}"
    }


def save_tokens(new_access_token, new_refresh_token):
    """Store a new token pair in the module globals.

    Extension point: the pair could also be persisted here (for example to a
    file) so that a later run can restore it in ``load_tokens``.
    """
    global access_token, refresh_token
    access_token = new_access_token
    refresh_token = new_refresh_token


def load_tokens():
    """Load a previously saved token pair; currently a no-op.

    Extension point: read the pair back from wherever ``save_tokens``
    persisted it and assign it to the module globals.
    """
    global access_token, refresh_token
    pass  # Nothing is persisted yet, so there is nothing to load.


def refresh_token_func():
    """Exchange the global ``refresh_token`` for a new token pair.

    Returns:
        True when both tokens were present in the response and the globals
        were updated; False on any HTTP, network or decoding failure.
    """
    url = "https://auth.aliyundrive.com/v2/account/token"
    data = {
        "refresh_token": refresh_token,
        "grant_type": "refresh_token"
    }
    try:
        response = requests.post(url, json=data, timeout=_REQUEST_TIMEOUT)
        if response.status_code != 200:
            print(f"刷新令牌失败,状态码:{response.status_code},响应内容:{response.text}")
            return False
        resp = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        print(f"刷新令牌时发生异常:{e}")
        return False

    if 'refresh_token' in resp and 'access_token' in resp:
        save_tokens(resp['access_token'], resp['refresh_token'])
        print("令牌已刷新")
        return True
    print("刷新令牌失败,响应中缺少令牌信息")
    return False


def _give_up_retrying(exc):
    """Tell backoff to stop when the server answered with anything but 429.

    Exceptions without a response (network errors, timeouts, and the
    synthetic 429 exception raised by ``post_request``) are retried.
    """
    return exc.response is not None and exc.response.status_code != 429


@backoff.on_exception(
    backoff.expo,  # exponential back-off between attempts
    requests.exceptions.RequestException,
    max_tries=5,
    giveup=_give_up_retrying
)
def post_request(url, data):
    """POST ``data`` as JSON to ``url`` with the current bearer token.

    A 401 triggers one token refresh followed by one immediate retry. A 429
    raises so that the backoff decorator retries the call.

    Returns:
        The decoded JSON body on HTTP 200, or None for a non-200 status that
        ``raise_for_status`` does not treat as an error.

    Raises:
        requests.exceptions.RequestException: on network failure, on an
            HTTP error status, or on 429 once retries are exhausted.
    """
    try:
        response = requests.post(
            url, headers=update_headers(), json=data, timeout=_REQUEST_TIMEOUT
        )
    except Exception as e:
        print(f"请求失败:{e}")
        raise

    status = response.status_code
    if status == 200:
        return response.json()

    if status == 401:
        print("401 未授权,尝试刷新令牌")
        if not refresh_token_func():
            print("刷新令牌失败,无法继续请求")
            response.raise_for_status()
        else:
            # Headers are rebuilt so the retry carries the refreshed token.
            response = requests.post(
                url, headers=update_headers(), json=data, timeout=_REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                return response.json()
            print(f"重试请求失败,状态码:{response.status_code},响应内容:{response.text}")
            response.raise_for_status()
    elif status == 429:
        print("请求过于频繁,等待后重试")
        # Raised without a response object so the giveup predicate retries it.
        raise requests.exceptions.RequestException("Too many requests - 429")
    else:
        print(f"请求失败,状态码:{status},响应内容:{response.text}")
        response.raise_for_status()
    return None


def get_download_url(drive_id, file_id):
    """Return the download URL for a file, or None when none is reported."""
    url = "https://api.aliyundrive.com/v2/file/get_download_url"
    data = {
        "drive_id": drive_id,
        "file_id": file_id
    }
    result = post_request(url, data)
    if result:
        return result.get('url')
    return None


def search_files(search_data):
    """Run one file search request and return the decoded response."""
    url = "https://api.aliyundrive.com/adrive/v3/file/search"
    return post_request(url, search_data)


def check_local_file(filepath, remote_hash, hash_method='sha1'):
    """Report whether ``filepath`` exists and matches ``remote_hash``.

    The comparison ignores letter case. The post-download verification in
    this script already lower-cases both digests, so a case-sensitive check
    here could make an intact local copy look different.

    Returns:
        A ``(exists, is_same)`` tuple of booleans.
    """
    if not os.path.exists(filepath):
        return False, False
    local_hash = calculate_hash(filepath, hash_method)
    return True, local_hash.lower() == (remote_hash or "").lower()


def calculate_hash(filepath, hash_method='sha1'):
    """Return the hex digest of the file at ``filepath``.

    Args:
        filepath: Path of the file to hash.
        hash_method: ``'sha1'`` or ``'md5'``, case-insensitive.

    Raises:
        ValueError: if ``hash_method`` is neither of the supported names.
    """
    constructors = {'sha1': hashlib.sha1, 'md5': hashlib.md5}
    constructor = constructors.get(hash_method.lower())
    if constructor is None:
        raise ValueError(f"不支持的哈希方法:{hash_method}")

    hash_func = constructor()
    with open(filepath, "rb") as f:
        # Read in 1 MiB chunks so large files are never held in memory whole.
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            hash_func.update(chunk)
    return hash_func.hexdigest()


def download_file(download_url, filepath, referer_url="https://www.alipan.com/"):
    """Stream ``download_url`` to ``filepath``.

    The body is written to ``<filepath>.part`` and moved into place only
    after the stream finishes, so an interrupted transfer does not overwrite
    an existing file with a truncated one.

    Returns:
        True on success; False when the request fails or the status is not
        200. Errors raised while streaming or writing propagate to the caller.
    """
    headers = {
        "Referer": referer_url,
        "Connection": "keep-alive",
        "Host": urlparse(download_url).hostname
    }

    try:
        response = requests.get(
            download_url, headers=headers, stream=True, timeout=_REQUEST_TIMEOUT
        )
    except requests.exceptions.RequestException as e:
        print(f"下载请求失败:{e}")
        return False

    # The context manager closes the streamed response on every exit path.
    with response:
        if response.status_code != 200:
            print(f"下载失败: {download_url},状态码: {response.status_code}")
            return False

        save_dir = os.path.dirname(filepath)
        if save_dir:  # Empty for a bare file name; makedirs('') would raise.
            os.makedirs(save_dir, exist_ok=True)

        temp_path = f"{filepath}.part"
        try:
            with open(temp_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
            os.replace(temp_path, filepath)
        finally:
            # After a successful replace the temp file is gone; otherwise
            # remove the partial download.
            if os.path.exists(temp_path):
                os.remove(temp_path)

    print(f"文件已下载:{filepath}")
    return True


def _download_and_verify(download_url, filepath, file_name, content_hash,
                         content_hash_name, max_retries=3, retry_delay=2):
    """Download a file and check its digest, retrying on failure.

    Returns True once a download matches ``content_hash``; False after
    ``max_retries`` failed attempts.
    """
    for attempt in range(1, max_retries + 1):
        try:
            if download_file(download_url, filepath):
                remote_hash_lower = content_hash.lower()
                local_hash_lower = calculate_hash(filepath, content_hash_name).lower()
                print(f"远程哈希值:{remote_hash_lower}")
                print(f"本地哈希值:{local_hash_lower}")
                if local_hash_lower == remote_hash_lower:
                    print(f"文件下载并验证成功:{file_name}")
                    return True
                print(f"文件哈希值不匹配,重新下载:{file_name}")
        except Exception as e:
            # Deliberately broad: one bad file must not abort the whole run.
            print(f"下载文件时发生异常:{e}")
        if attempt < max_retries:
            time.sleep(retry_delay)  # Pause between attempts only.

    print(f"文件下载失败:{file_name},已超过最大重试次数")
    return False


def _process_item(item, save_dir="downloads"):
    """Handle one search result; return True only if it was downloaded.

    Items without hash information, and items whose local copy already
    matches, are skipped and return False.
    """
    item_drive_id = item['drive_id']
    file_id = item['file_id']
    file_name = item['name']
    content_hash = item.get('content_hash')
    content_hash_name = item.get('content_hash_name')

    if not content_hash or not content_hash_name:
        print(f"文件缺少哈希信息,跳过:{file_name}")
        return False

    # Use only the base name so a remote name cannot place the file outside
    # save_dir; fall back to the file id if nothing is left.
    local_name = os.path.basename(file_name) or file_id
    filepath = os.path.join(save_dir, local_name)

    # Check the local copy first so no download URL is requested for a file
    # that is already present and intact.
    file_exists, is_same = check_local_file(filepath, content_hash, content_hash_name)
    if file_exists and is_same:
        print(f"文件已存在且相同,跳过下载:{filepath}")
        return False

    try:
        download_url = get_download_url(item_drive_id, file_id)
    except Exception as e:
        print(f"获取下载链接失败:{e}")
        return False
    if not download_url:
        print(f"无法获取下载链接,文件 ID:{file_id}")
        return False

    print(f"开始下载文件:{file_name}")
    return _download_and_verify(
        download_url, filepath, file_name, content_hash, content_hash_name
    )


def process_files(search_data):
    """Page through the search results and download every listed file.

    ``search_data['marker']`` is updated in place after each page, so the
    printed parameters show where a failed run stopped.

    Args:
        search_data: Request body for the search endpoint.
    """
    success_count = 0  # files downloaded and verified in this run
    total_count = 0    # files seen across all pages

    while True:
        try:
            file_data = search_files(search_data)
        except Exception as e:
            print(f"获取文件列表失败:{e}")
            print(f"当前参数:{search_data}")
            break

        if not file_data or "items" not in file_data:
            print("未获取到文件数据")
            break

        for item in file_data['items']:
            total_count += 1
            if _process_item(item):
                success_count += 1

        next_marker = file_data.get('next_marker')
        if not next_marker:
            print("所有文件已处理完毕")
            break
        search_data['marker'] = next_marker
        print(f"处理下一页数据,marker:{next_marker}")

    print(f"成功下载 {success_count} 个文件,共处理 {total_count} 个文件。")


def main():
    """Prompt for any missing credentials, then download every file."""
    global access_token, refresh_token, drive_id

    load_tokens()
    if not access_token or not refresh_token:
        # strip() guards against whitespace picked up when pasting.
        access_token = input("请输入您的 access_token:").strip()
        refresh_token = input("请输入您的 refresh_token:").strip()
        save_tokens(access_token, refresh_token)

    if not drive_id:
        drive_id = input("请输入您的 drive_id:").strip()

    search_data = {
        "marker": "",  # empty for the first page; updated while paging
        "query": 'type = "file"',
        "limit": 100,
        "order_by": "created_at DESC",
        "drive_id": drive_id
    }
    process_files(search_data)


if __name__ == "__main__":
    main()