阿里云盘相册下载脚本

在上次阿里云盘出现泄露之后,就已经不再想接着备份,但是有节假三万张照片,阿里云盘没有提供一键下载的功能,在简单爬取接口之后,以下脚本可能实现我的功能
需要获取三个数据,access_token,refresh_token
这两个参数的获取会稍微麻烦一点,浏览器登录阿里云盘,打开f12,点开应用,本地存储空间,里面的token,点开,image.png
还需要一个drive_id,点开相册界面,出现这个search接口时,点开负载,就能看到这个id,填入下面主函数即可
image.png

import mimetypes
import time
import backoff
import requests
import hashlib
import os
import re
from urllib.parse import urlparse, unquote

# 定义全局变量来存储令牌
access_token = ""
refresh_token = ""
drive_id = ""  # 新增全局变量

def update_headers():
    """根据全局的 access_token 更新 headers"""
    return {
        "Authorization": f"Bearer {access_token}"
    }

def save_tokens(new_access_token, new_refresh_token):
    """保存新的令牌,可以扩展为保存到文件或安全存储"""
    global access_token, refresh_token
    access_token = new_access_token
    refresh_token = new_refresh_token
    # 可在此处将令牌保存到文件
    # with open('tokens.txt', 'w') as f:
    #     f.write(f"{access_token}\n{refresh_token}")

def load_tokens():
    """加载令牌,可以扩展为从文件或安全存储中加载"""
    global access_token, refresh_token
    # 可在此处从文件加载令牌
    # try:
    #     with open('tokens.txt', 'r') as f:
    #         access_token = f.readline().strip()
    #         refresh_token = f.readline().strip()
    # except FileNotFoundError:
    #     pass
    pass  # 如果不从文件加载,可忽略

def refresh_token_func():
    """刷新令牌的函数"""
    global access_token, refresh_token
    url = "https://auth.aliyundrive.com/v2/account/token"
    data = {
        "refresh_token": refresh_token,
        "grant_type": "refresh_token"
    }
    try:
        response = requests.post(url, json=data)
        if response.status_code == 200:
            resp = response.json()
            if 'refresh_token' in resp and 'access_token' in resp:
                access_token = resp['access_token']
                refresh_token = resp['refresh_token']
                save_tokens(access_token, refresh_token)
                print("令牌已刷新")
                return True
            else:
                print("刷新令牌失败,响应中缺少令牌信息")
                return False
        else:
            print(f"刷新令牌失败,状态码:{response.status_code},响应内容:{response.text}")
            return False
    except Exception as e:
        print(f"刷新令牌时发生异常:{e}")
        return False

# 发送POST请求的通用函数,带重试机制
@backoff.on_exception(
    backoff.expo,  # 使用指数退避策略
    requests.exceptions.RequestException,  # 对所有请求异常进行重试
    max_tries=5,  # 最大重试次数
    giveup=lambda e: e.response is not None and e.response.status_code != 429  # 如果响应不是 429 错误,则不重试
)
def post_request(url, data):
    """发送 POST 请求"""
    headers = update_headers()
    try:
        response = requests.post(url, headers=headers, json=data)
    except Exception as e:
        print(f"请求失败:{e}")
        raise

    if response.status_code == 200:
        return response.json()  # 返回 JSON 格式的数据
    elif response.status_code == 401:
        print("401 未授权,尝试刷新令牌")
        if refresh_token_func():
            headers = update_headers()
            response = requests.post(url, headers=headers, json=data)
            if response.status_code == 200:
                return response.json()
            else:
                print(f"重试请求失败,状态码:{response.status_code},响应内容:{response.text}")
                response.raise_for_status()
        else:
            print("刷新令牌失败,无法继续请求")
            response.raise_for_status()
    elif response.status_code == 429:
        print("请求过于频繁,等待后重试")
        raise requests.exceptions.RequestException("Too many requests - 429")  # 当返回 429 错误时抛出异常
    else:
        print(f"请求失败,状态码:{response.status_code},响应内容:{response.text}")
        response.raise_for_status()  # 其他错误状态码抛出异常
    return None

# 获取下载链接的函数
def get_download_url(drive_id, file_id):
    """通过文件ID和DriveID获取下载链接"""
    url = "https://api.aliyundrive.com/v2/file/get_download_url"
    data = {
        "drive_id": drive_id,
        "file_id": file_id
    }
    result = post_request(url, data)
    if result:
        # 获取下载链接
        download_url = result.get('url')
        return download_url
    return None

# 获取文件信息的函数
def search_files(search_data):
    """搜索文件,获取文件列表"""
    url = "https://api.aliyundrive.com/adrive/v3/file/search"
    return post_request(url, search_data)

# 检查本地文件是否存在,如果存在则比较哈希值
def check_local_file(filepath, remote_hash, hash_method='sha1'):
    """检查本地是否已存在相同文件,返回文件是否存在以及是否相同"""
    if os.path.exists(filepath):
        # 计算本地文件的哈希值
        local_hash = calculate_hash(filepath, hash_method)
        if local_hash == remote_hash:
            return True, True  # 文件存在且内容相同
        else:
            return True, False  # 文件存在但内容不同
    return False, False  # 文件不存在

# 计算文件的哈希值
def calculate_hash(filepath, hash_method='sha1'):
    """计算文件的哈希值"""
    if hash_method.lower() == 'sha1':
        hash_func = hashlib.sha1()
    elif hash_method.lower() == 'md5':
        hash_func = hashlib.md5()
    else:
        raise ValueError(f"不支持的哈希方法:{hash_method}")

    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_func.update(chunk)
    return hash_func.hexdigest()

# 下载文件的函数
def download_file(download_url, filepath, referer_url="https://www.alipan.com/"):
    """下载文件并保存"""
    # 解析下载链接,提取 Host 信息
    parsed_url = urlparse(download_url)
    host = parsed_url.hostname

    # 设置请求头
    headers = {
        "Referer": referer_url,
        "Connection": "keep-alive",
        "Host": host
    }

    # 发送 GET 请求下载文件
    try:
        response = requests.get(download_url, headers=headers, stream=True)
    except Exception as e:
        print(f"下载请求失败:{e}")
        return False

    if response.status_code == 200:
        # 确保保存目录存在
        save_dir = os.path.dirname(filepath)
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)

        # 保存文件
        with open(filepath, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)

        print(f"文件已下载:{filepath}")
        return True  # 返回 True 表示成功下载
    else:
        print(f"下载失败: {download_url},状态码: {response.status_code}")
        return False  # 下载失败返回 False

# 处理获取文件信息和下载链接的函数
def process_files(search_data):
    """处理文件列表并获取每个文件的下载链接"""
    success_count = 0  # 成功下载的文件计数
    total_count = 0  # 总文件数
    current_marker = search_data.get('marker', '')

    while True:
        try:
            # 获取文件列表
            file_data = search_files(search_data)
        except Exception as e:
            print(f"获取文件列表失败:{e}")
            print(f"当前参数:{search_data}")
            break  # 发生异常,退出循环

        if not file_data or "items" not in file_data:
            print("未获取到文件数据")
            break  # 如果没有文件数据,退出循环

        # 提取文件信息
        file_info = file_data['items']

        for file in file_info:
            total_count += 1
            drive_id = file['drive_id']
            file_id = file['file_id']
            file_name = file['name']
            content_hash = file.get('content_hash')
            content_hash_name = file.get('content_hash_name')

            if not content_hash or not content_hash_name:
                print(f"文件缺少哈希信息,跳过:{file_name}")
                continue

            # 获取下载链接
            try:
                download_url = get_download_url(drive_id, file_id)
            except Exception as e:
                print(f"获取下载链接失败:{e}")
                continue  # 发生异常,继续处理下一个文件

            if not download_url:
                print(f"无法获取下载链接,文件 ID:{file_id}")
                continue

            # 构造保存文件的路径
            save_dir = "downloads"
            filepath = os.path.join(save_dir, file_name)

            # 检查本地文件
            file_exists, is_same = check_local_file(filepath, content_hash, content_hash_name)
            if file_exists and is_same:
                print(f"文件已存在且相同,跳过下载:{filepath}")
                continue  # 跳过下载
            else:
                print(f"开始下载文件:{file_name}")

            # 下载文件
            download_success = False
            retry_times = 0
            max_retries = 3
            while not download_success and retry_times < max_retries:
                try:
                    download_success = download_file(download_url, filepath)
                    if download_success:
                        # 下载完成后验证哈希值
                        local_hash = calculate_hash(filepath, content_hash_name)
                        remote_hash_lower = content_hash.lower()
                        local_hash_lower = local_hash.lower()
                        print(f"远程哈希值:{remote_hash_lower}")
                        print(f"本地哈希值:{local_hash_lower}")
                        if local_hash_lower != remote_hash_lower:
                            print(f"文件哈希值不匹配,重新下载:{file_name}")
                            download_success = False
                            retry_times += 1
                            time.sleep(2)  # 等待2秒再重试
                            continue
                        else:
                            success_count += 1
                            print(f"文件下载并验证成功:{file_name}")
                    else:
                        retry_times += 1
                        time.sleep(2)  # 等待2秒再重试
                except Exception as e:
                    print(f"下载文件时发生异常:{e}")
                    retry_times += 1
                    time.sleep(2)  # 等待2秒再重试

            if not download_success:
                print(f"文件下载失败:{file_name},已超过最大重试次数")

        # 检查是否有 next_marker 来获取下一页
        next_marker = file_data.get('next_marker')
        if not next_marker:
            print("所有文件已处理完毕")
            break  # 没有下一页数据,退出循环
        else:
            search_data['marker'] = next_marker  # 更新 marker 参数
            current_marker = next_marker
            print(f"处理下一页数据,marker:{current_marker}")

    print(f"成功下载 {success_count} 个文件,共处理 {total_count} 个文件。")
    return

# 示例:如何使用这个代码

if __name__ == "__main__":
    # 加载已有的令牌,若未保存则需要用户输入
    load_tokens()
    if not access_token or not refresh_token:
        access_token = input("请输入您的 access_token:")
        refresh_token = input("请输入您的 refresh_token:")
        save_tokens(access_token, refresh_token)

    if not drive_id:
        drive_id = input("请输入您的 drive_id:")

    search_data = {
        "marker": "",  # 初始 marker,可以设置为空字符串或根据响应更新
        "query": 'type = "file"',
        "limit": 100,
        "order_by": "created_at DESC",
        "drive_id": drive_id  # 使用用户输入的 drive_id
    }

    # 获取文件下载链接并下载
    process_files(search_data)
版权声明:除特殊说明,博客文章均为栋dong原创,依据CC BY-SA 4.0许可证进行授权,转载请附上出处链接及本声明。
如有需要,请在留言板留言,或者添加我的QQ或者微信
我只是一个学生,如有错误或者侵权,请联系我,谢!
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇