-
-
Notifications
You must be signed in to change notification settings - Fork 295
/
Copy path12-dnsrecords.py
336 lines (291 loc) · 12.6 KB
/
12-dnsrecords.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
#!/usr/bin/env python3
#
# Copyright (c) 2025 YunoHost Contributors
#
# This file is part of YunoHost (see https://yunohost.org)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
import os
import re
from datetime import datetime, timedelta
from typing import List
from moulinette.utils.process import check_output
from publicsuffix2 import PublicSuffixList
from yunohost.diagnosis import Diagnoser
from yunohost.dns import (
_build_dns_conf,
_get_dns_zone_for_domain,
_get_relative_name_for_dns_zone,
)
from yunohost.domain import _get_maindomain, domain_list
from yunohost.utils.dns import (
YNH_DYNDNS_DOMAINS,
dig,
is_special_use_tld,
is_yunohost_dyndns_domain,
)
logger = logging.getLogger("yunohost.diagnosis")
class MyDiagnoser(Diagnoser):
id_ = os.path.splitext(os.path.basename(__file__))[0].split("-")[1]
cache_duration = 600
dependencies: List[str] = ["ip"]
def run(self):
main_domain = _get_maindomain()
major_domains = domain_list(exclude_subdomains=True)["domains"]
for domain in major_domains:
logger.debug("Diagnosing DNS conf for %s" % domain)
for report in self.check_domain(
domain,
domain == main_domain,
):
yield report
# Check if a domain buy by the user will expire soon
psl = PublicSuffixList()
domains_from_registrar = [
psl.get_public_suffix(domain) for domain in major_domains
]
domains_from_registrar = [
domain for domain in domains_from_registrar if "." in domain
]
domains_from_registrar = set(domains_from_registrar) - set(
YNH_DYNDNS_DOMAINS + ["netlib.re"]
)
for report in self.check_expiration_date(domains_from_registrar):
yield report
def check_domain(self, domain, is_main_domain):
if is_special_use_tld(domain):
yield dict(
meta={"domain": domain},
data={},
status="INFO",
summary="diagnosis_dns_specialusedomain",
)
return
base_dns_zone = _get_dns_zone_for_domain(domain)
basename = _get_relative_name_for_dns_zone(domain, base_dns_zone)
expected_configuration = _build_dns_conf(
domain, include_empty_AAAA_if_no_ipv6=True
)
categories = ["basic", "mail", "extra"]
for category in categories:
records = expected_configuration[category]
discrepancies = []
results = {}
for r in records:
id_ = r["type"] + ":" + r["name"]
fqdn = r["name"] + "." + base_dns_zone if r["name"] != "@" else domain
# Ugly hack to not check mail records for subdomains stuff,
# otherwise will end up in a shitstorm of errors for people with many subdomains...
# Should find a cleaner solution in the suggested conf...
if r["type"] in ["MX", "TXT"] and fqdn not in [
domain,
f"mail._domainkey.{domain}",
f"_dmarc.{domain}",
]:
continue
r["current"] = self.get_current_record(fqdn, r["type"])
if r["value"] == "@":
r["value"] = domain + "."
elif r["type"] == "CNAME":
r["value"] = r["value"] # + f".{base_dns_zone}."
if self.current_record_match_expected(r):
results[id_] = "OK"
else:
if r["current"] is None:
results[id_] = "MISSING"
discrepancies.append(("diagnosis_dns_missing_record", r))
else:
results[id_] = "WRONG"
discrepancies.append(("diagnosis_dns_discrepancy", r))
def its_important():
# Every mail DNS records are important for main domain
# For other domain, we only report it as a warning for now...
if is_main_domain and category == "mail":
return True
elif category == "basic":
# A bad or missing A record is critical ...
# And so is a wrong AAAA record
# (However, a missing AAAA record is acceptable)
if (
results[f"A:{basename}"] != "OK"
or results[f"AAAA:{basename}"] == "WRONG"
):
return True
return False
if discrepancies:
status = "ERROR" if its_important() else "WARNING"
summary = "diagnosis_dns_bad_conf"
else:
status = "SUCCESS"
summary = "diagnosis_dns_good_conf"
# If status is okay and there's actually no expected records
# (e.g. XMPP disabled)
# then let's not yield any diagnosis line
if not records and status == "SUCCESS":
continue
output = dict(
meta={"domain": domain, "category": category},
data=results,
status=status,
summary=summary,
)
if discrepancies:
# For ynh-managed domains (nohost.me etc...), tell people to try to "yunohost dyndns update --force"
if is_yunohost_dyndns_domain(domain):
output["details"] = ["diagnosis_dns_try_dyndns_update_force"]
# Otherwise point to the documentation
else:
output["details"] = ["diagnosis_dns_point_to_doc"]
output["details"] += discrepancies
yield output
def get_current_record(self, fqdn, type_):
success, answers = dig(fqdn, type_, resolvers="force_external")
if success != "ok":
return None
else:
if type_ == "TXT" and isinstance(answers, list):
for part in answers:
if part.startswith('"v=spf1'):
return part
return answers[0] if len(answers) == 1 else answers
def current_record_match_expected(self, r):
if r["value"] is not None and r["current"] is None:
return False
if r["value"] is None and r["current"] is not None:
return False
elif isinstance(r["current"], list):
return False
if r["type"] == "TXT":
# Split expected/current
# from "v=DKIM1; k=rsa; p=hugekey;"
# to a set like {'v=DKIM1', 'k=rsa', 'p=...'}
# Additionally, for DKIM, because the key is pretty long,
# some DNS registrar sometime split it into several pieces like this:
# "p=foo" "bar" (with a space and quotes in the middle)...
expected = set(r["value"].strip(';" ').replace(";", " ").split())
current = set(
r["current"].replace('" "', "").strip(';" ').replace(";", " ").split()
)
# For SPF, ignore parts starting by ip4: or ip6:
if "v=spf1" in r["value"]:
current = {
part
for part in current
if not part.startswith("ip4:") and not part.startswith("ip6:")
}
if "v=DMARC1" in r["value"]:
for param in current:
if "=" not in param:
return False
key, value = param.split("=", 1)
if key == "p":
return value in ["none", "quarantine", "reject"]
return expected == current
elif r["type"] == "MX":
# For MX, we want to ignore the priority
expected = r["value"].split()[-1]
current = r["current"].split()[-1]
return expected == current
elif r["type"] == "CAA":
# For CAA, check only the last item, ignore the 0 / 128 nightmare
expected = r["value"].split()[-1]
current = r["current"].split()[-1]
return expected == current
else:
return r["current"] == r["value"]
def check_expiration_date(self, domains):
"""
Alert if expiration date of a domain is soon
"""
details = {"not_found": [], "error": [], "warning": [], "success": []}
for domain in domains:
expire_date = self.get_domain_expiration(domain)
if isinstance(expire_date, str):
status_ns, _ = dig(domain, "NS", resolvers="force_external")
status_a, _ = dig(domain, "A", resolvers="force_external")
if "ok" not in [status_ns, status_a]:
# i18n: diagnosis_domain_not_found_details
details["not_found"].append(
(
"diagnosis_domain_%s_details" % (expire_date),
{"domain": domain},
)
)
else:
logger.debug("Dyndns domain: %s" % (domain))
continue
expire_in = expire_date - datetime.now()
alert_type = "success"
if expire_in <= timedelta(15):
alert_type = "error"
elif expire_in <= timedelta(45):
alert_type = "warning"
args = {
"domain": domain,
"days": expire_in.days - 1,
"expire_date": str(expire_date),
}
details[alert_type].append(("diagnosis_domain_expires_in", args))
for alert_type in ["success", "error", "warning", "not_found"]:
if details[alert_type]:
if alert_type == "not_found":
meta = {"test": "domain_not_found"}
else:
meta = {"test": "domain_expiration"}
# Allow to ignore specifically a single domain
if len(details[alert_type]) == 1:
meta["domain"] = details[alert_type][0][1]["domain"]
# i18n: diagnosis_domain_expiration_not_found
# i18n: diagnosis_domain_expiration_error
# i18n: diagnosis_domain_expiration_warning
# i18n: diagnosis_domain_expiration_success
# i18n: diagnosis_domain_expiration_not_found_details
yield dict(
meta=meta,
data={},
status=(
alert_type.upper() if alert_type != "not_found" else "WARNING"
),
summary="diagnosis_domain_expiration_" + alert_type,
details=details[alert_type],
)
def get_domain_expiration(self, domain):
"""
Return the expiration datetime of a domain or None
"""
command = "whois -H %s || echo failed" % (domain)
out = check_output(command).split("\n")
# Reduce output to determine if whois answer is equivalent to NOT FOUND
filtered_out = [
line
for line in out
if re.search(r"^[a-zA-Z0-9 ]{4,25}:", line, re.IGNORECASE)
and not re.match(r">>> Last update of whois", line, re.IGNORECASE)
and not re.match(r"^NOTICE:", line, re.IGNORECASE)
and not re.match(r"^%%", line, re.IGNORECASE)
and not re.match(r'"https?:"', line, re.IGNORECASE)
]
# If there is less than 7 lines, it's NOT FOUND response
if len(filtered_out) <= 6:
return "not_found"
for line in out:
match = re.search(r"Expir.+(\d{4}-\d{2}-\d{2})", line, re.IGNORECASE)
if match is not None:
return datetime.strptime(match.group(1), "%Y-%m-%d")
match = re.search(r"Expir.+(\d{2}-\w{3}-\d{4})", line, re.IGNORECASE)
if match is not None:
return datetime.strptime(match.group(1), "%d-%b-%Y")
return "expiration_not_found"