1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
async def download_file(file_url, file_path, session, semaphore, token, logger=None):
    """Fetch one base64-encoded blob and write it to ``file_path``.

    The JSON body of a 200 response is expected to carry the file bytes,
    base64-encoded, under the ``"content"`` key (this looks like the GitHub
    Git blobs API -- confirm against the caller's URLs).

    Args:
        file_url: URL queried with ``session.get``.
        file_path: Destination path; missing parent directories are created.
        session: Object whose ``get(url, headers=...)`` returns an async
            context manager yielding a response with ``status`` and an
            awaitable ``json()`` (an ``aiohttp.ClientSession`` in this file).
        semaphore: Async context manager limiting concurrent requests.
        token: Value sent as ``Authorization: token <token>``; when falsy no
            Authorization header is sent.
        logger: Logger used to report a permanently failed download.
            Defaults to ``logging.getLogger(__name__)`` so callers that do
            not have a logger at hand can omit it.

    Returns:
        Always ``True`` -- also after every attempt failed. A permanent
        failure is only reported through ``logger.error``.
    """
    # Local import: the file's import block is not visible from this chunk.
    import logging

    if logger is None:
        logger = logging.getLogger(__name__)

    retries = 0
    while retries < 50:
        async with semaphore:
            # Rebuilt on every attempt because ``token`` may be rotated below.
            headers = {"Authorization": f"token {token}"} if token else {}
            async with session.get(file_url, headers=headers) as response:
                if response.status == 200:
                    data = await response.json()
                    file_content = base64.b64decode(data["content"])
                    parent = os.path.dirname(file_path)
                    # os.makedirs("") raises FileNotFoundError, so skip it for
                    # a bare file name that lives in the current directory.
                    if parent:
                        os.makedirs(parent, exist_ok=True)
                    with open(file_path, "wb") as f:
                        f.write(file_content)
                    return True

        # Non-200 response. The back-off happens here, after the response and
        # the semaphore slot were released, so a sleeping task does not keep
        # other downloads from running.
        retries += 1
        if retries % 6 == 0:
            # Every sixth consecutive failure: wait 15 minutes before retrying.
            await asyncio.sleep(60 * 15)
        else:
            # ``token_list`` is a module-level name defined outside this block.
            token = random.choice(token_list)

    logger.error(f"{file_path}下载失败,{file_url}")
    return True
async def async_download_dir(
    directory_url,
    directory,
    session,
    semaphore,
    token,
):
    """Recursively download the tree listed at ``directory_url`` into ``directory``.

    The JSON body of the listing must contain a ``"tree"`` list whose entries
    have ``"path"``, ``"url"`` and ``"type"`` keys (this looks like the GitHub
    Git trees API -- confirm against the caller's URLs). Entries of type
    ``"tree"`` are descended into; every other entry is fetched with
    ``download_file`` unless its target path already exists.

    Args:
        directory_url: URL of the tree listing.
        directory: Local directory the entries are placed under.
        session: HTTP session providing ``get(url, headers=...)``.
        semaphore: Concurrency limiter handed on to ``download_file``.
        token: Value sent as ``Authorization: token <token>``; when falsy no
            Authorization header is sent.

    Returns:
        ``False`` when this listing has more than 2000 entries (nothing at
        this level is downloaded), otherwise ``True``. The results of nested
        calls are not reflected in the return value.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a failed listing
        request.
    """
    # Local import: the file's import block is not visible from this chunk.
    import logging

    logger = logging.getLogger(__name__)

    headers = {"Authorization": f"token {token}"} if token else {}
    async with session.get(directory_url, headers=headers) as response:
        response.raise_for_status()
        data = await response.json()
        tree = data["tree"]

    if len(tree) > 2000:
        # A nested call's False is dropped by gather() below, so leave a trace.
        logger.warning(
            "Skipping %s: %d entries exceed the 2000-entry limit (%s)",
            directory,
            len(tree),
            directory_url,
        )
        return False

    tasks = []
    for entry in tree:
        entry_path = os.path.join(directory, entry["path"])
        entry_url = entry["url"]
        if entry["type"] == "tree":
            tasks.append(
                async_download_dir(entry_url, entry_path, session, semaphore, token)
            )
        elif not os.path.exists(entry_path):
            # download_file's signature includes a logger; leaving it out
            # raised TypeError for every file that still had to be fetched.
            tasks.append(
                download_file(
                    entry_url, entry_path, session, semaphore, token, logger
                )
            )

    await asyncio.gather(*tasks)
    return True
def download_dir(directory_url, directory, token=None):
    """Synchronous entry point: download a remote tree into ``directory``.

    Starts an event loop with ``asyncio.run``, opens one
    ``aiohttp.ClientSession`` and delegates to ``async_download_dir`` with a
    semaphore of 10 slots.

    Args:
        directory_url: URL of the top-level tree listing.
        directory: Local directory to download into.
        token: Optional API token forwarded to ``async_download_dir``.

    Returns:
        Whatever ``async_download_dir`` returns for the top-level tree.
    """

    async def _download_tree():
        # Created inside the coroutine so it belongs to the loop that
        # asyncio.run() starts.
        limiter = asyncio.Semaphore(10)
        async with aiohttp.ClientSession() as http:
            return await async_download_dir(
                directory_url, directory, http, limiter, token
            )

    return asyncio.run(_download_tree())
|