#!/usr/bin/python2 # PLEASE NOTE. The use of this file is DEPRECATED. It will be removed # from this repository on or before Jan. 1 2020 pursuant to the End Of Life # of Python 2.7. Please use the file at # https://dev.fmp.com/contrib/dmarc_shield3.py instead, which is # compatible with Python vs. 3.5+ as well as Python 2.7. # # * * * * * * * * * # # Much of this code is borrowed from the DMARC mitigation implemented by the # GNU Mailman list server. The code herein, like GNU Mailman, is free software # and can redistributed and/or modifed under the terms of the GNU General # Public License as published by the Free Software Foundation, either version 3 # of the License, or (at your option) any later version. # # Many thanks to the Mailman developers team for making this code available and # to Mark Sapiro for assistance in porting this code for the purpose of setting # up more general DMARC mitigation for redirected email. # # Collected, modified and otherwise coded by Lindsay Haisley (fmouse@fmp.com) # # # INSTRUCTIONS FOR USE: # --------------------- # dmarc_shield.py should be on the first line in a .courier file with a leading # "|" to read an incoming email from standard input. The only argument to the # program MUST be a local address with a domain name which resolves to the IP # address of the redirecting system. The second line in the .courier file MUST # be a delivery instruction which will provide Courier with the address of the # redirection target, either an email address or a Dynamic Delivery # Instruction. These two lines, in their simplest form should look similar to # the following. # # | /usr/local/sbin/dmarc_shield.py postmaster@example.com # user@somesystem.com # # dmarc_shield.py is called twice. On the first pass, the email is handed off, # possibly with appropriately munged headers, to Courier's sendmail clone and # dmarc_shield.py exits with an exit code of 99, ending processing of the # .courier file for the first pass. A special header, X-DMARC-shield, is # inserted in the email during the first pass. On the second pass, # dmarc_shield.py notes the presence of this header and simply exits with an # exit code of zero, passing control on to the second delivery instruction in # .courier which sends the email on to the redirection target. # # The X-DMARC-shield header also indicates whether the email headers were # "Passed-through" (no potentially problematic DMARC policy on the sending # system) or "Munged", to mitigate potential DMARC problems on the redirection # target based on the DMARC policy of the sending system. # # If munging of the From address takes place, the From address is rewritten so # that the original From address is written out as an address comment with the # From address itself being a specified local address with a domain name which # resolves to the IP address of the redirecting system. If the email contains # no Reply-To address, one is created containing the original From address # information. If a Reply-To address already exists in the headers, it's left # unchanged. # # Mitigation of DMARC sender policy using this program on the redirecting # system requires that this sysem MUST publish a proper SPF record associating # the domain name used in the munged From header with the IP address of the # redirecting system. # # These are the exit codes used by dmarc_shield.py: # # 0 - Fall through to the next delivery instruction in .courier # # 99 - Consider an email successfully delivered and terminate # processing of delivery instructions. # # Errors and/or run-time processing comments are logged via syslog, facility = # mail and level = info. Verbosity of logging can be controlled with the # "verbose" setting, below. Setting verbose = False eliminates logging of # information about the operation of the program which is strictly # informational. If dmarc_shield.py isn't working for you, set verbose = True # and check your mail log files! # # Please edit the following as desired or necessary for your system. # Set the following to True to munge all messages passed through # dmarc_shield.py, as opposed to only those from potentially problematic # senders. There's a processing overhead for this, however if your user base # expects this level of header uniformity it may be what you want. munge_all = False # Set the following to True to munge messages with a From address for # which the DMARC policy is "p=quarantine". This shouldn't be # necessary. munge_quarantine = False # Enable simple logging (recommended). logging = True # Set this to False to turn off verbose logging to the mail log: verbose = False # Make the From header address a VERP address. If you set this to True you MUST # make the .courier-$EXT-default address a working address. VERP may be useful # for debugging purposes where the re-written From address is logged by other # components of the Courier mail suite. use_verp = True # Organizational Domain database URL. The address coded below is the SOURCE # address for this list. Please set up a cron job to download it to a LOCAL # web server, ideally on the same machine running Courier, or on the same LAN, # and access it there using an appropriate URL. The maintainers at # publicsuffix.org request that the list be downloaded no more frequently than # once a day: orgdburl = 'https://publicsuffix.org/list/public_suffix_list.dat' # Path to Courier's sendmail clone: sendmail = "/usr/sbin/sendmail" ####################################################################### import syslog as S import sys import os import re import urllib2 from email.utils import parseaddr import subprocess as P try: import dns.resolver from dns.exception import DNSException dns_resolver = True except ImportError: dns_resolver = False version = "18.04.17.01" def seconds(s): return s # The next functions read data from # https://publicsuffix.org/list/public_suffix_list.dat and implement the # algorithm at https://publicsuffix.org/list/ to find the "Organizational # Domain" corresponding to a From: domain. s_dict = {} def get_suffixes(url): """This method loads and parses the data from the url argument into s_dict for use by get_org_dom.""" global s_dict if s_dict: return if not url: return try: d = urllib2.urlopen(url) except urllib2.URLError, e: syslog('error', 'Unable to retrieve data from %s: %s', url, e) return for line in d.readlines(): if not line.strip() or line.startswith(' ') or line.startswith('//'): continue line = re.sub(' .*', '', line.strip()) if not line: continue parts = line.lower().split('.') if parts[0].startswith('!'): exc = True parts = [parts[0][1:]] + parts[1:] else: exc = False parts.reverse() k = '.'.join(parts) s_dict[k] = exc def _get_dom(d, l): """A helper to get a domain name consisting of the first l+1 labels in d.""" dom = d[:min(l+1, len(d))] dom.reverse() return '.'.join(dom) def get_org_dom(domain): """Given a domain name, this returns the corresponding Organizational Domain which may be the same as the input.""" global s_dict if not s_dict: get_suffixes(orgdburl) hits = [] d = domain.lower().split('.') d.reverse() for k in s_dict.keys(): ks = k.split('.') if len(d) >= len(ks): for i in range(len(ks)-1): if d[i] != ks[i] and ks[i] != '*': break else: if d[len(ks)-1] == ks[-1] or ks[-1] == '*': hits.append(k) if not hits: return _get_dom(d, 1) l = 0 for k in hits: if s_dict[k]: # It's an exception return _get_dom(d, len(k.split('.'))-1) if len(k.split('.')) > l: l = len(k.split('.')) return _get_dom(d, l) # This takes an email address, and returns True if DMARC policy is p=reject # or possibly quarantine. def IsDMARCProhibited(email): """DMARC mitigation target method""" if not dns_resolver: # This is a problem; log it. S.syslog('DNS lookup for DMARC mitigation not available. Install Python\'s dns.resolver module') return False email = email.lower() # Scan from the right in case quoted local part has an '@'. at_sign = email.rfind('@') if at_sign < 1: return False f_dom = email[at_sign+1:] x = _DMARCProhibited(email, '_dmarc.' + f_dom) if x != 'continue': return x o_dom = get_org_dom(f_dom) if o_dom != f_dom: x = _DMARCProhibited(email, '_dmarc.' + o_dom, org=True) if x != 'continue': return x return False def _DMARCProhibited(email, dmarc_domain, org=False): """DMARC migitation workhorse method - adapted from Gnu Mailman""" try: resolver = dns.resolver.Resolver() resolver.timeout = float(seconds(3)) resolver.lifetime = float(seconds(5)) txt_recs = resolver.query(dmarc_domain, dns.rdatatype.TXT) except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): return 'continue' except (dns.resolver.NoNameservers): S.syslog('DNSException: No Nameservers available for %s (%s)' % email, dmarc_domain) # Typically this means a dnssec validation error. Clients that don't # perform validation *may* successfully see a _dmarc RR whereas a # validating mailman server wont see the _dmarc RR. We should mitigate # this email to be safe. return True except DNSException, e: S.syslog('DNSException: Unable to query DMARC policy for %s (%s). %s' % email, dmarc_domain, e.__doc__) # While we can't be sure what caused the error, there is potentially # a DMARC policy record that we missed and that a receiver of the mail # might see. Thus, we should err on the side of caution and mitigate. return True else: # Be as robust as possible in parsing the result. results_by_name = {} cnames = {} want_names = set([dmarc_domain + '.']) for txt_rec in txt_recs.response.answer: if txt_rec.rdtype == dns.rdatatype.CNAME: cnames[txt_rec.name.to_text()] = ( txt_rec.items[0].target.to_text()) if txt_rec.rdtype != dns.rdatatype.TXT: continue results_by_name.setdefault(txt_rec.name.to_text(), []).append( "".join(txt_rec.items[0].strings)) expands = list(want_names) seen = set(expands) while expands: item = expands.pop(0) if item in cnames: if cnames[item] in seen: continue # cname loop expands.append(cnames[item]) seen.add(cnames[item]) want_names.add(cnames[item]) want_names.discard(item) if len(want_names) != 1: if verbose: S.syslog("""multiple DMARC entries in results for %s, processing each to be strict""" % dmarc_domain) for name in want_names: if name not in results_by_name: continue dmarcs = filter(lambda n: n.startswith('v=DMARC1;'), results_by_name[name]) if len(dmarcs) == 0: return 'continue' if len(dmarcs) > 1: if verbose: S.syslog("""RRset of TXT records for %s has %d v=DMARC1 entries; testing them all""" % (dmarc_domain, len(dmarcs))) for entry in dmarcs: mo = re.search(r'\bsp=(\w*)\b', entry, re.IGNORECASE) if org and mo: policy = mo.group(1).lower() else: mo = re.search(r'\bp=(\w*)\b', entry, re.IGNORECASE) if mo: policy = mo.group(1).lower() else: continue if policy == 'reject': if verbose: S.syslog('DMARC lookup for %s (%s) found p=reject in %s = %s' % (email, dmarc_domain, name, entry)) return True if munge_quarantine: if (policy == 'quarantine'): if verbose: S.syslog('DMARC lookup for %s (%s) found p=quarantine in %s = %s' % (email, dmarc_domain, name, entry)) return True return False def mod_headers(local_from): """Munge From header and possibly insert a Reply-To header""" global dshield_proc, mailsuser, newfrom newfrom = local_from if not 'msg_repto' in globals(): header_array.insert(msg_from[0]+1, "Reply-To: " + msg_from[1]) par = lambda x:x if parseaddr(msg_from[1])[0] else "" header_array[msg_from[0]] = "From: " + "\"%s%s%s%s via\" <%s>" % \ ((parseaddr(msg_from[1])[0]+" ").lstrip(), par("("), re.sub("@", " at ", parseaddr(msg_from[1])[1]),par(")"), local_from) header_array.append("X-Original-From: " + msg_from[1]) dshield_proc = "Munged" mailsuser = "MAILSUSER=%s " % (local_address,) def verp(env, address): """Embed address into env to create a VERP address""" s = re.compile("(.*)@(.*)") p = s.match(env) q = s.match(address) return("%s-%s=%s@%s" % (p.group(1), q.group(1), q.group(2), p.group(2))) def see_headers(): """For debugging ... This method can be removed from the code if desired""" for ii in range(len(header_array)): print header_array[ii] ################################## # Main body of program starts here ################################## S.openlog("dmarc_shield", S.LOG_MAIL | S.LOG_INFO, S.LOG_MAIL) if verbose: logging = True try: local_address = sys.argv[1] except IndexError: S.syslog("dmarc_shield.py failed. Observe correct usage!") S.syslog("Usage: 'dmarc_shield.py local_address'") S.syslog("local_address MUST be a valid address on") S.syslog("redirecting system, e.g. 'postmaster@example.com'") sys.exit(0) dshield_proc = "Passed-through" header_array = [] # Read the mail headers into an array n = True while n: n = sys.stdin.readline() if n == "\n": break header_array.append(n.rstrip()) # Get some needed values from the header array, or exit if we've been # here before. for i in range(len(header_array)): # if header_array[i][:3] == "To:": # header_array[i] = "To: <%s>" % (os.getenv("RECIPIENT")) if header_array[i][:5] == "From:": msg_from = (i, header_array[i][6:]) if header_array[i][:9] == "Reply-To:": msg_repto = (i, header_array[i][10:]) if header_array[i][:15] == "X-DMARC-shield:": sys.exit(0) mailsuser="" # No From, no problem :) if not "msg_from" in globals() or not parseaddr(msg_from[1])[1]: sys.exit(0) #Parse the actual email address from the From header newfrom = parseaddr(msg_from[1])[1] # VERPify the new From address if requested to do so. if use_verp: local_address = verp(local_address, os.getenv("SENDER")) # Modify headers, if appropriate if munge_all == False: try: if IsDMARCProhibited(newfrom): mod_headers(local_address) except: sys.exit(0) else: mod_headers(local_address) # Open the pipe to the world outp = P.Popen([mailsuser + sendmail + " %s" % os.getenv("RECIPIENT")],stdin=P.PIPE, shell=True) for ii in header_array: outp.stdin.write(ii + "\n") outp.stdin.write("X-DMARC-shield: %s: v%s\n" % (dshield_proc, version)) outp.stdin.write("\n") while n: n = sys.stdin.readline() outp.stdin.write(n) # Log what we did if logging: oldfrom = parseaddr(msg_from[1])[1] S.syslog("action=%s: old_from=<%s>, from=<%s>, addr=<%s>" % (dshield_proc, oldfrom, newfrom, os.getenv("RECIPIENT"))) sys.exit(99)