arch-wiki-docs/ArchWiki/downloader.py

158 lines
5.7 KiB
Python

#! /usr/bin/env python3
import os
import datetime
import requests
from requests.packages.urllib3.util.retry import Retry
class Downloader:
    """ Mirror wiki pages, images and stylesheets into a local directory.

    Every local file name that is processed gets recorded in `self.files`,
    which `clean_output_directory` later uses to decide what to delete.
    """

    # API query: all non-redirect pages of one namespace, including page URLs
    # ("gapnamespace" is overridden per call in process_namespace)
    query_allpages = {
        "action": "query",
        "generator": "allpages",
        "gaplimit": "max",
        "gapfilterredir": "nonredirects",
        "gapnamespace": "0",
        "prop": "info",
        "inprop": "url",
        "continue": "",
    }

    # API query: all images, including their URL and upload timestamp
    query_allimages = {
        "action": "query",
        "list": "allimages",
        "ailimit": "max",
        "aiprop": "url|timestamp",
        "continue": "",
    }

    # stylesheet URL -> file name (relative to the output directory)
    css_links = {
        "https://wiki.archlinux.org/load.php?lang=en&modules=site.styles|skins.vector.icons,styles|zzz.ext.archLinux.styles&only=styles&skin=vector-2022": "ArchWikiOffline.css",
    }

    def __init__(self, wiki, output_directory, epoch, *, optimizer=None):
        """ Parameters:
            @wiki:             ArchWiki instance to work with
            @output_directory: where to store the downloaded files; created
                               (including missing parents) if it does not exist
            @epoch:            force update of every file older than this date
                               (must be instance of 'datetime')
            @optimizer:        optional object for HTML post-processing; its
                               'optimize(fname, html)' method is called and
                               the return value is what gets written to disk
        """
        self.wiki = wiki
        self.output_directory = output_directory
        self.epoch = epoch
        self.optimizer = optimizer

        # ensure output directory always exists
        os.makedirs(self.output_directory, exist_ok=True)

        # list of valid files
        self.files = []

        self.session = requests.Session()
        # granular control over requests' retries: https://stackoverflow.com/a/35504626
        retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        adapter = requests.adapters.HTTPAdapter(max_retries=retries)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def _fetch(self, url):
        """ GET `url` through the session and return the response.

        Raises the session's HTTP error (via `raise_for_status`) instead of
        letting an error page be stored as if it were the requested content.
        """
        response = self.session.get(url)
        response.raise_for_status()
        return response

    @staticmethod
    def _write_file(fname, content):
        """ Write `content` to `fname`, creating missing parent directories.

        `bytes` are stored verbatim; anything else is written as UTF-8 text
        so that the result does not depend on the locale's default encoding.
        """
        parent = os.path.dirname(fname)
        # dirname is empty for a bare file name; os.makedirs("") would raise
        if parent:
            os.makedirs(parent, exist_ok=True)
        if isinstance(content, bytes):
            with open(fname, "wb") as fd:
                fd.write(content)
        else:
            with open(fname, "w", encoding="utf-8") as fd:
                fd.write(content)

    def needs_update(self, fname, timestamp):
        """ determine if it is necessary to download a page

        Returns True when `fname` does not exist, or when its modification
        time is older than `timestamp` or older than `self.epoch`.
        """
        if not os.path.exists(fname):
            return True
        # Convert the mtime to a naive datetime in UTC (the non-deprecated
        # equivalent of utcfromtimestamp).
        # NOTE(review): assumes `timestamp` and `self.epoch` are naive UTC
        # datetimes as well -- confirm against the callers.
        mtime = os.path.getmtime(fname)
        local = datetime.datetime.fromtimestamp(mtime, tz=datetime.timezone.utc).replace(tzinfo=None)
        return local < timestamp or local < self.epoch

    def process_namespace(self, namespace):
        """ walk all pages in given namespace, download if necessary

        Pages for which the wiki object yields no local file name are skipped.
        """
        print(f"Processing namespace {namespace}...")

        # copy so that the shared class-level query template is not modified
        query = self.query_allpages.copy()
        query["gapnamespace"] = namespace
        for pages_snippet in self.wiki.query_continue(query):
            for page in sorted(pages_snippet["pages"].values(), key=lambda d: d["title"]):
                title = page["title"]
                fname = self.wiki.get_local_filename(title, self.output_directory)
                if not fname:
                    print(f" [skipping] {title}")
                    continue
                # recorded even when up-to-date, so cleaning keeps the file
                self.files.append(fname)
                timestamp = self.wiki.parse_date(page["touched"])
                if not self.needs_update(fname, timestamp):
                    print(f" [up-to-date] {title}")
                    continue

                print(f" [downloading] {title}")
                text = self._fetch(page["fullurl"]).text
                if self.optimizer is not None:
                    text = self.optimizer.optimize(fname, text)
                # parent directory is created on demand (necessary for subpages)
                self._write_file(fname, text)

    def download_css(self):
        """ download every stylesheet from `css_links` into the output directory

        Stylesheets are fetched unconditionally (no up-to-date check).
        """
        print("Downloading CSS...")
        for link, dest in self.css_links.items():
            print(" ", dest)
            fname = os.path.join(self.output_directory, dest)
            self.files.append(fname)
            self._write_file(fname, self._fetch(link).text)

    def download_images(self):
        """ walk all images on the wiki, download if necessary

        Images for which the wiki object yields no local file name are skipped.
        """
        print("Downloading images...")
        query = self.query_allimages.copy()
        for images_snippet in self.wiki.query_continue(query):
            for image in images_snippet["allimages"]:
                title = image["title"]
                fname = self.wiki.get_local_filename(title, self.output_directory)
                if not fname:
                    print(f" [skipping] {title}")
                    continue
                # recorded even when up-to-date, so cleaning keeps the file
                self.files.append(fname)
                timestamp = self.wiki.parse_date(image["timestamp"])
                if not self.needs_update(fname, timestamp):
                    print(f" [up-to-date] {title}")
                    continue

                print(f" [downloading] {title}")
                # raw bytes: images must not go through text decoding
                self._write_file(fname, self._fetch(image["url"]).content)

    def clean_output_directory(self):
        """ Walk output_directory and delete all files not found on the wiki.
            Should be run _after_ downloading, otherwise all files will be deleted!

            Directories left empty are removed as well; this includes the
            output directory itself if nothing remains in it.
        """
        print("Deleting unwanted files (deleted/moved on the wiki)...")
        # set instead of list: one membership test per file on disk
        valid_files = set(self.files)

        # bottom-up, so a directory is inspected after its children are cleaned
        for path, _dirs, files in os.walk(self.output_directory, topdown=False):
            # handle files
            for f in files:
                fpath = os.path.join(path, f)
                if fpath not in valid_files:
                    print(f" [deleting] {fpath}")
                    os.unlink(fpath)

            # remove empty directories
            if not os.listdir(path):
                print(f" [deleting] {path}/")
                os.rmdir(path)