这是一个AI编写的通用数字域名候选生成与可注册性检查脚本,本站的111654.xyz便是通过这个脚本寻找到的。
源代码
from __future__ import annotations
import argparse
import json
import socket
import ssl
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Callable
# RDAP endpoint template: {suffix} receives the TLD without its leading dot and
# {domain} the full domain name (see DomainChecker._request_rdap).
RDAP_URL_TEMPLATE = "https://rdap.centralnic.com/{suffix}/domain/{domain}"
# Host and port that every WHOIS query is sent to.
# NOTE(review): the host is fixed to the .xyz registry even when --suffix names
# another TLD - confirm that this is intended.
WHOIS_SERVER = "whois.nic.xyz"
WHOIS_PORT = 43
# User-Agent header sent with RDAP requests.
USER_AGENT = "domain-suffix-checker/1.0"
# Default domain suffix.
DEFAULT_SUFFIX = ".xyz"
# Default candidate file path.
# NOTE(review): not referenced by the code visible here; same path as DEFAULT_DOMAIN_OUTPUT.
DEFAULT_INPUT = Path("candidates.txt")
# Default output path for the generated number list.
DEFAULT_NUMBERS_OUTPUT = Path("numbers.txt")
# Default output path for the generated domain candidate list.
DEFAULT_DOMAIN_OUTPUT = Path("candidates.txt")
# Default output path for domains found to be available.
DEFAULT_AVAILABLE_OUTPUT = Path("available.txt")
# Default output path for domains found to be registered.
DEFAULT_REGISTERED_OUTPUT = Path("registered.txt")
# Default output path for failed lookups.
DEFAULT_ERROR_OUTPUT = Path("errors.txt")
# Default output path for the complete result table.
DEFAULT_RESULTS_OUTPUT = Path("results.txt")
# Default number of worker threads.
# NOTE(review): --workers is parsed, but the checking loop visible here runs sequentially.
DEFAULT_WORKERS = 1
# Default per-query timeout, in seconds.
DEFAULT_TIMEOUT = 15.0
# Default number of retries after a failed lookup.
DEFAULT_RETRIES = 3
# Default pause before retrying a failed lookup, in seconds.
DEFAULT_PAUSE = 1.0
# Minimum interval between two WHOIS queries, in seconds.
DEFAULT_WHOIS_INTERVAL = 1.0
# Base back-off after a WHOIS rate-limit response, in seconds
# (multiplied by the attempt number in DomainChecker.check_domain).
DEFAULT_RATE_LIMIT_BACKOFF = 60.0
# Progress is logged once every this many domains.
DEFAULT_LOG_EVERY = 100
# RDAP fallback is enabled by default.
DEFAULT_USE_RDAP_FALLBACK = True
# Resuming from an existing results file is enabled by default.
DEFAULT_RESUME = True
# Default length of the numeric label.
DEFAULT_NUM_DIGITS = 6
# Signature of a pattern predicate: takes a digit fragment, returns whether it matches.
PatternChecker = Callable[[str], bool]
def _all_same(fragment: str) -> bool:
    """Return True when every character of *fragment* is the same."""
    return len(set(fragment)) == 1


def _steps_by(fragment: str, delta: int) -> bool:
    """Return True when each digit exceeds its predecessor by exactly *delta*."""
    return all(int(nxt) - int(cur) == delta for cur, nxt in zip(fragment, fragment[1:]))


def _is_ascending(fragment: str) -> bool:
    """Return True for a run of consecutive increasing digits, e.g. ``1234``."""
    return _steps_by(fragment, 1)


def _is_descending(fragment: str) -> bool:
    """Return True for a run of consecutive decreasing digits, e.g. ``54321``."""
    return _steps_by(fragment, -1)


def _is_palindrome(fragment: str) -> bool:
    """Return True when *fragment* reads the same in both directions."""
    return fragment == fragment[::-1]


def _is_repeated_unit(fragment: str) -> bool:
    """Return True when *fragment* is a unit of 2+ digits repeated at least twice."""
    size = len(fragment)
    return any(
        size % unit == 0 and fragment == fragment[:unit] * (size // unit)
        for unit in range(2, size)
    )


# Supported digit-pattern rules.
# key = rule name, value = (Chinese display label, default minimum match length, predicate)
PATTERN_DEFINITIONS = {
    "same": ("相同数字", 4, _all_same),
    "ascending": ("递增顺子", 4, _is_ascending),
    "descending": ("递减顺子", 5, _is_descending),
    "palindrome": ("回文", 6, _is_palindrome),
    "rhythm": ("重复节奏", 5, _is_repeated_unit),
}
class WhoisRateLimitError(Exception):
    """Raised when the WHOIS server reports that the query rate was exceeded."""
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the script.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The populated namespace. ``end`` is always an int: when ``--end`` is
        omitted it is set to the largest number with ``--num-digits`` digits.
    """
    parser = argparse.ArgumentParser(description="通用数字域名候选生成与可注册性检查脚本")
    parser.add_argument("--suffix", type=str, default=DEFAULT_SUFFIX, help="域名后缀,例如 .xyz 或 .top")
    parser.add_argument("--num-digits", type=int, default=DEFAULT_NUM_DIGITS, help="数字部分长度,默认 6")
    parser.add_argument("--mode", choices=["generate", "check", "both"], default="both", help="运行模式: generate=生成候选, check=检查可注册, both=先生成再检查")
    parser.add_argument("--start", type=int, default=0, help="生成候选号码的起始范围,默认 0")
    # No fixed default: a hard-coded 999999 is out of range for any --num-digits
    # below 6, so the default is derived from --num-digits after parsing.
    parser.add_argument("--end", type=int, default=None, help="生成候选号码的结束范围,默认为 --num-digits 位数的最大值(6 位时为 999999)")
    parser.add_argument("--numbers-input", type=Path, help="从已有号码文件加载候选号码(支持纯号码列表或带分类的 tab 文本)")
    parser.add_argument("--domain-input", type=Path, help="从已有域名列表加载候选域名,优先于生成的域名")
    parser.add_argument("--output-numbers", type=Path, default=DEFAULT_NUMBERS_OUTPUT, help="保存生成数字列表的文件")
    parser.add_argument("--output-domains", type=Path, default=DEFAULT_DOMAIN_OUTPUT, help="保存生成域名候选列表的文件")
    parser.add_argument("--patterns", nargs="*", choices=list(PATTERN_DEFINITIONS) + ["all"], default=["all"], help="启用的靓号规则,可重复指定,如 --patterns same palindrome")
    # One --min-length-<rule> option per rule, with label and default taken from
    # PATTERN_DEFINITIONS so the two can never drift apart.
    for key, (label, default_min, _checker) in PATTERN_DEFINITIONS.items():
        parser.add_argument(f"--min-length-{key}", type=int, default=default_min, help=f"{label}最小连续长度")
    parser.add_argument("--available-output", type=Path, default=DEFAULT_AVAILABLE_OUTPUT, help="可注册域名输出文件")
    parser.add_argument("--registered-output", type=Path, default=DEFAULT_REGISTERED_OUTPUT, help="已注册域名输出文件")
    parser.add_argument("--error-output", type=Path, default=DEFAULT_ERROR_OUTPUT, help="查询错误输出文件")
    parser.add_argument("--results-output", type=Path, default=DEFAULT_RESULTS_OUTPUT, help="完整结果输出文件")
    parser.add_argument("--workers", type=int, default=DEFAULT_WORKERS, help="线程池并发数,WHOIS 主流程会自动限速")
    parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, help="查询超时时间(秒)")
    parser.add_argument("--retries", type=int, default=DEFAULT_RETRIES, help="失败重试次数")
    parser.add_argument("--pause", type=float, default=DEFAULT_PAUSE, help="失败后的重试等待时间(秒)")
    parser.add_argument("--whois-interval", type=float, default=DEFAULT_WHOIS_INTERVAL, help="两个 WHOIS 查询之间的最小间隔(秒)")
    parser.add_argument("--rate-limit-backoff", type=float, default=DEFAULT_RATE_LIMIT_BACKOFF, help="遇到 WHOIS 限流时的退避基数(秒)")
    parser.add_argument("--use-rdap-fallback", action="store_true", default=DEFAULT_USE_RDAP_FALLBACK, help="启用 WHOIS 失败后的 RDAP 回退")
    parser.add_argument("--no-use-rdap-fallback", action="store_false", dest="use_rdap_fallback", help="禁用 RDAP 回退")
    parser.add_argument("--resume", action="store_true", default=DEFAULT_RESUME, help="从已有结果文件继续检查,跳过已完成条目")
    # --resume defaults to True, so without an explicit negative switch it
    # could never be turned off.
    parser.add_argument("--no-resume", action="store_false", dest="resume", help="忽略已有结果文件,全部重新检查")
    parser.add_argument("--log-every", type=int, default=DEFAULT_LOG_EVERY, help="每处理多少条输出一次日志")
    args = parser.parse_args(argv)
    if args.num_digits < 1:
        parser.error("--num-digits 必须为正整数")
    if args.end is None:
        args.end = 10 ** args.num_digits - 1
    return args
def normalize_suffix(suffix: str) -> str:
    """Return *suffix* lower-cased and guaranteed to start with a dot."""
    dotted = suffix if suffix.startswith(".") else "." + suffix
    return dotted.lower()
def parse_selected_patterns(selected: list[str], args: argparse.Namespace) -> dict[str, tuple[str, int, PatternChecker]]:
    """Resolve the chosen rule names into ``name -> (label, min_length, checker)``.

    An empty selection, or one containing ``"all"``, enables every rule in
    PATTERN_DEFINITIONS. Unknown names are ignored. Each rule's minimum length
    comes from ``args.min_length_<name>`` when that attribute exists, otherwise
    from the rule's built-in default.
    """
    wants_everything = not selected or "all" in selected
    names = [name for name in PATTERN_DEFINITIONS if name != "all"] if wants_everything else selected
    chosen: dict[str, tuple[str, int, PatternChecker]] = {}
    for name in names:
        definition = PATTERN_DEFINITIONS.get(name)
        if definition is None:
            continue
        label, fallback_length, checker = definition
        chosen[name] = (label, getattr(args, f"min_length_{name}", fallback_length), checker)
    return chosen
def format_number(value: int, width: int) -> str:
    """Render *value* in decimal, left-padded with zeros to at least *width* characters."""
    return "{:0{}d}".format(value, width)
def is_valid_number_token(token: str, length: int) -> bool:
    """Return True when *token* consists of exactly *length* ASCII digits.

    ``str.isdigit`` alone also accepts non-ASCII digit characters (full-width
    digits, superscripts, ...), which are not usable as plain numeric domain
    labels, so the token must additionally be pure ASCII.
    """
    return len(token) == length and token.isascii() and token.isdigit()
def load_numbers_from_file(path: Path, num_digits: int) -> list[str]:
    """Load unique candidate numbers from a text file, keeping file order.

    Only the first whitespace-separated field of each line is used, so both a
    plain list and a tab-separated "number<TAB>category" file are accepted.
    Blank or whitespace-only lines and ``#`` comment lines are skipped; one
    trailing dot is removed from the token. Tokens that are not exactly
    *num_digits* digits are ignored.

    Args:
        path: UTF-8 text file to read.
        num_digits: Required length of each number.

    Returns:
        The valid numbers in first-seen order, without duplicates.
    """
    numbers: list[str] = []
    seen: set[str] = set()
    for line in path.read_text(encoding="utf-8").splitlines():
        fields = line.split()
        # A whitespace-only line splits into no fields; indexing [0] directly
        # would raise IndexError.
        if not fields or fields[0].startswith("#"):
            continue
        token = fields[0]
        if token.endswith("."):
            token = token[:-1]
        if is_valid_number_token(token, num_digits) and token not in seen:
            numbers.append(token)
            seen.add(token)
    return numbers
def generate_candidate_numbers(start: int, end: int, num_digits: int, patterns: dict[str, tuple[str, int, PatternChecker]]) -> list[str]:
    """Return every zero-padded number in ``[start, end]`` that matches a pattern.

    A number matches when, for at least one rule, some contiguous fragment whose
    length lies between the rule's minimum length and *num_digits* satisfies the
    rule's checker. Longer fragments are tried before shorter ones.

    Raises:
        ValueError: If the range is negative or inverted, or if *end* does not
            fit into *num_digits* digits.
    """
    if start < 0 or end < 0 or start > end:
        raise ValueError("范围必须是非负且起始值不大于结束值")
    upper_bound = 10 ** num_digits
    if end >= upper_bound:
        raise ValueError(f"结束值必须小于 {upper_bound}")

    def matches_any_rule(number: str) -> bool:
        for _label, min_length, checker in patterns.values():
            for size in range(num_digits, min_length - 1, -1):
                if any(checker(number[offset : offset + size]) for offset in range(num_digits - size + 1)):
                    return True
        return False

    # Every value in the range formats to a distinct string, so no duplicate
    # filtering is needed.
    formatted = (format_number(value, num_digits) for value in range(start, end + 1))
    return [number for number in formatted if matches_any_rule(number)]
def numbers_to_domains(numbers: list[str], suffix: str) -> list[str]:
    """Append *suffix* to each number, preserving order."""
    domains: list[str] = []
    for number in numbers:
        domains.append("{}{}".format(number, suffix))
    return domains
def load_domains(path: Path, suffix: str) -> list[str]:
    """Load unique candidate domains from a text file, keeping file order.

    Lines are stripped and lower-cased; blank lines and ``#`` comments are
    skipped. A line already ending in *suffix* is taken as-is. A line holding
    only ASCII digits is treated as a bare number and gets *suffix* appended.
    Anything else is ignored.

    Args:
        path: UTF-8 text file to read.
        suffix: Normalized suffix (lower-case, leading dot), e.g. ``".xyz"``.

    Returns:
        The domains in first-seen order, without duplicates.
    """
    domains: list[str] = []
    seen: set[str] = set()
    for line in path.read_text(encoding="utf-8").splitlines():
        domain = line.strip().lower()
        if not domain or domain.startswith("#"):
            continue
        if not domain.endswith(suffix):
            # The previous test compared the number's length with the length of
            # the suffix, so only 3-digit numbers were accepted for ".xyz".
            # Any bare ASCII number is accepted now.
            if not (domain.isascii() and domain.isdigit()):
                continue
            domain = f"{domain}{suffix}"
        if domain not in seen:
            domains.append(domain)
            seen.add(domain)
    return domains
class DomainChecker:
    """Determine whether domains are registered, via WHOIS with optional RDAP fallback.

    WHOIS queries are serialized through a lock and spaced at least
    ``whois_interval`` seconds apart. When a WHOIS query fails and the fallback
    is enabled, the same domain is looked up through RDAP.

    Every result is a ``(domain, status, detail)`` tuple whose status is
    ``"available"``, ``"registered"``, ``"error"`` or ``"unknown_<http status>"``.
    """

    def __init__(
        self,
        timeout: float,
        retries: int,
        pause: float,
        whois_interval: float,
        rate_limit_backoff: float,
        use_rdap_fallback: bool,
    ) -> None:
        """Store the tuning parameters and prepare TLS contexts and locks.

        Args:
            timeout: Per-request timeout in seconds (WHOIS socket and RDAP HTTP).
            retries: Additional attempts after the first failed one.
            pause: Seconds to wait before retrying a failed attempt.
            whois_interval: Minimum seconds between two WHOIS queries.
            rate_limit_backoff: Base wait after a WHOIS rate-limit response;
                multiplied by the attempt number.
            use_rdap_fallback: Whether a failed WHOIS attempt is followed by RDAP.
        """
        self.timeout = timeout
        self.retries = retries
        self.pause = pause
        self.whois_interval = whois_interval
        self.rate_limit_backoff = rate_limit_backoff
        self.use_rdap_fallback = use_rdap_fallback
        self._ssl_context = ssl.create_default_context()
        # Non-verifying context, built through the public API rather than the
        # private ssl._create_unverified_context(). check_hostname has to be
        # switched off before verify_mode may be set to CERT_NONE.
        insecure_context = ssl.create_default_context()
        insecure_context.check_hostname = False
        insecure_context.verify_mode = ssl.CERT_NONE
        self._insecure_context = insecure_context
        # Becomes True after the first certificate verification failure; later
        # calls then start with the non-verifying context.
        self._force_insecure = False
        self._lock = threading.Lock()
        self._whois_lock = threading.Lock()
        self._last_whois_at = 0.0

    def _whois_request(self, domain: str) -> str:
        """Send one WHOIS query for *domain* and return the decoded response.

        Raises:
            OSError: On connection failures and timeouts.
        """
        # NOTE(review): the query always goes to WHOIS_SERVER, whatever suffix
        # is being checked - confirm that server answers for the chosen TLD.
        with self._whois_lock:
            wait = self.whois_interval - (time.monotonic() - self._last_whois_at)
            if wait > 0:
                time.sleep(wait)
            chunks: list[bytes] = []
            try:
                with socket.create_connection((WHOIS_SERVER, WHOIS_PORT), timeout=self.timeout) as sock:
                    sock.sendall(f"{domain}\r\n".encode("utf-8"))
                    while True:
                        data = sock.recv(4096)
                        if not data:
                            break
                        chunks.append(data)
            finally:
                # Failed queries count towards the spacing too, so a retry does
                # not hit the server immediately.
                self._last_whois_at = time.monotonic()
        return b"".join(chunks).decode("utf-8", "ignore")

    def _parse_whois_response(self, domain: str, payload: str) -> tuple[str, str, str]:
        """Classify a raw WHOIS response.

        Returns:
            ``(domain, "available" | "registered" | "error", detail)``.

        Raises:
            WhoisRateLimitError: If the response reports an exceeded query rate.
        """
        content = payload.strip()
        upper_content = content.upper()
        if "QUERY RATE EXCEEDED" in upper_content:
            raise WhoisRateLimitError("WHOIS query rate exceeded")
        if "DOMAIN NOT FOUND" in upper_content or "THE QUERIED OBJECT DOES NOT EXIST" in upper_content:
            return domain, "available", "WHOIS: DOMAIN NOT FOUND"
        if "DOMAIN NAME:" in upper_content or "REGISTRY DOMAIN ID:" in upper_content:
            return domain, "registered", "WHOIS: REGISTERED"
        snippet = " ".join(content.split())[:200]
        return domain, "error", f"WHOIS unexpected response: {snippet}"

    def _request_rdap(self, domain: str, suffix: str, insecure: bool) -> tuple[int, str]:
        """Perform one RDAP GET and return ``(http_status, body_text)``.

        Args:
            domain: Full domain name to look up.
            suffix: Domain suffix; its leading dot is removed for the URL.
            insecure: Use the non-verifying TLS context when True.
        """
        url = RDAP_URL_TEMPLATE.format(suffix=suffix.lstrip("."), domain=domain)
        request = urllib.request.Request(
            url,
            headers={"User-Agent": USER_AGENT, "Accept": "application/rdap+json, application/json"},
        )
        context = self._insecure_context if insecure else self._ssl_context
        with urllib.request.urlopen(request, timeout=self.timeout, context=context) as response:
            payload = response.read().decode("utf-8", "ignore")
            return response.status, payload

    def _build_result(self, domain: str, status_code: int, payload: str) -> tuple[str, str, str]:
        """Map a successful RDAP response to a result tuple.

        HTTP 200 means the domain object exists (registered); any other status
        is reported as ``unknown_<status>`` together with a payload summary.
        """
        if status_code == 200:
            return domain, "registered", "RDAP 200"
        return domain, f"unknown_{status_code}", summarize_payload(payload)

    def _rdap_lookup(self, domain: str, suffix: str, insecure: bool) -> tuple[tuple[str, str, str] | None, str]:
        """Run one RDAP query and interpret its outcome.

        Returns:
            ``(result, "")`` when the query gave a definitive answer, otherwise
            ``(None, error_text)``.
        """
        try:
            status_code, payload = self._request_rdap(domain, suffix, insecure=insecure)
        except urllib.error.HTTPError as exc:
            payload = exc.read().decode("utf-8", "ignore")
            if exc.code == 404:
                return (domain, "available", summarize_payload(payload)), ""
            return None, f"HTTP {exc.code}: {summarize_payload(payload)}"
        except urllib.error.URLError as exc:
            return None, str(exc.reason)
        except Exception as exc:  # includes ssl.SSLError raised outside urllib's wrapping
            return None, str(exc)
        if status_code == 404:
            return (domain, "available", summarize_payload(payload)), ""
        return self._build_result(domain, status_code, payload), ""

    def check_domain(self, domain: str, suffix: str) -> tuple[str, str, str]:
        """Check one domain, retrying up to ``retries`` additional times.

        Each attempt queries WHOIS first. A definitive WHOIS answer is returned
        immediately. A rate-limit response triggers a growing back-off and a
        new WHOIS attempt. A timeout, network error or unrecognized response
        counts as a failed attempt and, when the fallback is enabled, is
        followed by an RDAP query.

        Returns:
            ``(domain, status, detail)``; status is ``"error"`` once all
            attempts are exhausted.
        """
        last_error = ""
        insecure = self._force_insecure
        for attempt in range(self.retries + 1):
            try:
                result = self._parse_whois_response(domain, self._whois_request(domain))
            except WhoisRateLimitError as exc:
                last_error = str(exc)
                if attempt < self.retries:
                    time.sleep(self.rate_limit_backoff * (attempt + 1))
                    continue
                return domain, "error", last_error
            except (socket.timeout, TimeoutError) as exc:
                last_error = f"WHOIS timeout: {exc}"
            except OSError as exc:
                last_error = f"WHOIS network error: {exc}"
            else:
                if result[1] != "error":
                    return result
                # An unrecognized (often empty) response is a failed attempt, so
                # it goes through the fallback and retry path like any other.
                last_error = result[2]

            if self.use_rdap_fallback:
                rdap_result, rdap_error = self._rdap_lookup(domain, suffix, insecure)
                if rdap_result is None and not insecure and "CERTIFICATE_VERIFY_FAILED" in rdap_error:
                    # NOTE(review): security trade-off - after one verification
                    # failure all further RDAP requests of this checker skip TLS
                    # certificate validation.
                    with self._lock:
                        self._force_insecure = True
                    insecure = True
                    # Re-run through the normal path so that a 404 ("available")
                    # on the retry is interpreted rather than treated as an error.
                    rdap_result, rdap_error = self._rdap_lookup(domain, suffix, insecure)
                if rdap_result is not None:
                    return rdap_result
                last_error = rdap_error or last_error

            if attempt < self.retries:
                time.sleep(self.pause)
        fallback_text = "unknown error" if self.use_rdap_fallback else "WHOIS failed"
        return domain, "error", last_error or fallback_text
def summarize_payload(payload: str) -> str:
    """Condense a response body into a short single-line description.

    A JSON object is summarized from its ``errorCode``, ``title`` and
    ``description`` members (at most the first two description items), joined
    by ``" | "`` and limited to 200 characters. Any other body - invalid JSON
    or JSON that is not an object - is returned as its first 160 characters
    with newlines replaced by spaces.

    Args:
        payload: Raw response text; may be empty.

    Returns:
        The summary, or ``""`` for an empty payload.
    """
    if not payload:
        return ""
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        data = None
    # Valid JSON can also be a list, string or number, none of which has .get().
    if not isinstance(data, dict):
        return payload[:160].replace("\n", " ")
    title = data.get("title")
    description = data.get("description")
    error_code = data.get("errorCode")
    parts: list[str] = []
    if error_code:
        parts.append(str(error_code))
    if title:
        parts.append(str(title))
    if isinstance(description, list):
        parts.extend(str(item) for item in description[:2])
    elif description:
        parts.append(str(description))
    return " | ".join(parts)[:200]
def read_existing_results(path: Path) -> set[str]:
    """Return the lower-cased domains listed in an existing results file.

    The domain is the text before the first tab of each line. Empty lines, the
    ``domain<TAB>...`` header row and rows with an empty domain are skipped. A
    missing file yields an empty set.
    """
    if not path.exists():
        return set()
    data_rows = (
        row
        for row in path.read_text(encoding="utf-8").splitlines()
        if row and not row.startswith("domain\t")
    )
    names = (row.partition("\t")[0].strip().lower() for row in data_rows)
    return {name for name in names if name}
def write_candidates(numbers: list[str], domains: list[str], numbers_path: Path, domains_path: Path) -> None:
    """Write the number list and the domain list, one entry per line.

    Each file ends with a newline unless its list is empty, in which case the
    file is written empty. A falsy path skips the corresponding file.
    """
    for target, entries in ((numbers_path, numbers), (domains_path, domains)):
        if not target:
            continue
        text = "\n".join(entries)
        if entries:
            text += "\n"
        target.write_text(text, encoding="utf-8")
def write_results(available: list[tuple[str, str, str]], registered: list[tuple[str, str, str]], errors: list[tuple[str, str]], results_path: Path, available_path: Path, registered_path: Path, error_path: Path) -> None:
    """Write the four result files.

    * available / registered files: one domain per line.
    * error file: ``domain<TAB>detail`` per line.
    * results file: a ``domain<TAB>status<TAB>detail`` header followed by the
      available rows, the registered rows, then the error rows (status ``error``).
    """

    def dump(target: Path, rows: list[str]) -> None:
        # One newline-terminated row per line; an empty list gives an empty file.
        target.write_text("".join(f"{row}\n" for row in rows), encoding="utf-8")

    dump(available_path, [domain for domain, _, _ in available])
    dump(registered_path, [domain for domain, _, _ in registered])
    dump(error_path, [f"{domain}\t{detail}" for domain, detail in errors])

    # The complete table holds the available, registered and error sections.
    table = ["domain\tstatus\tdetail"]
    table += [f"{domain}\t{status}\t{detail}" for domain, status, detail in [*available, *registered]]
    table += [f"{domain}\terror\t{detail}" for domain, detail in errors]
    dump(results_path, table)
def _load_previous_results(results_path: Path) -> list[tuple[str, str, str]]:
    """Parse an earlier results file into ``(domain, status, detail)`` rows.

    The header row, empty lines and rows without both a domain and a status are
    skipped. When a domain occurs more than once, the last row wins.
    """
    if not results_path.exists():
        return []
    rows: dict[str, tuple[str, str, str]] = {}
    for line in results_path.read_text(encoding="utf-8").splitlines():
        if not line or line.startswith("domain\t"):
            continue
        fields = line.split("\t", 2)
        domain = fields[0].strip().lower()
        status = fields[1].strip() if len(fields) > 1 else ""
        if not domain or not status:
            continue
        rows[domain] = (domain, status, fields[2] if len(fields) > 2 else "")
    return list(rows.values())


def check_domains(domains: list[str], checker: DomainChecker, suffix: str, resume: bool, results_path: Path, log_every: int) -> list[tuple[str, str, str]]:
    """Check every domain that has no earlier result and return all results.

    In resume mode the rows of the existing results file are loaded first. Their
    domains are not queried again, and the rows are part of the returned list.
    The caller rewrites the results file from the return value, so dropping
    those rows would erase the earlier progress.

    A keyboard interrupt stops the loop and returns what has been collected so
    far, which lets the caller save it and a later resume run continue.

    Args:
        domains: Candidate domains in the order to check.
        checker: Object whose ``check_domain(domain, suffix)`` returns a
            ``(domain, status, detail)`` tuple.
        suffix: Domain suffix passed through to the checker.
        resume: Whether to reuse rows from *results_path*.
        results_path: Results file of a previous run.
        log_every: Print progress whenever the 1-based position of a checked
            domain is a multiple of this value; 0 disables logging.

    Returns:
        Previously recorded rows (resume mode only) followed by the new results.
    """
    previous = _load_previous_results(results_path) if resume else []
    done = {row[0] for row in previous}
    results: list[tuple[str, str, str]] = list(previous)
    total = len(domains)
    try:
        for index, domain in enumerate(domains, start=1):
            if domain in done:
                continue
            outcome = checker.check_domain(domain, suffix)
            results.append(outcome)
            if log_every and index % log_every == 0:
                print(f"[{index}/{total}] checked {domain}: {outcome[1]}")
    except KeyboardInterrupt:
        print(f"检查被中断,已保留 {len(results)} 条结果")
    return results
def main() -> None:
    """Run the script: generate candidates and/or check them, depending on --mode.

    Raises:
        ValueError: If no rule is enabled, no candidate numbers or domains can
            be obtained, or check mode has no domain list to work from.
    """
    args = parse_args()
    suffix = normalize_suffix(args.suffix)
    patterns = parse_selected_patterns(args.patterns, args)
    if not patterns:
        raise ValueError("请至少启用一个筛选规则")
    # Candidate domains and numbers, either generated or loaded.
    candidate_domains: list[str] = []
    candidate_numbers: list[str] = []
    if args.mode in ("generate", "both"):
        if args.numbers_input:
            # Candidate numbers can be read from an existing number file.
            candidate_numbers = load_numbers_from_file(args.numbers_input, args.num_digits)
            if not candidate_numbers:
                raise ValueError(f"从文件 {args.numbers_input} 未加载到任何数字")
        else:
            # Generate candidate numbers according to the selected rules.
            candidate_numbers = generate_candidate_numbers(args.start, args.end, args.num_digits, patterns)
            if not candidate_numbers:
                raise ValueError("未生成任何候选号码,请检查范围与筛选规则")
        # Append the suffix to every number to obtain the candidate domains.
        candidate_domains = numbers_to_domains(candidate_numbers, suffix)
        write_candidates(candidate_numbers, candidate_domains, args.output_numbers, args.output_domains)
        print(f"已生成 {len(candidate_numbers)} 个候选号码 / {len(candidate_domains)} 个候选域名")
        print(f"号码输出: {args.output_numbers}")
        print(f"域名输出: {args.output_domains}")
    if args.mode in ("check", "both"):
        if args.domain_input:
            candidate_domains = load_domains(args.domain_input, suffix)
        if not candidate_domains:
            # Fall back to the candidate file of an earlier generate run. The
            # error below is raised only when that file is missing; before, it
            # was raised in check mode without ever trying the file.
            if not args.output_domains.exists():
                raise ValueError("检查模式需要通过 --domain-input 或 --output-domains 提供候选域名")
            candidate_domains = load_domains(args.output_domains, suffix)
        if not candidate_domains:
            raise ValueError("未获取任何候选域名进行检查")
        checker = DomainChecker(
            timeout=args.timeout,
            retries=args.retries,
            pause=args.pause,
            whois_interval=args.whois_interval,
            rate_limit_backoff=args.rate_limit_backoff,
            use_rdap_fallback=args.use_rdap_fallback,
        )
        print(f"开始检查 {len(candidate_domains)} 个域名,后缀 {suffix}")
        results = check_domains(candidate_domains, checker, suffix, args.resume, args.results_output, args.log_every)
        available = [(domain, status, detail) for domain, status, detail in results if status == "available"]
        registered = [(domain, status, detail) for domain, status, detail in results if status == "registered"]
        # Anything that is neither available nor registered (including the
        # checker's "unknown_<code>" statuses) is recorded as an error, so no
        # checked domain disappears from the output files.
        errors = [
            (domain, detail if status == "error" else f"{status}: {detail}")
            for domain, status, detail in results
            if status not in ("available", "registered")
        ]
        write_results(available, registered, errors, args.results_output, args.available_output, args.registered_output, args.error_output)
        print(f"检查完成: {len(available)} 可注册, {len(registered)} 已注册, {len(errors)} 错误")
        print(f"结果输出: {args.results_output}")
        print(f"可注册列表: {args.available_output}")
        print(f"已注册列表: {args.registered_output}")
        print(f"错误列表: {args.error_output}")


if __name__ == "__main__":
    main()
功能
生成数字候选
按指定后缀组装域名候选
检查域名可注册性
支持 WHOIS 查询和 RDAP 回退
支持可配置的筛选规则
运行方式
python domain_suffix_checker.py --help
参数说明
基本参数
- --suffix:域名后缀,默认 .xyz
- --num-digits:数字部分长度,默认 6
- --mode:运行模式,可选 generate|check|both
  - generate:只生成候选
  - check:只检查可注册性
  - both:先生成再检查
生成候选相关
- --start:生成号码起始值,默认 0
- --end:生成号码结束值,默认 999999
- --numbers-input:从已有号码文件加载候选号码
- --domain-input:从已有域名列表加载候选域名
- --output-numbers:保存生成数字列表文件,默认 numbers.txt
- --output-domains:保存生成域名候选列表文件,默认 candidates.txt
筛选规则
--patterns:启用规则,支持以下选项:
- same:相同数字
- ascending:递增顺子
- descending:递减顺子
- palindrome:回文
- rhythm:重复节奏
- all:全部规则(默认)
每项规则最小长度
- --min-length-same
- --min-length-ascending
- --min-length-descending
- --min-length-palindrome
- --min-length-rhythm
检查结果输出
- --available-output:可注册域名输出文件,默认 available.txt
- --registered-output:已注册域名输出文件,默认 registered.txt
- --error-output:查询错误输出文件,默认 errors.txt
- --results-output:完整结果输出文件,默认 results.txt
查询调优参数
- --timeout:单次查询超时时间(秒),默认 15
- --retries:失败重试次数,默认 3
- --pause:失败后重试等待时间(秒),默认 1
- --whois-interval:WHOIS 查询最小间隔(秒),默认 1
- --rate-limit-backoff:限流退避基数(秒),默认 60
- --use-rdap-fallback:启用 RDAP 回退(默认开启)
- --no-use-rdap-fallback:禁用 RDAP 回退
- --resume:从已有结果继续检查,默认开启
- --log-every:每处理多少条记录输出一次日志,默认 100
示例
1. 生成 .xyz 候选
python domain_suffix_checker.py --mode generate --suffix .xyz --start 0 --end 999999 --patterns same ascending
2. 生成并检查可注册
python domain_suffix_checker.py --mode both --suffix .xyz --start 0 --end 999999 --patterns same ascending
3. 仅检查已有域名列表
python domain_suffix_checker.py --mode check --domain-input test_domains.txt --suffix .xyz
4. 只生成回文和重复节奏
python domain_suffix_checker.py --mode generate --patterns palindrome rhythm --start 0 --end 999999
输出文件说明
- numbers.txt:生成的数字候选列表
- candidates.txt:生成的域名候选列表
- available.txt:可注册域名列表
- registered.txt:已注册域名列表
- errors.txt:查询错误和详细报错
- results.txt:完整结果明细
注意事项
- --domain-input 优先于自动生成的候选域名
- --numbers-input 可直接加载现成数字列表
- 建议先用小范围测试,如 --start 0 --end 99
- 如果需要中断后继续,开启 --resume