-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gandi-flatten-spf.py
executable file
·227 lines (168 loc) · 7.83 KB
/
gandi-flatten-spf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python3
import argparse
import logging
import os
import requests
import json
import re
from dns import resolver
from dns import name
from sender_policy_flattener.crawler import crawl
from sender_policy_flattener.formatting import ips_to_spf_strings
# Default log level, as a level name (e.g. 'INFO') usable on the command line
DEFAULT_LOGLEVEL = logging.getLevelName(logging.INFO)

# Template URL to get / put the TXT records stored at the apex ("@") of {domain}
URL_TEMPLATE = "https://api.gandi.net/v5/livedns/domains/{domain}/records/@/TXT"

# Value of the HTTP 'authorization' header, parameterised with {apikey}
AUTHZ_TEMPLATE = 'Apikey {apikey}'

# Regular expressions used to detect SPF records among the other TXT records.
# Note that a single SPF string may not exceed 255 characters; however, it is
# allowed to split the record's content into several quote-enclosed strings to
# bypass this limit (very useful when flattening DNS names into many IPs).
# - RE_SPF finds the SPF version tag anywhere in a record
# - RE_MERGE_SUBSTRINGS matches the '" "' boundary between two quote-enclosed
#   strings, so that substituting it with '' glues the strings back together
RE_SPF = re.compile(r'v=spf1')
RE_MERGE_SUBSTRINGS = re.compile(r'"\s+"')

# Template string to build the TXT record for SPF
SPF_TEMPLATE = "\"v=spf1 {includes} ~all\""

# Default name servers used to resolve domain names:
# Google Public DNS, primary (8.8.8.8) and secondary (8.8.4.4) resolvers
DEFAULT_DNS = ['8.8.8.8', '8.8.4.4']
def spf2ips(records, domain, resolvers):
    """
    Crawls a set of DNS records and converts the result into SPF strings.
    records: dict mapping a record name to the record type to crawl (e.g. {'example.com': 'txt'})
    domain: domain passed through to the crawler
    resolvers: DNS resolver passed through to the crawler
    Returns whatever ips_to_spf_strings() builds from the de-duplicated crawl results
    (the caller concatenates it to a list of SPF entries)
    """
    collected = set()
    for rrecord, rdtype in records.items():
        # A set is used so that an entry reached through several records is kept once
        collected.update(crawl(rrecord, rdtype, domain, resolvers))
    return ips_to_spf_strings(collected)
"""
Builds a valid TXT record for SPF by resolving DNS into IP addresses
_emailProvidersToIp: list of SPF domains to convert to a list of IPs (of the email providers you want to add to SPF)
_emailProvidersToInclude: : list of SPF domains NOT to convert to IP
_nameServers: list of DNS you want to use to resolve domains into IP addresses
Returns the TXT record as a string
"""
def createFlatSpfRecord( _emailProvidersToIp, _emailProvidersToInclude, _nameServers ):
dnsResolver = resolver.Resolver()
dnsResolver.nameservers = _nameServers
parts = []
for emailProvider in _emailProvidersToIp:
parts = parts + spf2ips({emailProvider: 'txt'}, emailProvider, dnsResolver)
for emailProvider in _emailProvidersToInclude:
parts = parts + ["include:" + emailProvider]
logging.debug("Flattened IPs : %s",parts)
return SPF_TEMPLATE.format( includes=" ".join(parts) )
"""
Reads JSON TXT record using Gandi's API
Returns the list of TXT records as an array of strings
"""
def gandi_getTxtRecords( _domain, _apikey ):
url = URL_TEMPLATE.format(domain=_domain)
headers = {'authorization': AUTHZ_TEMPLATE.format(apikey=_apikey)}
logging.debug("Calling : %s",url)
logging.debug("Headers : %s",headers)
response = requests.get(url, headers=headers)
logging.debug("Response & .text : %s %s",response,response.text)
txtRecords = json.loads(response.text)
return txtRecords['rrset_values']
"""
Reads JSON TXT records from a JSON file
Returns the list of TXT records as an array of strings
"""
def file_getTxtRecords( _filename ):
with open(_filename) as f:
response = json.dumps(json.load(f))
txtRecords = json.loads(response)
return txtRecords['rrset_values']
"""
Takes existing TXT SPF records and returns a flattened version.
This function only does formatting (no online call).
"""
def flattenSpfRecords( _txtRecords, _flatSpfRecord ):
oldRecords = []
newRecords = []
done = None
for record in _txtRecords:
# If this record is a SPF one (starts with 'v=spf1')
# Replace it with the flatten record
# Note : using search instead of match because the record might start with quotes (or not ?)
if RE_SPF.search(record):
logging.debug("Flattening record : %s",record)
# There must be only one such record
if not done:
newRecords = newRecords + [_flatSpfRecord]
# Normalize the old record so that it can be compared later
# => The old record may contain split strings,
# but we don't do it in the new record (it's handled by the API)
oldRecords = oldRecords + [ RE_MERGE_SUBSTRINGS.sub('',record) ]
done = record
else:
logging.warning("Multiple SPF records detected !\n1. %s\n2. %s",done,record)
# Else, simply put it back in the list (unchanged)
else:
logging.debug("Keeping record untouched : %s",record)
newRecords = newRecords + [record]
oldRecords = oldRecords + [record]
return oldRecords, newRecords
"""
Custom comparison function.
Compares one by one the records before and after,
and especially dive into the SPF records to make sure that equality is not
dependent on the order of entries within the record.
Returns True if the records have changed
"""
def isRecordsChanged( _recordsBefore, _recordsAfter ):
if len(_recordsBefore) != len(_recordsAfter):
return True
sortedBefore = sorted(_recordsBefore)
sortedAfter = sorted(_recordsAfter)
for r in range(len(_recordsBefore)):
if RE_SPF.search(sortedBefore[r]) and RE_SPF.search(sortedAfter[r]):
listBefore = sorted( re.split( r'\s+', sortedBefore[r] ) )
listAfter = sorted( re.split( r'\s+', sortedAfter[r] ) )
for i in range(len(listBefore)):
if listBefore[i] != listAfter[i]:
return True
else:
if sortedAfter[r] != sortedBefore[r]:
return True
return False
"""
Updates the TXT records with the flattened SPF ones, using Gandi's Live DNS API
"""
def updateDns( _domain, _apikey, _txtRecords, _dryRun=False ):
url = URL_TEMPLATE.format(domain=_domain)
headers = {'authorization': AUTHZ_TEMPLATE.format(apikey=_apikey)}
body = json.dumps( {"rrset_values":_txtRecords} )
if not _dryRun:
response = requests.put(url, headers=headers, data=body)
else:
logging.info("Calling : %s",url)
logging.info("With headers : %s",headers)
logging.info("And body : %s",body)
# Command line definition
parser = argparse.ArgumentParser(description="Flatten SPF records using Gandi's Live DNS API")
parser.add_argument('-d', '--domain', nargs='+', required=True, help="Domains you own from which to update the TXT record for SPF")
parser.add_argument('-e', '--email-providers', nargs='+', required=True, help="E-mail providers' SPF domains to add to the TXT record AFTER CONVERSION to a list of IP addresses")
parser.add_argument('-E', '--email-providers-as-is', nargs='+', required=True, help="E-mail providers' SPF domains to add to the TXT record AS-IS (no IP conversion)")
parser.add_argument('-k', '--api-key', help="Your Gandi API key (otherwise looks for the 'GANDI_APIKEY' environment variable)")
parser.add_argument('-r', '--dns', default=DEFAULT_DNS, nargs='+', help="DNS servers to use to resolve into IP addresses")
parser.add_argument('-l', '--log-level', default=DEFAULT_LOGLEVEL, help="Log level")
parser.add_argument('-L', '--load', help="A JSON file to load the result from Gandi's API instead of calling the API")
parser.add_argument('-K', '--dry-run', action='store_true', help="Dry-run mode (will not change the records, only print)")
args = parser.parse_args()

# Level names are upper case in the logging module: accept 'debug' as well as 'DEBUG'
logging.basicConfig(level=logging.getLevelName(args.log_level.upper()))

# Never write the API key to the logs: mask it before dumping the arguments
loggedArgs = dict(vars(args))
if loggedArgs.get('api_key'):
    loggedArgs['api_key'] = '***'
logging.debug("Arguments : %s",loggedArgs)

# The API key comes from the command line first, then from the environment
apikey = args.api_key or os.environ.get('GANDI_APIKEY')
if not apikey:
    # Exit with a usage message instead of a KeyError traceback
    parser.error("No API key : use --api-key or set the 'GANDI_APIKEY' environment variable")

# The flattened record is the same for every domain: build it only once
flatSpf = createFlatSpfRecord(args.email_providers,args.email_providers_as_is,args.dns)

for domain in args.domain:
    if args.load:
        txts = file_getTxtRecords(args.load)
    else:
        txts = gandi_getTxtRecords(domain,apikey)

    oldTxts, newTxts = flattenSpfRecords(txts,flatSpf)
    logging.debug("Old records : %s",oldTxts)
    logging.debug("New records : %s",newTxts)

    # Only proceed if records have changed (arguments are : before, after)
    if isRecordsChanged(oldTxts,newTxts):
        updateDns(domain,apikey,newTxts,args.dry_run)
    else:
        # The loop goes on with the next domain, if any
        logging.info("No change for %s - nothing to update",domain)