如果有人感兴趣,这里有一些粗略的低级代码,用于仅使用标准库发出 dns 请求。
import secrets
import socket
# https://datatracker.ietf.org/doc/html/rfc1035
# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#table-dns-parameters-4
def dns_request(name, qtype=1, addr=('127.0.0.53', 53), timeout=1): # A 1, NS 2, CNAME 5, SOA 6, NULL 10, PTR 12, MX 15, TXT 16, AAAA 28, NAPTR 35, * 255
name = name.rstrip('.')
queryid = secrets.token_bytes(2)
# Header. 1 for Recursion Desired, 1 question, 0 answers, 0 ns, 0 additional
request = queryid + b'\1\0\0\1\0\0\0\0\0\0'
# Question
for label in name.rstrip('.').split('.'):
assert len(label) < 64, name
request += int.to_bytes(len(label), length=1, byteorder='big')
request += label.encode()
request += b'\0' # terminates with the zero length octet for the null label of the root.
request += int.to_bytes(qtype, length=2, byteorder='big') # QTYPE
request += b'\0\1' # QCLASS = 1
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.sendto(request, addr)
s.settimeout(timeout)
try:
response, serveraddr = s.recvfrom(4096)
except socket.timeout:
raise TimeoutError(name, timeout)
assert serveraddr == addr, (serveraddr, addr)
assert response[:2] == queryid, (response[:2], queryid)
assert response[2] & 128 # QR = Response
assert not response[2] & 4 # No Truncation
assert response[3] & 128 # Recursion Available
error_code = response[3] % 16 # 0 = no error, 1 = format error, 2 = server failure, 3 = does not exist, 4 = not implemented, 5 = refused
qdcount = int.from_bytes(response[4:6], 'big')
ancount = int.from_bytes(response[6:8], 'big')
assert qdcount <= 1
# parse questions
qa = response[12:]
for question in range(qdcount):
domain, qa = parse_qname(qa, response)
qtype, qa = parse_int(qa, 2)
qclass, qa = parse_int(qa, 2)
# parse answers
answers = []
for answer in range(ancount):
domain, qa = parse_qname(qa, response)
qtype, qa = parse_int(qa, 2)
qclass, qa = parse_int(qa, 2)
ttl, qa = parse_int(qa, 4)
rdlength, qa = parse_int(qa, 2)
rdata, qa = qa[:rdlength], qa[rdlength:]
if qtype == 1: # IPv4 address
rdata = '.'.join(str(x) for x in rdata)
if qtype == 15: # MX
mx_pref, rdata = parse_int(rdata, 2)
if qtype in (2, 5, 12, 15): # NS, CNAME, MX
rdata, _ = parse_qname(rdata, response)
answer = (qtype, domain, ttl, rdata, mx_pref if qtype == 15 else None)
answers.append(answer)
return error_code, answers
def parse_int(byts, ln):
return int.from_bytes(byts[:ln], 'big'), byts[ln:]
def parse_qname(byts, full_response):
domain_parts = []
while True:
if byts[0] // 64: # OFFSET pointer
assert byts[0] // 64 == 3, byts[0]
offset, byts = parse_int(byts, 2)
offset = offset - (128 + 64) * 256 # clear out top 2 bits
label, _ = parse_qname(full_response[offset:], full_response)
domain_parts.append(label)
break
else: # regular QNAME
ln, byts = parse_int(byts, 1)
label, byts = byts[:ln], byts[ln:]
if not label:
break
domain_parts.append(label.decode())
return '.'.join(domain_parts), byts