Screening for Premium Numeric Domain Names

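This is an AI-written, general-purpose script that generates numeric domain candidates and checks whether they can be registered. This site's own domain, 111654.xyz, was found with it.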

Source code

from __future__ import annotations

import argparse
import json
import socket
import ssl
import threading
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Callable

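# RDAP endpoint template and WHOIS server. WHOIS_SERVER is fixed to the .xyz registry,
# so checking another suffix means pointing it at that registry's WHOIS server.
RDAP_URL_TEMPLATE = "https://rdap.centralnic.com/{suffix}/domain/{domain}"
WHOIS_SERVER = "whois.nic.xyz"
WHOIS_PORT = 43
USER_AGENT = "domain-suffix-checker/1.0"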

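# Default domain suffix
DEFAULT_SUFFIX = ".xyz"
# Default path of the candidate file
DEFAULT_INPUT = Path("candidates.txt")
# Default output path for the generated number list
DEFAULT_NUMBERS_OUTPUT = Path("numbers.txt")
# Default output path for the generated domain candidate list
DEFAULT_DOMAIN_OUTPUT = Path("candidates.txt")
# Default output path for available domains
DEFAULT_AVAILABLE_OUTPUT = Path("available.txt")
# Default output path for registered domains
DEFAULT_REGISTERED_OUTPUT = Path("registered.txt")
# Default output path for query errors
DEFAULT_ERROR_OUTPUT = Path("errors.txt")
# Default output path for the full results
DEFAULT_RESULTS_OUTPUT = Path("results.txt")
# Default number of worker threads
DEFAULT_WORKERS = 1
# Default query timeout (seconds)
DEFAULT_TIMEOUT = 15.0
# Default number of retries after a failure
DEFAULT_RETRIES = 3
# Default pause after a failure (seconds)
DEFAULT_PAUSE = 1.0
# Minimum interval between WHOIS queries (seconds)
DEFAULT_WHOIS_INTERVAL = 1.0
# Base backoff when rate-limited (seconds)
DEFAULT_RATE_LIMIT_BACKOFF = 60.0
# Print a log line every this many domains
DEFAULT_LOG_EVERY = 100
# Whether RDAP fallback is enabled by default
DEFAULT_USE_RDAP_FALLBACK = True
# Whether to resume from an existing results file by default
DEFAULT_RESUME = True
# Default number of digits
DEFAULT_NUM_DIGITS = 6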

PatternChecker = Callable[[str], bool]

# Supported number-screening rules:
# key = rule name, value = (label, default minimum match length, checker function)
PATTERN_DEFINITIONS = {
    "same": ("same digits", 4, lambda fragment: len(set(fragment)) == 1),
    "ascending": ("ascending run", 4, lambda fragment: all(int(fragment[i + 1]) - int(fragment[i]) == 1 for i in range(len(fragment) - 1))),
    "descending": ("descending run", 5, lambda fragment: all(int(fragment[i + 1]) - int(fragment[i]) == -1 for i in range(len(fragment) - 1))),
    "palindrome": ("palindrome", 6, lambda fragment: fragment == fragment[::-1]),
    "rhythm": ("repeating rhythm", 5, lambda fragment: any(
        len(fragment) % unit_length == 0
        and all(fragment[i] == fragment[i % unit_length] for i in range(len(fragment)))
        for unit_length in range(2, len(fragment))
    )),
}


class WhoisRateLimitError(Exception):
    pass


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Generate numeric domain candidates and check whether they can be registered")

    parser.add_argument("--suffix", type=str, default=DEFAULT_SUFFIX, help="Domain suffix, e.g. .xyz or .top")
    parser.add_argument("--num-digits", type=int, default=DEFAULT_NUM_DIGITS, help="Length of the numeric part (default: 6)")
    parser.add_argument("--mode", choices=["generate", "check", "both"], default="both", help="Run mode: generate = build candidates, check = check availability, both = generate, then check")

    parser.add_argument("--start", type=int, default=0, help="Start of the number range to generate (default: 0)")
    parser.add_argument("--end", type=int, default=999999, help="End of the number range to generate (default: 999999)")
    parser.add_argument("--numbers-input", type=Path, help="Load candidate numbers from an existing file (plain number list or tab-separated text with categories)")
    parser.add_argument("--domain-input", type=Path, help="Load candidate domains from an existing list; takes precedence over generated domains")
    parser.add_argument("--output-numbers", type=Path, default=DEFAULT_NUMBERS_OUTPUT, help="File for the generated number list")
    parser.add_argument("--output-domains", type=Path, default=DEFAULT_DOMAIN_OUTPUT, help="File for the generated domain candidate list")

    parser.add_argument("--patterns", nargs="*", choices=list(PATTERN_DEFINITIONS) + ["all"], default=["all"], help="Rules to enable; several may be listed, e.g. --patterns same palindrome")
    parser.add_argument("--min-length-same", type=int, default=4, help="Minimum run length for identical digits")
    parser.add_argument("--min-length-ascending", type=int, default=4, help="Minimum run length for ascending digits")
    parser.add_argument("--min-length-descending", type=int, default=5, help="Minimum run length for descending digits")
    parser.add_argument("--min-length-palindrome", type=int, default=6, help="Minimum length of a palindrome")
    parser.add_argument("--min-length-rhythm", type=int, default=5, help="Minimum length of a repeating rhythm")

    parser.add_argument("--available-output", type=Path, default=DEFAULT_AVAILABLE_OUTPUT, help="Output file for available domains")
    parser.add_argument("--registered-output", type=Path, default=DEFAULT_REGISTERED_OUTPUT, help="Output file for registered domains")
    parser.add_argument("--error-output", type=Path, default=DEFAULT_ERROR_OUTPUT, help="Output file for query errors")
    parser.add_argument("--results-output", type=Path, default=DEFAULT_RESULTS_OUTPUT, help="Output file for the full results")

    parser.add_argument("--workers", type=int, default=DEFAULT_WORKERS, help="Thread-pool size (currently unused: domains are checked one at a time)")
    parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, help="Query timeout in seconds")
    parser.add_argument("--retries", type=int, default=DEFAULT_RETRIES, help="Number of retries after a failure")
    parser.add_argument("--pause", type=float, default=DEFAULT_PAUSE, help="Wait before retrying after a failure, in seconds")
    parser.add_argument("--whois-interval", type=float, default=DEFAULT_WHOIS_INTERVAL, help="Minimum interval between two WHOIS queries, in seconds")
    parser.add_argument("--rate-limit-backoff", type=float, default=DEFAULT_RATE_LIMIT_BACKOFF, help="Base backoff when WHOIS rate-limits the client, in seconds")
    parser.add_argument("--use-rdap-fallback", action="store_true", default=DEFAULT_USE_RDAP_FALLBACK, help="Fall back to RDAP when WHOIS fails")
    parser.add_argument("--no-use-rdap-fallback", action="store_false", dest="use_rdap_fallback", help="Disable the RDAP fallback")
    parser.add_argument("--resume", action="store_true", default=DEFAULT_RESUME, help="Continue from an existing results file, skipping entries already checked")
    # --resume defaults to on, so a separate flag is needed to turn it off.
    parser.add_argument("--no-resume", action="store_false", dest="resume", help="Ignore any existing results file and check every domain")
    parser.add_argument("--log-every", type=int, default=DEFAULT_LOG_EVERY, help="Print a log line every this many domains")
    return parser.parse_args()


def normalize_suffix(suffix: str) -> str:
    if not suffix.startswith("."):
        suffix = "." + suffix
    return suffix.lower()


def parse_selected_patterns(selected: list[str], args: argparse.Namespace) -> dict[str, tuple[str, int, PatternChecker]]:
    if not selected or "all" in selected:
        selected = [key for key in PATTERN_DEFINITIONS if key != "all"]

    patterns: dict[str, tuple[str, int, PatternChecker]] = {}
    for key in selected:
        if key not in PATTERN_DEFINITIONS:
            continue
        label, default_min, checker = PATTERN_DEFINITIONS[key]
        min_length = getattr(args, f"min_length_{key}", default_min)
        patterns[key] = (label, min_length, checker)
    return patterns


def format_number(value: int, width: int) -> str:
    return f"{value:0{width}d}"


def is_valid_number_token(token: str, length: int) -> bool:
    return len(token) == length and token.isdigit()


def load_numbers_from_file(path: Path, num_digits: int) -> list[str]:
    numbers: list[str] = []
    seen: set[str] = set()
    for line in path.read_text(encoding="utf-8").splitlines():
        if not line or line.startswith("#"):
            continue
        token = line.split()[0].strip()
        if token.endswith("."):
            token = token[:-1]
        if is_valid_number_token(token, num_digits) and token not in seen:
            numbers.append(token)
            seen.add(token)
    return numbers


def generate_candidate_numbers(start: int, end: int, num_digits: int, patterns: dict[str, tuple[str, int, PatternChecker]]) -> list[str]:
    if start < 0 or end < 0 or start > end:
        raise ValueError("start and end must be non-negative, and start must not exceed end")
    if end >= 10 ** num_digits:
        raise ValueError(f"end must be less than {10**num_digits}")

    results: list[str] = []
    seen: set[str] = set()

    for value in range(start, end + 1):
        number = format_number(value, num_digits)
        matched = False
        for label, min_length, checker in patterns.values():
            for fragment_length in range(num_digits, min_length - 1, -1):
                for start_index in range(0, num_digits - fragment_length + 1):
                    fragment = number[start_index : start_index + fragment_length]
                    if checker(fragment):
                        matched = True
                        break
                if matched:
                    break
            if matched:
                break

        if matched and number not in seen:
            results.append(number)
            seen.add(number)
    return results


def numbers_to_domains(numbers: list[str], suffix: str) -> list[str]:
    return [f"{number}{suffix}" for number in numbers]


def load_domains(path: Path, suffix: str) -> list[str]:
    domains: list[str] = []
    seen: set[str] = set()
    for line in path.read_text(encoding="utf-8").splitlines():
        domain = line.strip().lower()
        if not domain or domain.startswith("#"):
            continue
        if not domain.endswith(suffix):
            # A bare number gets the suffix appended; anything else is skipped.
            if domain.isdigit():
                domain = f"{domain}{suffix}"
            else:
                continue
        if domain not in seen:
            domains.append(domain)
            seen.add(domain)
    return domains


class DomainChecker:
    def __init__(
        self,
        timeout: float,
        retries: int,
        pause: float,
        whois_interval: float,
        rate_limit_backoff: float,
        use_rdap_fallback: bool,
    ) -> None:
        self.timeout = timeout
        self.retries = retries
        self.pause = pause
        self.whois_interval = whois_interval
        self.rate_limit_backoff = rate_limit_backoff
        self.use_rdap_fallback = use_rdap_fallback
        self._ssl_context = ssl.create_default_context()
        self._insecure_context = ssl._create_unverified_context()
        self._force_insecure = False
        self._lock = threading.Lock()
        self._whois_lock = threading.Lock()
        self._last_whois_at = 0.0

    def _whois_request(self, domain: str) -> str:
        with self._whois_lock:
            elapsed = time.monotonic() - self._last_whois_at
            if elapsed < self.whois_interval:
                time.sleep(self.whois_interval - elapsed)
            with socket.create_connection((WHOIS_SERVER, WHOIS_PORT), timeout=self.timeout) as sock:
                sock.sendall(f"{domain}\r\n".encode("utf-8"))
                chunks: list[bytes] = []
                while True:
                    data = sock.recv(4096)
                    if not data:
                        break
                    chunks.append(data)
            self._last_whois_at = time.monotonic()
        return b"".join(chunks).decode("utf-8", "ignore")

    def _parse_whois_response(self, domain: str, payload: str) -> tuple[str, str, str]:
        content = payload.strip()
        upper_content = content.upper()
        if "QUERY RATE EXCEEDED" in upper_content:
            raise WhoisRateLimitError("WHOIS query rate exceeded")
        if "DOMAIN NOT FOUND" in upper_content or "THE QUERIED OBJECT DOES NOT EXIST" in upper_content:
            return domain, "available", "WHOIS: DOMAIN NOT FOUND"
        if "DOMAIN NAME:" in upper_content or "REGISTRY DOMAIN ID:" in upper_content:
            return domain, "registered", "WHOIS: REGISTERED"
        snippet = " ".join(content.split())[:200]
        return domain, "error", f"WHOIS unexpected response: {snippet}"

    def _request_rdap(self, domain: str, suffix: str, insecure: bool) -> tuple[int, str]:
        url = RDAP_URL_TEMPLATE.format(suffix=suffix.lstrip("."), domain=domain)
        request = urllib.request.Request(
            url,
            headers={"User-Agent": USER_AGENT, "Accept": "application/rdap+json, application/json"},
        )
        context = self._insecure_context if insecure else self._ssl_context
        with urllib.request.urlopen(request, timeout=self.timeout, context=context) as response:
            payload = response.read().decode("utf-8", "ignore")
            return response.status, payload

    def _build_result(self, domain: str, status_code: int, payload: str) -> tuple[str, str, str]:
        if status_code == 200:
            return domain, "registered", "RDAP 200"
        return domain, f"unknown_{status_code}", summarize_payload(payload)

    def check_domain(self, domain: str, suffix: str) -> tuple[str, str, str]:
        last_error = ""
        insecure = self._force_insecure
        allow_insecure_retry = True
        for attempt in range(self.retries + 1):
            try:
                return self._parse_whois_response(domain, self._whois_request(domain))
            except WhoisRateLimitError as exc:
                last_error = str(exc)
                if attempt < self.retries:
                    time.sleep(self.rate_limit_backoff * (attempt + 1))
                    continue
                return domain, "error", last_error
            except (socket.timeout, TimeoutError) as exc:
                last_error = f"WHOIS timeout: {exc}"
            except OSError as exc:
                last_error = f"WHOIS network error: {exc}"

            if not self.use_rdap_fallback:
                if attempt < self.retries:
                    time.sleep(self.pause)
                    continue
                return domain, "error", last_error or "WHOIS failed"

            try:
                status_code, payload = self._request_rdap(domain, suffix, insecure=insecure)
                if status_code == 404:
                    return domain, "available", summarize_payload(payload)
                return self._build_result(domain, status_code, payload)
            except urllib.error.HTTPError as exc:
                payload = exc.read().decode("utf-8", "ignore")
                if exc.code == 404:
                    return domain, "available", summarize_payload(payload)
                last_error = f"HTTP {exc.code}: {summarize_payload(payload)}"
            except urllib.error.URLError as exc:
                reason = str(exc.reason)
                last_error = reason
                if "CERTIFICATE_VERIFY_FAILED" in reason and not insecure and allow_insecure_retry:
                    with self._lock:
                        self._force_insecure = True
                    allow_insecure_retry = False
                    try:
                        status_code, payload = self._request_rdap(domain, suffix, insecure=True)
                        if status_code == 404:
                            return domain, "available", summarize_payload(payload)
                        return self._build_result(domain, status_code, payload)
                    except Exception as retry_exc:
                        last_error = str(retry_exc)
            except ssl.SSLError as exc:
                reason = str(exc)
                last_error = reason
                if "CERTIFICATE_VERIFY_FAILED" in reason and not insecure and allow_insecure_retry:
                    with self._lock:
                        self._force_insecure = True
                    allow_insecure_retry = False
                    try:
                        status_code, payload = self._request_rdap(domain, suffix, insecure=True)
                        if status_code == 404:
                            return domain, "available", summarize_payload(payload)
                        return self._build_result(domain, status_code, payload)
                    except Exception as retry_exc:
                        last_error = str(retry_exc)
            except Exception as exc:
                last_error = str(exc)

            if attempt < self.retries:
                time.sleep(self.pause)

        return domain, "error", last_error or "unknown error"


def summarize_payload(payload: str) -> str:
    if not payload:
        return ""
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return payload[:160].replace("\n", " ")

    title = data.get("title")
    description = data.get("description")
    error_code = data.get("errorCode")
    parts: list[str] = []
    if error_code:
        parts.append(str(error_code))
    if title:
        parts.append(str(title))
    if isinstance(description, list):
        parts.extend(str(item) for item in description[:2])
    elif description:
        parts.append(str(description))
    return " | ".join(parts)[:200]


def read_existing_results(path: Path) -> set[str]:
    if not path.exists():
        return set()
    existing: set[str] = set()
    for line in path.read_text(encoding="utf-8").splitlines():
        if not line or line.startswith("domain\t"):
            continue
        domain = line.split("\t", 1)[0].strip().lower()
        if domain:
            existing.add(domain)
    return existing


def write_candidates(numbers: list[str], domains: list[str], numbers_path: Path, domains_path: Path) -> None:
    if numbers_path:
        numbers_path.write_text("\n".join(numbers) + ("\n" if numbers else ""), encoding="utf-8")
    if domains_path:
        domains_path.write_text("\n".join(domains) + ("\n" if domains else ""), encoding="utf-8")


def write_results(available: list[tuple[str, str, str]], registered: list[tuple[str, str, str]], errors: list[tuple[str, str]], results_path: Path, available_path: Path, registered_path: Path, error_path: Path) -> None:
    available_path.write_text("\n".join(domain for domain, _, _ in available) + ("\n" if available else ""), encoding="utf-8")
    registered_path.write_text("\n".join(domain for domain, _, _ in registered) + ("\n" if registered else ""), encoding="utf-8")
    error_lines = [f"{domain}\t{detail}" for domain, detail in errors]
    error_path.write_text("\n".join(error_lines) + ("\n" if error_lines else ""), encoding="utf-8")

    # The full results file lists available, registered, and error rows, in that order
    lines = ["domain\tstatus\tdetail"]
    lines.extend(f"{domain}\t{status}\t{detail}" for domain, status, detail in available)
    lines.extend(f"{domain}\t{status}\t{detail}" for domain, status, detail in registered)
    lines.extend(f"{domain}\terror\t{detail}" for domain, detail in errors)
    results_path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def check_domains(domains: list[str], checker: DomainChecker, suffix: str, resume: bool, results_path: Path, log_every: int) -> list[tuple[str, str, str]]:
    # Domains already present in the results file; skipped when resuming
    existing = read_existing_results(results_path) if resume else set()
    # (domain, status, detail) tuples produced by this run
    tasks: list[tuple[str, str, str]] = []

    for index, domain in enumerate(domains, start=1):
        if domain in existing:
            continue
        status = checker.check_domain(domain, suffix)
        tasks.append(status)
        if log_every and index % log_every == 0:
            print(f"[{index}/{len(domains)}] checked {domain}: {status[1]}")
    return tasks


def main() -> None:
    args = parse_args()
    suffix = normalize_suffix(args.suffix)
    patterns = parse_selected_patterns(args.patterns, args)
    if not patterns:
        raise ValueError("Enable at least one screening rule")

    # Candidate domains and numbers, either generated or loaded
    candidate_domains: list[str] = []
    candidate_numbers: list[str] = []

    if args.mode in ("generate", "both"):
        if args.numbers_input:
            # Load candidate numbers from an existing file
            candidate_numbers = load_numbers_from_file(args.numbers_input, args.num_digits)
            if not candidate_numbers:
                raise ValueError(f"No numbers were loaded from {args.numbers_input}")
        else:
            # Generate candidate numbers according to the rules
            candidate_numbers = generate_candidate_numbers(args.start, args.end, args.num_digits, patterns)
            if not candidate_numbers:
                raise ValueError("No candidate numbers were generated; check the range and the screening rules")

        # Append the suffix to each candidate number to form a domain
        candidate_domains = numbers_to_domains(candidate_numbers, suffix)
        write_candidates(candidate_numbers, candidate_domains, args.output_numbers, args.output_domains)
        print(f"Generated {len(candidate_numbers)} candidate numbers / {len(candidate_domains)} candidate domains")
        print(f"Numbers written to: {args.output_numbers}")
        print(f"Domains written to: {args.output_domains}")

    if args.mode in ("check", "both"):
        if args.domain_input:
            candidate_domains = load_domains(args.domain_input, suffix)
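        if not candidate_domains:
            # Fall back to the candidate file written by an earlier generate run.
            if not args.output_domains.exists():
                raise ValueError("Check mode needs candidate domains from --domain-input or an existing --output-domains file")
            candidate_domains = load_domains(args.output_domains, suffix)

        if not candidate_domains:
            raise ValueError("No candidate domains to check")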

        checker = DomainChecker(
            timeout=args.timeout,
            retries=args.retries,
            pause=args.pause,
            whois_interval=args.whois_interval,
            rate_limit_backoff=args.rate_limit_backoff,
            use_rdap_fallback=args.use_rdap_fallback,
        )

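        print(f"Starting check of {len(candidate_domains)} domains with suffix {suffix}")
        results = check_domains(candidate_domains, checker, suffix, args.resume, args.results_output, args.log_every)

        if args.resume and args.results_output.exists():
            # Carry rows from earlier runs forward; write_results rewrites every output file from scratch.
            for line in args.results_output.read_text(encoding="utf-8").splitlines():
                fields = line.split("\t", 2)
                if len(fields) == 3 and fields[0] != "domain":
                    results.append((fields[0], fields[1], fields[2]))

        available = [(domain, status, detail) for domain, status, detail in results if status == "available"]
        registered = [(domain, status, detail) for domain, status, detail in results if status == "registered"]
        # Anything else, including unknown_<status> results from RDAP, is reported as an error.
        errors = [
            (domain, detail if status == "error" else f"{status}: {detail}")
            for domain, status, detail in results
            if status not in ("available", "registered")
        ]

        write_results(available, registered, errors, args.results_output, args.available_output, args.registered_output, args.error_output)
        print(f"Done: {len(available)} available, {len(registered)} registered, {len(errors)} errors")
        print(f"Results: {args.results_output}")
        print(f"Available list: {args.available_output}")
        print(f"Registered list: {args.registered_output}")
        print(f"Error list: {args.error_output}")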


if __name__ == "__main__":
    main()

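Features

  • Generates numeric candidates

  • Builds domain candidates with a chosen suffix

  • Checks whether each domain can be registered

  • Queries WHOIS first, with an RDAP fallback

  • Screening rules are configurable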

Usage

python domain_suffix_checker.py --help

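Parameters

Basic parameters

  • --suffix: domain suffix, default .xyz

  • --num-digits: length of the numeric part, default 6

  • --mode: run mode, one of generate, check, or both (default: both)

    • generate: only generate candidates

    • check: only check availability

    • both: generate, then check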

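Candidate generation

  • --start: first number to generate, default 0

  • --end: last number to generate, default 999999

  • --numbers-input: load candidate numbers from an existing file instead of generating them

  • --domain-input: load candidate domains from an existing list

  • --output-numbers: file for the generated number list, default numbers.txt

  • --output-domains: file for the generated domain candidate list, default candidates.txt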

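Screening rules

  • --patterns: rules to enable; one or more of:

    • same: a run of identical digits

    • ascending: a run of consecutive ascending digits, e.g. 1234

    • descending: a run of consecutive descending digits, e.g. 98765

    • palindrome: reads the same forwards and backwards

    • rhythm: a unit of two or more digits repeated, e.g. 121212

    • all: every rule (default)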
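A minimal sketch of how these rules behave on sample numbers. The checkers mirror the lambdas in the source code above and use the script's default minimum lengths; the helper name `matching_rules` and the sample numbers are illustrative assumptions, not part of the script.

```python
from __future__ import annotations

from typing import Callable

# Checkers mirror PATTERN_DEFINITIONS; the integers are the default minimum lengths.
RULES: dict[str, tuple[int, Callable[[str], bool]]] = {
    "same": (4, lambda f: len(set(f)) == 1),
    "ascending": (4, lambda f: all(int(b) - int(a) == 1 for a, b in zip(f, f[1:]))),
    "descending": (5, lambda f: all(int(b) - int(a) == -1 for a, b in zip(f, f[1:]))),
    "palindrome": (6, lambda f: f == f[::-1]),
    "rhythm": (5, lambda f: any(
        len(f) % unit == 0 and all(f[i] == f[i % unit] for i in range(len(f)))
        for unit in range(2, len(f))
    )),
}


def matching_rules(number: str) -> list[str]:
    """Return every rule for which some contiguous fragment of `number` qualifies."""
    matched = []
    for name, (min_length, checker) in RULES.items():
        # Every contiguous fragment that is at least min_length digits long.
        fragments = (
            number[start:start + length]
            for length in range(min_length, len(number) + 1)
            for start in range(len(number) - length + 1)
        )
        if any(checker(fragment) for fragment in fragments):
            matched.append(name)
    return matched


for sample in ("111190", "123490", "987650", "123321", "121212", "135790"):
    print(sample, matching_rules(sample))
```

A number is kept as a candidate as soon as any enabled rule matches any fragment, so "111190" qualifies through its leading "1111" while "135790" matches nothing.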

Minimum length per rule

  • --min-length-same: default 4

  • --min-length-ascending: default 4

  • --min-length-descending: default 5

  • --min-length-palindrome: default 6

  • --min-length-rhythm: default 5
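To get a feel for how a minimum length changes the number of candidates, the sketch below counts numbers that contain a run of identical digits. `has_same_run` and `count_same_run` are hypothetical helpers written for this illustration, and 4-digit numbers are used only to keep the loop short.

```python
def has_same_run(number: str, min_length: int) -> bool:
    """True if `number` contains at least `min_length` identical consecutive digits."""
    # A longer run always contains a window of exactly min_length identical digits.
    return any(
        len(set(number[start:start + min_length])) == 1
        for start in range(len(number) - min_length + 1)
    )


def count_same_run(num_digits: int, min_length: int) -> int:
    """Count zero-padded numbers of `num_digits` digits that satisfy has_same_run."""
    return sum(
        has_same_run(f"{value:0{num_digits}d}", min_length)
        for value in range(10 ** num_digits)
    )


print(count_same_run(4, 4))  # only 0000, 1111, ..., 9999 qualify
print(count_same_run(4, 3))  # a lower minimum also admits numbers such as 0001 or 7222
```

Lowering a minimum length widens the candidate list quickly, which also means more WHOIS queries in check mode.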

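Check result outputs

  • --available-output: output file for available domains, default available.txt

  • --registered-output: output file for registered domains, default registered.txt

  • --error-output: output file for query errors, default errors.txt

  • --results-output: output file for the full results, default results.txt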

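Query tuning parameters

  • --workers: thread-pool size, default 1; the code shown above checks domains one at a time, so this value has no effect

  • --timeout: timeout for a single query in seconds, default 15

  • --retries: number of retries after a failure, default 3

  • --pause: wait before retrying after a failure in seconds, default 1

  • --whois-interval: minimum interval between WHOIS queries in seconds, default 1

  • --rate-limit-backoff: base backoff when rate-limited in seconds, default 60

  • --use-rdap-fallback: fall back to RDAP when a WHOIS query times out or hits a network error (on by default)

  • --no-use-rdap-fallback: disable the RDAP fallback

  • --resume: continue from an existing results file, skipping domains already listed there (on by default)

  • --log-every: print a log line every this many domains, default 100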
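The way --whois-interval, --retries, and --rate-limit-backoff interact can be sketched as follows. In the source code above, a rate-limited attempt sleeps for the backoff base multiplied by the attempt number, so the waits grow linearly. `backoff_schedule` and `IntervalThrottle` are illustrative names, not part of the script, and the throttle is a simplified single-threaded version of the script's locking logic.

```python
from __future__ import annotations

import time


def backoff_schedule(retries: int, base: float) -> list[float]:
    """Seconds slept after each rate-limited attempt: base * 1, base * 2, ..."""
    return [base * (attempt + 1) for attempt in range(retries)]


class IntervalThrottle:
    """Enforce a minimum gap between consecutive calls, as --whois-interval does."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self._last: float | None = None  # monotonic time of the previous call, if any

    def wait(self) -> float:
        """Sleep if the previous call was too recent; return the time slept."""
        delay = 0.0
        if self._last is not None:
            elapsed = time.monotonic() - self._last
            delay = max(0.0, self.interval - elapsed)
        if delay:
            time.sleep(delay)
        self._last = time.monotonic()
        return delay


print(backoff_schedule(retries=3, base=60.0))  # [60.0, 120.0, 180.0]

throttle = IntervalThrottle(interval=0.2)
first = throttle.wait()   # no previous call, so nothing to wait for
second = throttle.wait()  # sleeps for whatever remains of the interval
print(first, round(second, 1))
```

With the defaults, a domain that keeps hitting the rate limit waits 60, 120, and 180 seconds before it is recorded as an error, so a long candidate list is better served by a larger --whois-interval than by more retries.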

Examples

1. Generate .xyz candidates

python domain_suffix_checker.py --mode generate --suffix .xyz --start 0 --end 999999 --patterns same ascending

2. Generate candidates and check availability

python domain_suffix_checker.py --mode both --suffix .xyz --start 0 --end 999999 --patterns same ascending

3. Only check an existing domain list

python domain_suffix_checker.py --mode check --domain-input test_domains.txt --suffix .xyz

4. Generate only palindromes and repeating rhythms

python domain_suffix_checker.py --mode generate --patterns palindrome rhythm --start 0 --end 999999

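Output files

  • numbers.txt: generated candidate numbers

  • candidates.txt: generated candidate domains

  • available.txt: domains that can be registered

  • registered.txt: domains that are already registered

  • errors.txt: failed queries with their error details

  • results.txt: full results, one tab-separated row per domain (domain, status, detail)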
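results.txt is tab-separated with a `domain`, `status`, `detail` header row, as written by `write_results` in the source code above. A minimal sketch for grouping its rows by status follows; `group_by_status` is a hypothetical helper and the sample rows are made up for illustration.

```python
from __future__ import annotations

from collections import defaultdict


def group_by_status(text: str) -> dict[str, list[str]]:
    """Group the domains in results.txt content by their status column."""
    groups: dict[str, list[str]] = defaultdict(list)
    for line in text.splitlines():
        fields = line.split("\t", 2)
        # Skip blank or malformed lines and the header row.
        if len(fields) < 2 or fields[0] == "domain":
            continue
        groups[fields[1]].append(fields[0])
    return dict(groups)


# Made-up sample content in the same layout the script writes.
sample = (
    "domain\tstatus\tdetail\n"
    "000000.xyz\tregistered\tWHOIS: REGISTERED\n"
    "123450.xyz\tavailable\tWHOIS: DOMAIN NOT FOUND\n"
    "543210.xyz\terror\tWHOIS timeout: timed out\n"
)
print(group_by_status(sample))
```

For a real run, pass the file content instead of the sample, for example `group_by_status(Path("results.txt").read_text(encoding="utf-8"))` with `Path` imported from `pathlib`.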

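Notes

  • --domain-input takes precedence over automatically generated candidate domains

  • --numbers-input loads a ready-made number list directly

  • Test with a small range first, e.g. --start 0 --end 99

  • To continue from an earlier run, keep the same --results-output file; --resume is on by default and skips domains already listed there. The results file is written only when a run finishes, so an interrupted run leaves nothing to resume from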