#!/usr/bin/env python3 """Import CC-licensed plant images from Wikimedia Commons via Wikidata into HerbAPI.""" import json import os import re import subprocess import sys import time import urllib.parse import urllib.request # Force unbuffered output sys.stdout.reconfigure(line_buffering=True) sys.stderr.reconfigure(line_buffering=True) # --- Configuration --- S3_ENDPOINT = "http://garage.sub-net.at:3900" S3_BUCKET = "herbapi" S3_ACCESS_KEY = "GK1a89859373a6ac56bf11958f" S3_SECRET_KEY = "bea45a333b5c7b1efdd7466bdbcac54d8642fa19f0c617ca2fd64bd07951b899" S3_REGION = "garage" DB_HOST = "10.31.3.90" DB_USER = "herbapi" DB_PASS = "_6Qo_jEFhE9LZOEbwLynEWoLbc6B4Ipj" DB_NAME = "herbapi" USER_AGENT = "HerbAPI/1.0 (https://herbapi.naturalised.at; florian.berthold@sub-net.at)" THUMB_WIDTH = 800 REQUEST_DELAY = 0.3 ALLOWED_LICENSES = { "cc0", "cc-zero", "cc0 1.0", "cc-zero 1.0", "public domain", "pd", "pd-self", "pd-old", "pd-old-auto", "pd-old-100", "pd-us", "pd-usgov", "pd-author", "cc by 1.0", "cc by 2.0", "cc by 2.5", "cc by 3.0", "cc by 4.0", "cc-by-1.0", "cc-by-2.0", "cc-by-2.5", "cc-by-3.0", "cc-by-4.0", "cc by-sa 1.0", "cc by-sa 2.0", "cc by-sa 2.5", "cc by-sa 3.0", "cc by-sa 4.0", "cc-by-sa-1.0", "cc-by-sa-2.0", "cc-by-sa-2.5", "cc-by-sa-3.0", "cc-by-sa-4.0", } def slugify(name: str) -> str: """Convert scientific name to a URL-safe slug.""" return re.sub(r'[^a-z0-9]+', '-', name.lower()).strip('-') def psql(query: str) -> str: """Run a psql query and return output.""" env = os.environ.copy() env["PGPASSWORD"] = DB_PASS result = subprocess.run( ["psql", "-h", DB_HOST, "-U", DB_USER, DB_NAME, "-t", "-A", "-c", query], capture_output=True, text=True, env=env ) if result.returncode != 0: print(f" psql error: {result.stderr.strip()}", file=sys.stderr) return result.stdout.strip() def fetch_json(url: str) -> dict | None: """Fetch JSON from a URL with proper User-Agent.""" req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) try: with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read()) except Exception as e: print(f" HTTP error fetching {url}: {e}") return None def get_wikidata_image(qid: str) -> str | None: """Query Wikidata SPARQL for P18 image filename.""" sparql = f"SELECT ?image WHERE {{ wd:{qid} wdt:P18 ?image }} LIMIT 1" url = "https://query.wikidata.org/sparql?" + urllib.parse.urlencode({ "query": sparql, "format": "json" }) data = fetch_json(url) if not data: return None bindings = data.get("results", {}).get("bindings", []) if not bindings: return None image_url = bindings[0]["image"]["value"] # URL like http://commons.wikimedia.org/wiki/Special:FilePath/Filename.jpg filename = urllib.parse.unquote(image_url.rsplit("/", 1)[-1]) return filename def get_commons_info(filename: str) -> dict | None: """Get image info from Wikimedia Commons API.""" url = "https://commons.wikimedia.org/w/api.php?" + urllib.parse.urlencode({ "action": "query", "titles": f"File:{filename}", "prop": "imageinfo", "iiprop": "url|extmetadata", "iiurlwidth": str(THUMB_WIDTH), "format": "json", }) data = fetch_json(url) if not data: return None pages = data.get("query", {}).get("pages", {}) for page_id, page in pages.items(): if page_id == "-1": return None imageinfo = page.get("imageinfo", []) if not imageinfo: return None info = imageinfo[0] meta = info.get("extmetadata", {}) thumb_url = info.get("thumburl") or info.get("url") desc_url = info.get("descriptionurl", "") license_short = meta.get("LicenseShortName", {}).get("value", "") artist_html = meta.get("Artist", {}).get("value", "") # Strip HTML tags from artist artist = re.sub(r'<[^>]+>', '', artist_html).strip() # Clean up whitespace artist = re.sub(r'\s+', ' ', artist) return { "thumb_url": thumb_url, "description_url": desc_url, "license": license_short, "artist": artist, "filename": filename, } return None def is_license_allowed(license_str: str) -> bool: """Check if a license is in our allowed list.""" normalized = license_str.lower().strip() # Direct match if normalized in ALLOWED_LICENSES: return True # Check for NC or ND if "nc" in normalized or "nd" in normalized: return False # Check patterns if normalized.startswith("public domain") or normalized.startswith("pd"): return True if re.match(r'^cc[- ]?by[- ]?sa[- ]?\d', normalized): return True if re.match(r'^cc[- ]?by[- ]?\d', normalized): return True if re.match(r'^cc[- ]?0', normalized) or normalized == "cc zero": return True return False def normalize_license(license_str: str) -> str: """Normalize license string for storage.""" low = license_str.lower().strip() if "public domain" in low or low.startswith("pd"): return "Public domain" if re.match(r'^cc[- ]?0', low) or "cc-zero" in low or "cc zero" in low: return "CC0 1.0" # CC BY-SA X.0 m = re.match(r'^cc[- ]?by[- ]?sa[- ]?(\d+\.?\d*)', low) if m: return f"CC BY-SA {m.group(1)}" # CC BY X.0 m = re.match(r'^cc[- ]?by[- ]?(\d+\.?\d*)', low) if m: return f"CC BY {m.group(1)}" return license_str def s3_upload(s3_key: str, data: bytes, content_type: str = "image/jpeg"): """Upload to S3 Garage using AWS CLI.""" tmp_path = "/tmp/_herbapi_upload_tmp_file_file" with open(tmp_path, "wb") as f: f.write(data) env = os.environ.copy() env["AWS_ACCESS_KEY_ID"] = S3_ACCESS_KEY env["AWS_SECRET_ACCESS_KEY"] = S3_SECRET_KEY env["AWS_DEFAULT_REGION"] = S3_REGION result = subprocess.run( [ "aws", "s3", "cp", tmp_path, f"s3://{S3_BUCKET}/{s3_key}", "--endpoint-url", S3_ENDPOINT, "--content-type", content_type, ], capture_output=True, text=True, env=env ) os.unlink(tmp_path) if result.returncode != 0: raise RuntimeError(f"S3 upload failed: {result.stderr.strip()}") def download_image(url: str) -> bytes | None: """Download image data from URL.""" req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) try: with urllib.request.urlopen(req, timeout=60) as resp: return resp.read() except Exception as e: print(f" Download error: {e}") return None def main(): # 1. Get species rows = psql( "SELECT id, name_scientific, wikidata_qid FROM species " "WHERE wikidata_qid IS NOT NULL AND wikidata_qid <> '' " "ORDER BY name_scientific" ) if not rows: print("No species with wikidata_qid found.") return species_list = [] for line in rows.split("\n"): parts = line.split("|") if len(parts) == 3: species_list.append({ "id": parts[0], "name": parts[1], "qid": parts[2], }) print(f"Found {len(species_list)} species with Wikidata QIDs.") # 2. Get existing images existing = set() existing_rows = psql("SELECT entity_id FROM images WHERE entity_type = 'species'") if existing_rows: for line in existing_rows.split("\n"): line = line.strip() if line: existing.add(line) print(f"Found {len(existing)} species that already have images.") imported = 0 skipped_existing = 0 skipped_no_image = 0 skipped_license = 0 skipped_download = 0 errors = 0 for i, sp in enumerate(species_list): name = sp["name"] qid = sp["qid"] sp_id = sp["id"] slug = slugify(name) print(f"\n[{i+1}/{len(species_list)}] {name} ({qid})") if sp_id in existing: print(" Already has image, skipping.") skipped_existing += 1 continue # Query Wikidata for image time.sleep(REQUEST_DELAY) filename = get_wikidata_image(qid) if not filename: print(" No image on Wikidata.") skipped_no_image += 1 continue # Get Commons info time.sleep(REQUEST_DELAY) info = get_commons_info(filename) if not info: print(f" Could not get Commons info for {filename}") skipped_no_image += 1 continue # Check license raw_license = info["license"] if not is_license_allowed(raw_license): print(f" License not allowed: {raw_license}") skipped_license += 1 continue norm_license = normalize_license(raw_license) artist = info["artist"] thumb_url = info["thumb_url"] desc_url = info["description_url"] print(f" License: {raw_license} -> {norm_license}") print(f" Artist: {artist[:80]}") print(f" Thumbnail: {thumb_url[:100]}...") # Download image time.sleep(REQUEST_DELAY) image_data = download_image(thumb_url) if not image_data: print(" Failed to download image.") skipped_download += 1 continue print(f" Downloaded {len(image_data)} bytes") # Determine file extension from URL ext = "jpg" if ".png" in thumb_url.lower(): ext = "png" elif ".svg" in thumb_url.lower(): ext = "svg" elif ".gif" in thumb_url.lower(): ext = "gif" s3_key = f"species/{slug}.{ext}" content_type = { "jpg": "image/jpeg", "png": "image/png", "svg": "image/svg+xml", "gif": "image/gif", }.get(ext, "image/jpeg") # Upload to S3 try: s3_upload(s3_key, image_data, content_type) print(f" Uploaded to s3://{S3_BUCKET}/{s3_key}") except RuntimeError as e: print(f" S3 upload failed: {e}") errors += 1 continue # Insert into database caption = f"Photo: {artist}" if artist else "Wikimedia Commons" # Escape single quotes for SQL caption_esc = caption.replace("'", "''") desc_url_esc = desc_url.replace("'", "''") norm_license_esc = norm_license.replace("'", "''") s3_key_esc = s3_key.replace("'", "''") insert_sql = ( f"INSERT INTO images (id, entity_type, entity_id, s3_key, caption, source_url, license, is_primary) " f"VALUES (gen_random_uuid(), 'species', '{sp_id}', '{s3_key_esc}', " f"'{caption_esc}', '{desc_url_esc}', '{norm_license_esc}', true)" ) result = psql(insert_sql) # psql returns empty on success for INSERT print(f" Inserted into images table.") imported += 1 print(f"\n{'='*60}") print(f"DONE!") print(f" Imported: {imported}") print(f" Skipped (existing):{skipped_existing}") print(f" Skipped (no image):{skipped_no_image}") print(f" Skipped (license): {skipped_license}") print(f" Skipped (download):{skipped_download}") print(f" Errors: {errors}") print(f" Total processed: {len(species_list)}") if __name__ == "__main__": main()