#!/usr/bin/env python3 # PYTHON_ARGCOMPLETE_OK # -*- coding: utf-8 -*- # Copyright (C) 2019 Daniel Kahn Gillmor # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or (at # your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . ''' This script reads a MIME message from stdin and produces a treelike representation on it stdout. Example: 0 dkg@alice:~$ printmimestructure < 'Maildir/cur/1269025522.M338697P12023.monkey,S=6459,W=6963:2,Sa' └┬╴multipart/signed 6546 bytes ├─╴text/plain inline 895 bytes └─╴application/pgp-signature inline [signature.asc] 836 bytes 0 dkg@alice:~$ If you want to number the parts, i suggest piping the output through something like "cat -n" ''' import os import sys import email import logging import subprocess from argparse import ArgumentParser, Namespace from typing import Optional, Union, List, Tuple, Any from email.charset import Charset from email.message import Message try: import pgpy #type: ignore except ImportError: pgpy = None try: import argcomplete #type: ignore except ImportError: argcomplete = None class MimePrinter(object): def __init__(self, args:Namespace): self.args = args def print_part(self, z:Message, prefix:str, parent:Optional[Message], num:int) -> None: ofname:Optional[str] = z.get_filename() fname:str = '' if ofname is None else f' [{ofname}]' ocharset:Union[Charset, str, None] = z.get_charset() cset:str = '' if ocharset is None else f' ({ocharset})' disp:Union[List[Tuple[str,str]], List[str], None] = z.get_params(None, header='Content-Disposition') disposition:str = '' if (disp is not None): for d in disp: if d[0] in [ 'attachment', 'inline' ]: disposition = ' ' + d[0] nbytes:int if z.is_multipart(): # FIXME: it looks like we are counting chars here, not bytes: nbytes = len(z.as_string()) else: payload:Union[List[Message], str, bytes, None] = z.get_payload() if not isinstance(payload, (str,bytes)): raise TypeError(f'expected payload to be either str or bytes, got {type(payload)}') # FIXME: it looks like we are counting chars here, not bytes: nbytes = len(payload) print(f'{prefix}{z.get_content_type()}{cset}{disposition}{fname} {nbytes:d} bytes') cryptopayload:Optional[Message] = None ciphertext:Union[List[Message],str,bytes,None] = None try_pgp_decrypt:bool = self.args.pgpkey or self.args.use_gpg_agent if try_pgp_decrypt and \ (parent is not None) and \ (parent.get_content_type().lower() == 'multipart/encrypted') and \ (str(parent.get_param('protocol')).lower() == 'application/pgp-encrypted') and \ (num == 2): ciphertext = z.get_payload() if not isinstance(ciphertext, str): logging.warning('encrypted part was not a leaf mime part somehow') return if self.args.pgpkey: cryptopayload = self.pgpy_decrypt(self.args.pgpkey, ciphertext) if cryptopayload is None and self.args.use_gpg_agent: cryptopayload = self.gpg_decrypt(ciphertext) if cryptopayload is None: logging.warning(f'Unable to decrypt') return if cryptopayload is not None: newprefix = prefix[:-3] + ' ' print(f'{newprefix}↧ (decrypts to)') self.print_tree(cryptopayload, newprefix + '└', z, 0) def pgpy_decrypt(self, keys:List[str], ciphertext:str) -> Optional[Message]: if pgpy is None: logging.warning(f'Python module pgpy is not available, not decrypting (try "apt install python3-pgpy")') return None keyname:str ret:Optional[Message] = None for keyname in keys: try: key:pgpy.PGPKey key, _ = pgpy.PGPKey.from_file(keyname) msg:pgpy.PGPMessage = pgpy.PGPMessage.from_blob(ciphertext) msg = key.decrypt(msg) return email.message_from_bytes(msg.message) except: pass return None def gpg_decrypt(self, ciphertext:str) -> Optional[Message]: inp:int outp:int inp, outp = os.pipe() with open(outp, 'w') as outf: outf.write(ciphertext) out:subprocess.CompletedProcess[bytes] = subprocess.run(['gpg', '--batch', '--decrypt'], stdin=inp, capture_output=True) if out.returncode == 0: return email.message_from_bytes(out.stdout) return None def print_tree(self, z:Message, prefix:str, parent:Optional[Message], num:int) -> None: if (z.is_multipart()): self.print_part(z, prefix+'┬╴', parent, num) if prefix.endswith('└'): prefix = prefix.rpartition('└')[0] + ' ' if prefix.endswith('├'): prefix = prefix.rpartition('├')[0] + '│' parts:Union[List[Message], str, bytes, None] = z.get_payload() if not isinstance(parts, list): raise TypeError(f'parts was {type(parts)}, expected List[Message]') i = 0 while (i < len(parts)-1): self.print_tree(parts[i], prefix + '├', z, i+1) i += 1 self.print_tree(parts[i], prefix + '└', z, i+1) # FIXME: show epilogue? else: self.print_part(z, prefix+'─╴', parent, num) def main() -> None: parser:ArgumentParser = ArgumentParser(description='Read RFC2822 MIME message from stdin and emit a tree diagram to stdout.', epilog="Example: email-print-mime-structure