summaryrefslogtreecommitdiff
path: root/email-print-mime-structure
diff options
context:
space:
mode:
Diffstat (limited to 'email-print-mime-structure')
-rwxr-xr-xemail-print-mime-structure189
1 files changed, 147 insertions, 42 deletions
diff --git a/email-print-mime-structure b/email-print-mime-structure
index 7adeb2b..4f165b1 100755
--- a/email-print-mime-structure
+++ b/email-print-mime-structure
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
+# PYTHON_ARGCOMPLETE_OK
# -*- coding: utf-8 -*-
# Copyright (C) 2019 Daniel Kahn Gillmor
@@ -29,49 +30,153 @@ Example:
If you want to number the parts, i suggest piping the output through
something like "cat -n"
'''
-import email
+import os
import sys
+import email
+import logging
+import subprocess
-def print_part(z, prefix):
- fname = '' if z.get_filename() is None else ' [' + z.get_filename() + ']'
- cset = '' if z.get_charset() is None else ' (' + z.get_charset() + ')'
- disp = z.get_params(None, header='Content-Disposition')
- if (disp is None):
- disposition = ''
- else:
- disposition = ''
- for d in disp:
- if d[0] in [ 'attachment', 'inline' ]:
- disposition = ' ' + d[0]
- if z.is_multipart():
- nbytes = len(z.as_string())
- else:
- nbytes = len(z.get_payload())
-
- print('{}{}{}{}{} {:d} bytes'.format(
- prefix,
- z.get_content_type(),
- cset,
- disposition,
- fname,
- nbytes,
- ))
-
-def test(z, prefix=''):
- if (z.is_multipart()):
- print_part(z, prefix+'┬╴')
- if prefix.endswith('└'):
- prefix = prefix.rpartition('└')[0] + ' '
- if prefix.endswith('├'):
- prefix = prefix.rpartition('├')[0] + '│'
- parts = z.get_payload()
- i = 0
- while (i < parts.__len__()-1):
- test(parts[i], prefix + '├')
- i += 1
- test(parts[i], prefix + '└')
- # FIXME: show epilogue?
+from argparse import ArgumentParser, Namespace
+from typing import Optional, Union, List, Tuple, Any
+from email.charset import Charset
+from email.message import Message
+
+try:
+ import pgpy #type: ignore
+except ImportError:
+ pgpy = None
+
+try:
+ import argcomplete #type: ignore
+except ImportError:
+ argcomplete = None
+
+class MimePrinter(object):
+ def __init__(self, args:Namespace):
+ self.args = args
+
+ def print_part(self, z:Message, prefix:str, parent:Optional[Message], num:int) -> None:
+ ofname:Optional[str] = z.get_filename()
+ fname:str = '' if ofname is None else f' [{ofname}]'
+ ocharset:Union[Charset, str, None] = z.get_charset()
+ cset:str = '' if ocharset is None else f' ({ocharset})'
+ disp:Union[List[Tuple[str,str]], List[str], None] = z.get_params(None, header='Content-Disposition')
+ disposition:str = ''
+ if (disp is not None):
+ for d in disp:
+ if d[0] in [ 'attachment', 'inline' ]:
+ disposition = ' ' + d[0]
+ nbytes:int
+ if z.is_multipart():
+ # FIXME: it looks like we are counting chars here, not bytes:
+ nbytes = len(z.as_string())
+ else:
+ payload:Union[List[Message], str, bytes, None] = z.get_payload()
+ if not isinstance(payload, (str,bytes)):
+ raise TypeError(f'expected payload to be either str or bytes, got {type(payload)}')
+ # FIXME: it looks like we are counting chars here, not bytes:
+ nbytes = len(payload)
+
+ print(f'{prefix}{z.get_content_type()}{cset}{disposition}{fname} {nbytes:d} bytes')
+ try_decrypt:bool = self.args.pgpkey or self.args.use_gpg_agent
+
+ if try_decrypt and \
+ (parent is not None) and \
+ (parent.get_content_type().lower() == 'multipart/encrypted') and \
+ (str(parent.get_param('protocol')).lower() == 'application/pgp-encrypted') and \
+ (num == 2):
+ cryptopayload:Optional[Message] = None
+ ciphertext:Union[List[Message],str,bytes,None] = z.get_payload()
+ if not isinstance(ciphertext, str):
+ logging.warning('encrypted part was not a leaf mime part somehow')
+ return
+ if self.args.pgpkey:
+ cryptopayload = self.pgpy_decrypt(self.args.pgpkey, ciphertext)
+ if cryptopayload is None and self.args.use_gpg_agent:
+ cryptopayload = self.gpg_decrypt(ciphertext)
+ if cryptopayload is None:
+ logging.warning(f'Unable to decrypt')
+ return
+ newprefix = prefix[:-3] + ' '
+ print(f'{newprefix}↧ (decrypts to)')
+ self.print_tree(cryptopayload, newprefix + '└', z, 0)
+
+ def pgpy_decrypt(self, keys:List[str], ciphertext:str) -> Optional[Message]:
+ if pgpy is None:
+ logging.warning(f'Python module pgpy is not available, not decrypting (try "apt install python3-pgpy")')
+ return None
+ keyname:str
+ ret:Optional[Message] = None
+ for keyname in keys:
+ try:
+ key:pgpy.PGPKey
+ key, _ = pgpy.PGPKey.from_file(keyname)
+ msg:pgpy.PGPMessage = pgpy.PGPMessage.from_blob(ciphertext)
+ msg = key.decrypt(msg)
+ return email.message_from_bytes(msg.message)
+ except:
+ pass
+ return None
+
+ def gpg_decrypt(self, ciphertext:str) -> Optional[Message]:
+ inp:int
+ outp:int
+ inp, outp = os.pipe()
+ with open(outp, 'w') as outf:
+ outf.write(ciphertext)
+ out:subprocess.CompletedProcess[bytes] = subprocess.run(['gpg', '--batch', '--decrypt'],
+ stdin=inp,
+ capture_output=True)
+ if out.returncode == 0:
+ return email.message_from_bytes(out.stdout)
+ return None
+
+ def print_tree(self, z:Message, prefix:str, parent:Optional[Message], num:int) -> None:
+ if (z.is_multipart()):
+ self.print_part(z, prefix+'┬╴', parent, num)
+ if prefix.endswith('└'):
+ prefix = prefix.rpartition('└')[0] + ' '
+ if prefix.endswith('├'):
+ prefix = prefix.rpartition('├')[0] + '│'
+ parts:Union[List[Message], str, bytes, None] = z.get_payload()
+ if not isinstance(parts, list):
+ raise TypeError(f'parts was {type(parts)}, expected List[Message]')
+ i = 0
+ while (i < len(parts)-1):
+ self.print_tree(parts[i], prefix + '├', z, i+1)
+ i += 1
+ self.print_tree(parts[i], prefix + '└', z, i+1)
+ # FIXME: show epilogue?
+ else:
+ self.print_part(z, prefix+'─╴', parent, num)
+
+def main() -> None:
+ parser:ArgumentParser = ArgumentParser(description='Read RFC2822 MIME message from stdin and emit a tree diagram to stdout.',
+ epilog="Example: email-print-mime-structure <message.eml")
+ parser.add_argument('--pgpkey', metavar='KEYFILE', action='append',
+ help='OpenPGP Transferable Secret Key for decrypting')
+ parser.add_argument('--use-gpg-agent', action='store_true',
+ help='Ask local GnuPG installation for decryption')
+ parser.add_argument('--no-use-gpg-agent', action='store_false',
+ help='Don\'t ask local GnuPG installation for decryption')
+ parser.set_defaults(use_gpg_agent=False)
+
+ if argcomplete:
+ argcomplete.autocomplete(parser)
+ elif '_ARGCOMPLETE' in os.environ:
+ logging.error('Argument completion requested but the "argcomplete" '
+ 'module is not installed. '
+ 'Maybe you want to "apt install python3-argcomplete"')
+ sys.exit(1)
+
+ args:Namespace = parser.parse_args()
+ msg:Union[Message, str, int, Any] = email.message_from_file(sys.stdin)
+
+ if isinstance(msg, Message):
+ printer:MimePrinter = MimePrinter(args)
+ printer.print_tree(msg, '└', None, 0)
else:
- print_part(z, prefix+'─╴')
+ logging.error('Input was not an e-mail message')
-test(email.message_from_file(sys.stdin), '└')
+if __name__ == '__main__':
+ main()