Recipes

Copy-paste recipes for common Bulk URL Checker integrations. Each snippet is complete and standalone: paste it into a file, replace YOUR_KEY (or, for the recipes that read it from the environment, set the BULKURLCHECKER_API_KEY environment variable), and run it. Use these as a starting point, or feed them to your AI assistant.

Monitor a sitemap weekly and alert on broken links

You publish a content site and want a weekly heads-up when any indexed URL goes 4xx/5xx, including redirects to error pages. Cron, the SDK, and a Slack webhook do this in one short script; swap the webhook call for email if you prefer.

monitor_sitemap.py
# monitor_sitemap.py -- run weekly via cron, GitHub Actions, etc.
# pip install bulkurlchecker requests
import os
import xml.etree.ElementTree as ET
import requests
from bulkurlchecker import Client
# Required: read from the environment; a missing variable raises KeyError at startup.
API_KEY = os.environ["BULKURLCHECKER_API_KEY"]
# Sitemap to audit -- replace with your own site's sitemap URL.
SITEMAP_URL = "https://your-site.example.com/sitemap.xml"
SLACK_URL = os.environ.get("SLACK_WEBHOOK_URL") # optional; None when the variable is unset
def urls_from_sitemap(url: str) -> list[str]:
    """Download a sitemap and return every non-empty ``<loc>`` value in it.

    Args:
        url: Absolute URL of an XML sitemap that uses the sitemaps.org 0.9
            namespace.

    Returns:
        The whitespace-stripped text of each ``<loc>`` element, in document
        order. Elements with empty text are skipped.

    Raises:
        requests.HTTPError: The sitemap request returned a 4xx/5xx status.
        xml.etree.ElementTree.ParseError: The body is not well-formed XML.
    """
    response = requests.get(url, timeout=30)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page as XML.
    response.raise_for_status()

    ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}
    root = ET.fromstring(response.content)

    # <loc> values are frequently pretty-printed with surrounding newlines and
    # indentation; strip them so clean URLs are handed to the checker.
    # NOTE: every <loc> in the document is returned as-is. For a sitemap
    # *index* file those entries are child sitemaps, which are not followed.
    return [
        text
        for el in root.iterfind(".//sm:loc", ns)
        if (text := (el.text or "").strip())
    ]
def main():
    """Check every URL from the sitemap and report the broken ones.

    Prints a summary to stdout. When SLACK_URL is set, also posts the first
    50 broken URLs (plus a count of any remainder) to that webhook.

    Raises:
        requests.HTTPError: The Slack webhook answered with a 4xx/5xx status.
    """
    urls = urls_from_sitemap(SITEMAP_URL)
    if not urls:
        # Nothing to submit; an empty sitemap usually means it moved or broke.
        print(f"No URLs found in {SITEMAP_URL}; nothing to check.")
        return

    print(f"Checking {len(urls)} URLs from {SITEMAP_URL}...")
    client = Client(api_key=API_KEY)
    results = client.check_urls(urls, wait_seconds=300)
    broken = results.broken
    if not broken:
        print("All URLs healthy.")
        return

    print(f"Found {len(broken)} broken URL(s):")
    lines = []
    for r in broken:
        line = f" {r.status_code or 'ERR'} {r.url}"
        # Show the redirect target only when the URL actually redirected.
        if r.final_url and r.final_url != r.url:
            line += f" -> {r.final_url}"
        print(line)
        lines.append(line)

    if SLACK_URL:
        # Cap the message body at 50 entries, but say so when truncating.
        max_lines = 50
        shown = lines[:max_lines]
        if len(lines) > max_lines:
            shown.append(f" ...and {len(lines) - max_lines} more")
        response = requests.post(
            SLACK_URL,
            json={
                "text": f"*Broken links on {SITEMAP_URL}*: {len(broken)} of {len(urls)}\n"
                + "\n".join(shown)
            },
            timeout=10,
        )
        # The alert is the point of this script: surface a rejected webhook
        # instead of exiting as if the notification had been delivered.
        response.raise_for_status()


if __name__ == "__main__":
    main()

For sitemaps over ~2,000 URLs the synchronous wait will time out. Switch to client.submit() + client.iter_results() and let the job run in the background.

AWS Lambda: validate URLs from an HTTP trigger

You want a tiny serverless function that accepts a JSON array of URLs and returns broken ones. Bulk URL Checker handles the proxy + retry hell so the Lambda stays under 30 seconds.

handler.py
# handler.py -- AWS Lambda (Python 3.12 runtime)
# requirements.txt: bulkurlchecker
import json
import os
from bulkurlchecker import Client
# Evaluated once when the module is imported; a missing variable raises
# KeyError before any invocation is handled.
API_KEY = os.environ["BULKURLCHECKER_API_KEY"]
client = Client(api_key=API_KEY) # reused across warm invocations
def _json_response(status, payload):
    """Build a proxy-style response dict with a JSON body and Content-Type."""
    return {
        "statusCode": status,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(payload),
    }


def lambda_handler(event, _context):
    """Check a batch of URLs and return the broken ones.

    Expects ``event["body"]`` to be a JSON object of the form
    ``{"urls": ["https://...", ...]}`` with at most 1000 entries.

    Args:
        event: Invocation event; only its ``body`` key is read.
        _context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode``, ``headers`` and a JSON ``body``:
        200 with ``checked`` / ``total`` / ``broken``; 400 when the request
        body is malformed; 500 when the URL check itself raises.
    """
    # Input problems are the caller's fault: answer 400 here rather than
    # letting them fall into the catch-all below and surface as 500.
    try:
        body = json.loads(event.get("body") or "{}")
    except (TypeError, ValueError):
        return _json_response(400, {"error": "request body must be valid JSON"})

    urls = body.get("urls") if isinstance(body, dict) else None
    if not urls or not isinstance(urls, list):
        return _json_response(400, {"error": "missing 'urls' array"})
    if not all(isinstance(u, str) and u.strip() for u in urls):
        return _json_response(400, {"error": "'urls' must contain only non-empty strings"})
    if len(urls) > 1000:
        return _json_response(400, {"error": "max 1000 URLs per request"})

    try:
        # wait_seconds < 29 keeps the response inside API Gateway's default
        # 29 s integration timeout (assuming an API Gateway trigger). The
        # function's own timeout must be raised as well: Lambda's default is
        # 3 s, not 30 s.
        results = client.check_urls(urls, wait_seconds=25)
        return _json_response(200, {
            "checked": results.completed_urls,
            "total": results.total_urls,
            "broken": [
                {"url": r.url, "status_code": r.status_code, "final_url": r.final_url}
                for r in results.broken
            ],
        })
    except Exception as e:
        # Top-level boundary: always hand the caller a well-formed response.
        return _json_response(500, {"error": str(e)})

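Add BULKURLCHECKER_API_KEY to the Lambda environment variables, and raise the function timeout to about 30 seconds (the default is 3 seconds, which is shorter than the 25-second wait used above). For batches > 1000 URLs, queue them into SQS and have a worker submit asynchronously instead.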

Node.js: validate URLs from a CSV in a shell pipeline

You have a CSV of URLs and want to spit out a CSV of just the broken ones, all from the command line, no Python.

check-broken.mjs
// check-broken.mjs
// npm install bulkurlchecker csv-parse csv-stringify
// Usage: node check-broken.mjs input.csv > broken.csv
import { readFileSync } from "node:fs";
import { parse } from "csv-parse/sync";
import { stringify } from "csv-stringify/sync";
import { Client } from "bulkurlchecker";
// Fail fast on missing configuration. All diagnostics go to stderr so that
// stdout carries nothing but the CSV report.
const apiKey = process.env.BULKURLCHECKER_API_KEY;
if (!apiKey) { console.error("Set BULKURLCHECKER_API_KEY"); process.exit(1); }

const file = process.argv[2];
if (!file) { console.error("Usage: node check-broken.mjs <input.csv>"); process.exit(1); }

// `bom: true` strips a UTF-8 byte-order mark (common in spreadsheet exports);
// without it the mark becomes part of the first header name and the `url`
// column is never found. `trim: true` drops stray whitespace around fields.
const rows = parse(readFileSync(file), {
  columns: true,
  skip_empty_lines: true,
  bom: true,
  trim: true,
});

// Accept either a `url` or `URL` header; rows without a value are dropped.
const urls = rows.map((row) => row.url || row.URL).filter(Boolean);
if (urls.length === 0) {
  console.error(`No URLs found in ${file} (expected a "url" or "URL" column)`);
  process.exit(1);
}
console.error(`Checking ${urls.length} URLs...`);

const client = new Client({ apiKey });
const out = await client.checkUrls(urls, { waitSeconds: 300 });

const broken = out.broken.map((r) => ({
  url: r.url,
  status_code: r.statusCode ?? "",
  final_url: r.finalUrl ?? "",
  is_soft_404: r.isSoft404 ? "yes" : "no",
}));

// Explicit columns keep the header row present even when nothing is broken,
// so downstream tools always receive a well-formed CSV.
process.stdout.write(
  stringify(broken, {
    header: true,
    columns: ["url", "status_code", "final_url", "is_soft_404"],
  })
);
console.error(`Done: ${broken.length} broken / ${out.totalUrls} total`);

Requires Node 18+ (global fetch). For inputs over ~2,000 URLs use the two-step submit + iterResults pattern so the shell isn't blocked for minutes.

Submit a big job (75K URLs) and stream results back

You have tens of thousands of URLs. The synchronous wait would time out. Submit asynchronously, poll, then stream results in pages.

bulk_export.py
# bulk_export.py
# pip install bulkurlchecker
import csv
import sys
import time
from bulkurlchecker import Client
client = Client(api_key="uck_live_YOUR_KEY")

# One URL per line; blank lines are ignored.
with open("urls_75k.txt", encoding="utf-8") as f:
    urls = [line.strip() for line in f if line.strip()]

job = client.submit(urls)
# Progress and diagnostics go to stderr: stdout carries only the CSV, so
# `python bulk_export.py > results.csv` produces a clean file.
print(f"Submitted {job.job_id} ({job.total_urls} URLs)", file=sys.stderr)

# Poll every 30s. Server-side timeout is 15 min; this loop is bounded
# by max_seconds in case something stalls.
TERMINAL = {"completed", "failed", "cancelled", "paused"}
max_seconds = 3600  # 1 hour
poll_seconds = 30
# monotonic() is unaffected by wall-clock adjustments during the wait.
deadline = time.monotonic() + max_seconds
while time.monotonic() < deadline:
    status = client.get_job_status(job.job_id)
    print(f" {status.status} {status.completed_urls}/{status.total_urls}", file=sys.stderr)
    if status.status in TERMINAL:
        break
    time.sleep(poll_seconds)
else:
    # Reached only when the deadline passed without a terminal status.
    print(
        f"Job {job.job_id} did not reach a terminal state within {max_seconds}s; "
        "exporting the results available so far.",
        file=sys.stderr,
    )

# Stream results -- uses cursor pagination under the hood, stable
# even if the job is still finishing.
writer = csv.writer(sys.stdout)
writer.writerow(["url", "status_code", "final_url", "is_broken"])
for batch in client.iter_results(job.job_id, page_size=1000):
    for r in batch:
        writer.writerow([r.url, r.status_code or "", r.final_url or "", "yes" if r.is_broken else "no"])
print("\nDone.", file=sys.stderr)

Webhook receiver (Flask): handle job.completed events

You don't want to poll. Register a webhook endpoint, receive a signed POST when each job finishes, verify the signature, and trigger your downstream pipeline.

webhook_receiver.py
# webhook_receiver.py
# pip install bulkurlchecker flask
import os
from flask import Flask, request, jsonify
from bulkurlchecker import Client, verify_signature, InvalidSignatureError
# This is the SIGNING SECRET shown once when you created the endpoint
# at https://app.bulkurlchecker.com/dashboard/webhooks
# Both variables are required; a missing one raises KeyError at import time.
SECRET = os.environ["MY_WEBHOOK_SECRET"]
# API key for the client that pulls job results after a verified event.
API_KEY = os.environ["BULKURLCHECKER_API_KEY"]
app = Flask(__name__)
# Created once at module level and used by the webhook handler below.
client = Client(api_key=API_KEY)
@app.post("/webhook/bulkurlchecker")
def webhook():
    """Receive a signed Bulk URL Checker event and handle ``job.completed``.

    Returns:
        ``("", 401)`` when the signature check fails, ``("", 400)`` when the
        verified body is not a JSON object or a ``job.completed`` event has no
        ``job_id``, and ``("", 200)`` otherwise (other event types are
        acknowledged and ignored).
    """
    try:
        verify_signature(
            request.get_data(),  # RAW bytes -- do not use request.get_json()
            request.headers.get("Bulkurlchecker-Signature", ""),
            SECRET,
        )
    except InvalidSignatureError:
        return "", 401

    # force=True parses the body whatever the Content-Type header says;
    # silent=True yields None instead of raising on malformed JSON. Either
    # way a bad payload gets a clean 400 rather than an unhandled exception.
    event = request.get_json(force=True, silent=True)
    if not isinstance(event, dict):
        return "", 400

    if event.get("type") == "job.completed":
        data = event.get("data") or {}
        job_id = data.get("job_id")
        if not job_id:
            return "", 400
        print(f"Job {job_id} finished. Pulling broken URLs...")
        # Stream results -- they're paginated cursor-style.
        # NOTE(review): this runs inside the request; for very large jobs
        # consider handing job_id to a background worker so the sender gets
        # its response quickly.
        broken = []
        for batch in client.iter_results(job_id, page_size=1000):
            broken.extend(r for r in batch if r.is_broken)
        print(f" {len(broken)} broken URLs out of {data.get('total_urls', '?')}")
        # ... kick off your downstream pipeline ...
    return "", 200

verify_signature() raises InvalidSignatureError on missing / malformed / expired / tampered signatures (default 5-minute replay tolerance). Pair this receiver with `POST /api/v2/webhooks/endpoints` to register the URL.

Idempotent submit: safe retries on flaky networks

You're submitting from a worker that occasionally drops mid-request. Without idempotency keys, retries create duplicate jobs (and double-charge credits).

idempotent_submit.py
# idempotent_submit.py
# pip install bulkurlchecker
import uuid
from bulkurlchecker import Client, BulkURLCheckerError
# Replace the placeholder with your real API key.
client = Client(api_key="uck_live_YOUR_KEY")
# Generate ONE UUID per logical request. Reuse on retries so the
# server can dedupe.
KEY = str(uuid.uuid4())
# Example payload -- replace with the URLs you want checked.
URLS = ["https://example.com", "https://example.org"]
def submit_with_retry(
    urls: list[str],
    idempotency_key: str,
    max_tries: int = 3,
    backoff_seconds: float = 1.0,
):
    """Submit *urls*, retrying failed attempts with the same idempotency key.

    Args:
        urls: URLs to submit as a single job.
        idempotency_key: Key passed unchanged on every attempt.
        max_tries: Total number of attempts; must be at least 1.
        backoff_seconds: Delay before the second attempt; doubled after each
            further failure. Pass 0 to retry immediately.

    Returns:
        Whatever ``client.submit`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_tries`` is less than 1.
        BulkURLCheckerError: The error from the final attempt when every
            attempt fails.
    """
    import time  # local import keeps this snippet self-contained

    # Without this guard the loop body never runs and there is no error
    # to re-raise.
    if max_tries < 1:
        raise ValueError("max_tries must be at least 1")

    # NOTE(review): every BulkURLCheckerError is retried; if the SDK exposes
    # subclasses for non-transient failures (auth, validation), narrow this.
    for attempt in range(1, max_tries + 1):
        try:
            return client.submit(urls, idempotency_key=idempotency_key)
        except BulkURLCheckerError as e:
            if attempt == max_tries:
                print(f"Attempt {attempt} failed: {e}; giving up.")
                raise
            print(f"Attempt {attempt} failed: {e}; retrying...")
            # Exponential backoff gives a flaky network time to recover.
            time.sleep(backoff_seconds * 2 ** (attempt - 1))
# Submit once (with retries on failure) and print the resulting job id.
job = submit_with_retry(URLS, idempotency_key=KEY)
print(f"job_id: {job.job_id}")
# A second submit_with_retry(URLS, idempotency_key=KEY) within 24h
# returns the SAME job_id without creating a duplicate.

Same key + same body returns the original response. Same key + different body returns 409 ValidationError. That's intentional: don't reuse a key against a different payload.

CLI one-liner: check from terminal, pipe to grep/jq

Sometimes you just want to spit URLs at the API from a shell and inspect by hand.

shell.sh
# Install the package together with its "cli" extra
pip install "bulkurlchecker[cli]"
# Set your API key once per shell session
export BULKURLCHECKER_API_KEY=uck_live_YOUR_KEY
# Check a few URLs ad-hoc, broken only
bulkurlchecker check --urls "https://a.com,https://b.com,https://c.com" --only-broken
# Check from a file and save the report to report.csv
bulkurlchecker check urls.txt > report.csv
# Pipe from any URL-extractor (the "-" argument presumably means "read URLs from stdin")
# NOTE(review): `curl -s` hides errors and still pipes an HTTP error body onward; consider `curl -fsS`.
# NOTE(review): some xmllint builds print matched text nodes without newline
# separators -- confirm the output is one URL per line on your system.
curl -s https://your-site.com/sitemap.xml \
| xmllint --xpath '//*[local-name()="loc"]/text()' - \
| bulkurlchecker check - --output jsonl \
| jq 'select(.is_broken)'
# Two-step for big batches
# NOTE(review): assumes `submit` prints only the job id on stdout, and that the
# job has finished before `results` is called -- confirm against the CLI docs.
JOB=$(bulkurlchecker submit huge_list.txt)
bulkurlchecker status "$JOB"
bulkurlchecker results "$JOB" --output csv > report.csv

Missing your use case? Tell us what you're building and we'll add a recipe for it.

We use analytics cookies to improve your experience. Opt out anytime in Cookie Settings. Privacy Policy

Settings