18c7792935
Replaces the upreza-derived 4K dungeon textures + AtlasLoot boss-coord overlay (which had a consistent positional offset against texture skulls) with keystone.guru's z=4 tile pyramid stitched to 6144x4096 WebP per floor. kg's split_floors.js gives per-dungeon enemies, packs (polygons), patrols (polylines), and map icons calibrated to those tiles, so overlays align pixel-perfectly. 27/29 classic dungeons now have full enemy/pack data; ZG + Sunken Temple have maps only. Pipeline: tools/kg_fetch.py -> tools/kg_stitch.py -> tools/kg_build_data.py.
185 lines
6.5 KiB
Python
185 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Fetch keystone.guru tiles + enemy data for every classic dungeon listed in
|
|
data/kg_dungeons.json.
|
|
|
|
Outputs:
|
|
data/kg/<tile_key>/floor<n>/z<zoom>/<x>_<y>.png raw tiles (zoom defaults to 4)
|
|
data/kg/<tile_key>/split_floors.js
data/kg/<tile_key>/lang.js (downloaded as en_US.js)
|
|
|
|
The compiled-data path includes a build hash; we discover it once from a
|
|
known route page and use it for every fetch in this run.
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse
|
|
import concurrent.futures
|
|
import json
|
|
import re
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# Filesystem layout: this script lives one directory below the project root.
_SCRIPT_DIR = Path(__file__).resolve().parent
ROOT = _SCRIPT_DIR.parent
KG_OUT = ROOT.joinpath("data", "kg")
REGISTRY = ROOT.joinpath("data", "kg_dungeons.json")

# User-Agent header sent with every request made by this script.
UA = (
    "Mozilla/5.0 (X11; Linux x86_64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/126.0.0.0 Safari/537.36"
)

# Remote endpoints.
TILE_BASE = "https://assets.keystone.guru/tiles"
DATA_BASE = "https://assets.keystone.guru/compiled"
ROUTE_PROBE_URL = "https://aws.keystone.guru/route/razorfen-downs/2bhiRi8/ascension-m-rfd/1"

# Captures the 40-hex-digit build hash from a "compiled/<hash>/" path segment.
HASH_RE = re.compile(r"compiled/([0-9a-f]{40})/")
|
|
|
|
|
|
def http_get(url: str, timeout: int = 15) -> bytes:
    """Return the raw response body of a GET request to *url*.

    The request carries the module-level ``UA`` string as its User-Agent.
    Nothing is caught here: any error raised while opening or reading the
    response propagates to the caller.
    """
    request = urllib.request.Request(url, headers={"User-Agent": UA})
    response = urllib.request.urlopen(request, timeout=timeout)
    with response:
        return response.read()
|
|
|
|
|
|
def http_head_ok(url: str, timeout: int = 5) -> bool:
    """Report whether a HEAD request to *url* answers with status 200.

    This is a best-effort probe: any exception raised while opening the URL
    (HTTP error status, timeout, connection failure, ...) is treated as
    "not there" and yields ``False``.
    """
    probe = urllib.request.Request(url, method="HEAD", headers={"User-Agent": UA})
    try:
        with urllib.request.urlopen(probe, timeout=timeout) as response:
            found = response.status == 200
    except Exception:
        found = False
    return found
|
|
|
|
|
|
def discover_compiled_hash() -> str:
    """Return the build hash found in the HTML of ``ROUTE_PROBE_URL``.

    The page is decoded as UTF-8 (undecodable bytes are replaced) and
    searched with ``HASH_RE``; the first match wins.

    Raises:
        RuntimeError: if the page contains no ``compiled/<hash>/`` reference.
    """
    page = http_get(ROUTE_PROBE_URL).decode("utf-8", errors="replace")
    found = HASH_RE.search(page)
    if found is None:
        raise RuntimeError("could not find compiled-asset hash on route page")
    return found.group(1)
|
|
|
|
|
|
def discover_floors(tile_key: str, expansion: str, max_zoom: int = 4) -> list[int]:
    """Return the consecutive floor numbers, starting at 1, that have tiles.

    A floor counts as present when its ``0_0.png`` tile at zoom level 1
    answers a HEAD probe. Probing stops at the first missing floor and never
    goes past floor 19. ``max_zoom`` is accepted but not used by the probe.
    """
    present: list[int] = []
    floor = 1
    while floor < 20 and http_head_ok(f"{TILE_BASE}/{expansion}/{tile_key}/{floor}/1/0_0.png"):
        present.append(floor)
        floor += 1
    return present
|
|
|
|
|
|
def discover_grid(tile_key: str, expansion: str, floor: int, z: int) -> tuple[int, int]:
    """Return ``(cols, rows)``, the tile counts for one floor at zoom *z*.

    Columns are counted by probing ``<x>_0.png`` along the first row and rows
    by probing ``0_<y>.png`` along the first column, each until the first
    miss. Tile ``0_0`` itself is never probed, so the result is at least
    ``(1, 1)``.
    """
    level_url = f"{TILE_BASE}/{expansion}/{tile_key}/{floor}/{z}"

    # `cols` doubles as the next x index to probe: x = 0 is taken as given.
    cols = 1
    while http_head_ok(f"{level_url}/{cols}_0.png"):
        cols += 1

    # Same scheme for rows along the first column.
    rows = 1
    while http_head_ok(f"{level_url}/0_{rows}.png"):
        rows += 1

    return cols, rows
|
|
|
|
|
|
def fetch_tile(args: tuple[str, Path]) -> tuple[Path, bool]:
    """Download one tile to disk unless a non-empty copy already exists.

    Args:
        args: a single ``(url, dest)`` pair, packed into one argument so the
            function can be passed directly to ``pool.map``.

    Returns:
        ``(dest, True)`` when the tile is on disk (already cached or freshly
        downloaded), ``(dest, False)`` when the download failed or the
        payload does not start with the PNG signature.
    """
    url, dest = args
    # A non-empty file from an earlier run counts as already fetched.
    if dest.exists() and dest.stat().st_size > 0:
        return dest, True
    dest.parent.mkdir(parents=True, exist_ok=True)

    # Write to a sibling temp file and rename it into place. Writing straight
    # to `dest` could leave a truncated, non-empty file behind if the run is
    # interrupted, and the cache check above would then accept it forever.
    partial = dest.with_name(dest.name + ".part")
    try:
        data = http_get(url, timeout=30)
        if not data or data[:4] != b"\x89PNG":
            return dest, False
        partial.write_bytes(data)
        partial.replace(dest)
        return dest, True
    except Exception:
        # Best-effort worker: the caller only counts successes, so report
        # failure instead of raising, and do not leave a stray temp file.
        try:
            partial.unlink()
        except OSError:
            pass
        return dest, False
|
|
|
|
|
|
def fetch_dungeon_tiles(d: dict, expansion: str, max_zoom: int, workers: int) -> dict:
    """Discover floors and grid sizes for one dungeon, then download its tiles.

    Args:
        d: registry entry; ``tile_key`` and ``name`` are read from it.
        expansion: expansion path segment used in tile URLs.
        max_zoom: zoom level whose tiles are downloaded.
        workers: size of the download thread pool.

    Returns:
        A dict with ``tile_key``, ``name``, ``expansion``, ``max_zoom`` and
        ``floors`` (one ``{"index", "cols", "rows"}`` entry per floor). When
        no floors are found, a warning goes to stderr and ``floors`` is empty.
    """
    tile_key, name = d["tile_key"], d["name"]
    dungeon_dir = KG_OUT / tile_key
    dungeon_dir.mkdir(parents=True, exist_ok=True)

    floor_ids = discover_floors(tile_key, expansion)
    info = {
        "tile_key": tile_key,
        "name": name,
        "expansion": expansion,
        "max_zoom": max_zoom,
        "floors": [],
    }
    if not floor_ids:
        print(f" WARN no floors for {tile_key}", file=sys.stderr)
        return info

    # One (url, dest) job per tile, across all floors.
    jobs = []
    for floor in floor_ids:
        cols, rows = discover_grid(tile_key, expansion, floor, max_zoom)
        info["floors"].append({"index": floor, "cols": cols, "rows": rows})
        tile_dir = dungeon_dir / f"floor{floor}" / f"z{max_zoom}"
        jobs.extend(
            (
                f"{TILE_BASE}/{expansion}/{tile_key}/{floor}/{max_zoom}/{x}_{y}.png",
                tile_dir / f"{x}_{y}.png",
            )
            for x in range(cols)
            for y in range(rows)
        )

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        ok = sum(1 for _dest, success in pool.map(fetch_tile, jobs) if success)

    floor_list = [entry["index"] for entry in info["floors"]]
    grid_desc = ", ".join(
        f"f{entry['index']}:{entry['cols']}x{entry['rows']}" for entry in info["floors"]
    )
    print(f" {tile_key}: {ok}/{len(jobs)} tiles, floors={floor_list}, grid=" + grid_desc)
    return info
|
|
|
|
|
|
def fetch_dungeon_data(d: dict, compiled_hash: str) -> bool:
    """Download split_floors.js + en_US.js for one dungeon.

    The files are stored as ``split_floors.js`` and ``lang.js`` under
    ``KG_OUT/<tile_key>/``.

    Args:
        d: registry entry; reads ``data_slug``, ``mapping_id`` and ``tile_key``.
        compiled_hash: build hash inserted into the compiled-data URLs.

    Returns:
        ``True`` when both files were downloaded and written. ``False`` when
        the entry has no (truthy) ``data_slug`` / ``mapping_id``, or when a
        download or write failed (the error is reported on stderr).
    """
    slug = d.get("data_slug")
    mid = d.get("mapping_id")
    if not slug or not mid:
        return False
    tile_key = d["tile_key"]
    out_root = KG_OUT / tile_key
    out_root.mkdir(parents=True, exist_ok=True)

    data_root = f"{DATA_BASE}/{compiled_hash}/mapcontext/data/{slug}"
    splits_url = f"{data_root}/{mid}/split_floors.js"
    lang_url = f"{data_root}/en_US.js"
    try:
        # Download both payloads before touching the disk, so a failed second
        # request cannot leave a new split_floors.js next to a stale or
        # missing lang.js.
        splits = http_get(splits_url)
        lang = http_get(lang_url)
        (out_root / "split_floors.js").write_bytes(splits)
        (out_root / "lang.js").write_bytes(lang)
    except Exception as e:
        # Best-effort per dungeon: report the failure and let the run continue.
        print(f" data fetch failed for {tile_key}: {e}", file=sys.stderr)
        return False
    return True
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: fetch tiles and data for the selected dungeons.

    Options:
        --workers  download thread-pool size (default 24)
        --zoom     zoom level to download (default 4)
        --dungeon  restrict the run to the registry entry with this tile_key

    Writes ``_summary.json`` into ``KG_OUT`` describing what was fetched.

    Returns:
        0 on success, 2 when ``--dungeon`` matches no registry entry.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--workers", type=int, default=24)
    ap.add_argument("--zoom", type=int, default=4)
    ap.add_argument("--dungeon", help="only fetch this tile_key")
    args = ap.parse_args()

    # Read the registry as UTF-8 explicitly rather than with the locale
    # default, which is not UTF-8 on every platform.
    registry = json.loads(REGISTRY.read_text(encoding="utf-8"))
    expansion = registry.get("_expansion", "classic")

    # Validate the selection before any network traffic so a mistyped
    # --dungeon fails immediately.
    targets = registry["dungeons"]
    if args.dungeon:
        targets = [d for d in targets if d["tile_key"] == args.dungeon]
        if not targets:
            print(f"no dungeon with tile_key={args.dungeon}", file=sys.stderr)
            return 2

    compiled_hash = discover_compiled_hash()
    print(f"compiled hash: {compiled_hash}")

    summary = {"compiled_hash": compiled_hash, "dungeons": []}
    for d in targets:
        print(f"==> {d['name']} ({d['tile_key']})")
        info = fetch_dungeon_tiles(d, expansion, args.zoom, args.workers)
        info["data_fetched"] = fetch_dungeon_data(d, compiled_hash)
        summary["dungeons"].append(info)

    # The output directory may not exist yet if no dungeon was processed.
    KG_OUT.mkdir(parents=True, exist_ok=True)
    (KG_OUT / "_summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")
    print(f"\nwrote {KG_OUT}/_summary.json — {len(summary['dungeons'])} dungeons")
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|