背景

需要爬github repo中的一些特定内容,利用了github提供的api。搞了整整两天才搞出来稍微优化的版本,主要因为github_api有速率限制,所以需要准备好token断连后换token续传、并发等处理。

参考文档

关于速率限制的官方最佳实践
有关各种请求的endpoints链接可以在文档首页下拉找到。
首页
pr’s endpoints

代码设计

嵌套文件夹完整下载

涉及并发,是笔者做得比较用心的一部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async def download_file(file_url, file_path, session, semaphore, token, logger):
retries = 0
while retries < 50:
async with semaphore:
headers = {"Authorization": f"token {token}"}

async with session.get(file_url, headers=headers) as response:
if response.status == 200:
data = await response.json()
file_content = base64.b64decode(data["content"])
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb") as f:
f.write(file_content)
break
else:
retries += 1
if retries % 6 == 0:
await asyncio.sleep(60 * 15)
else:
token = random.choice(token_list)
if retries >= 50:
logger.error(f"{file_path}下载失败,{file_url}")
return True


async def async_download_dir(
directory_url, directory, session, semaphore, token,
):
headers = {"Authorization": f"token {token}"} if token else {}
async with session.get(directory_url, headers=headers) as response:
response.raise_for_status()
data = await response.json()
tree = data["tree"]

if len(tree) > 2000:
return False

tasks = []
for file in tree:
file_path = os.path.join(directory, file["path"])
file_url = file["url"]
if file["type"] == "tree":
tasks.append(
async_download_dir(
file_url, file_path, session, semaphore, token
)
)
else:
if not os.path.exists(file_path):
tasks.append(
download_file(
file_url, file_path, session, semaphore, token
)
)

await asyncio.gather(*tasks)
return True


def download_dir(directory_url, directory, token=None):
async def run():
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(10)
result = await async_download_dir(
directory_url, directory, session, semaphore, token
)
return result

return asyncio.run(run())

有并发,有异常处理,有细粒度的token轮换来保证断点续传,而且是有backoff的重试策略,我觉得还是很不错滴。

后台运行

  1. 启动

    1
    nohup python your_script.py > nohup.out

    默认日志就在nohup.out>是可以指定到其他文件中

  2. 打印日志

1
tail -f nohup.out
  1. 查找进程 ID (PID) 根据脚本名:

    使用 pgrep 命令直接根据脚本名查找进程 ID。

    1
    pgrep -f your_script.py

    -f 参数表示搜索全命令行,而不仅仅是进程名称,这样可以确保找到指定脚本的进程。

  2. 停止进程:

    使用 pkill 命令根据脚本名来停止进程:

    1
    pkill -f your_script.py

    同样地,-f 参数表示搜索全命令行。