removed /etc and /opt

This commit is contained in:
cutemeli
2025-12-22 10:48:14 +00:00
parent 5ce7ca2c5d
commit 10d1afbb17
32559 changed files with 0 additions and 6756692 deletions

View File

@@ -1,13 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
# version_info provides the version number in programmer friendly way.
# The 4th part will be either alpha, beta or final.
from .imapclient import * # noqa: F401,F403
from .response_parser import * # noqa: F401,F403
from .tls import * # noqa: F401,F403
from .version import author as __author__ # noqa: F401
from .version import version as __version__ # noqa: F401
from .version import version_info # noqa: F401

View File

@@ -1,219 +0,0 @@
# Copyright (c) 2015, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
import argparse
import configparser
import json
import os
import ssl
import urllib.parse
import urllib.request
from typing import Any, Callable, Dict, Optional, Tuple, TYPE_CHECKING, TypeVar
import imapclient
def getenv(name: str, default: Optional[str]) -> Optional[str]:
return os.environ.get("imapclient_" + name, default)
def get_config_defaults() -> Dict[str, Any]:
return {
"username": getenv("username", None),
"password": getenv("password", None),
"ssl": True,
"ssl_check_hostname": True,
"ssl_verify_cert": True,
"ssl_ca_file": None,
"timeout": None,
"starttls": False,
"stream": False,
"oauth2": False,
"oauth2_client_id": getenv("oauth2_client_id", None),
"oauth2_client_secret": getenv("oauth2_client_secret", None),
"oauth2_refresh_token": getenv("oauth2_refresh_token", None),
"expect_failure": None,
}
def parse_config_file(filename: str) -> argparse.Namespace:
"""Parse INI files containing IMAP connection details.
Used by livetest.py and interact.py
"""
parser = configparser.ConfigParser(get_string_config_defaults())
parser.read(filename)
conf = _read_config_section(parser, "DEFAULT")
if conf.expect_failure:
raise ValueError("expect_failure should not be set for the DEFAULT section")
conf.alternates = {}
for section in parser.sections():
# pylint: disable=no-member
conf.alternates[section] = _read_config_section(parser, section)
return conf
def get_string_config_defaults() -> Dict[str, str]:
out = {}
for k, v in get_config_defaults().items():
if v is True:
v = "true"
elif v is False:
v = "false"
elif not v:
v = ""
out[k] = v
return out
T = TypeVar("T")
def _read_config_section(
parser: configparser.ConfigParser, section: str
) -> argparse.Namespace:
def get(name: str) -> str:
return parser.get(section, name)
def getboolean(name: str) -> bool:
return parser.getboolean(section, name)
def get_allowing_none(name: str, typefunc: Callable[[str], T]) -> Optional[T]:
try:
v = parser.get(section, name)
except configparser.NoOptionError:
return None
if not v:
return None
return typefunc(v)
def getint(name: str) -> Optional[int]:
return get_allowing_none(name, int)
def getfloat(name: str) -> Optional[float]:
return get_allowing_none(name, float)
ssl_ca_file = get("ssl_ca_file")
if ssl_ca_file:
ssl_ca_file = os.path.expanduser(ssl_ca_file)
return argparse.Namespace(
host=get("host"),
port=getint("port"),
ssl=getboolean("ssl"),
starttls=getboolean("starttls"),
ssl_check_hostname=getboolean("ssl_check_hostname"),
ssl_verify_cert=getboolean("ssl_verify_cert"),
ssl_ca_file=ssl_ca_file,
timeout=getfloat("timeout"),
stream=getboolean("stream"),
username=get("username"),
password=get("password"),
oauth2=getboolean("oauth2"),
oauth2_client_id=get("oauth2_client_id"),
oauth2_client_secret=get("oauth2_client_secret"),
oauth2_refresh_token=get("oauth2_refresh_token"),
expect_failure=get("expect_failure"),
)
OAUTH2_REFRESH_URLS = {
"imap.gmail.com": "https://accounts.google.com/o/oauth2/token",
"imap.mail.yahoo.com": "https://api.login.yahoo.com/oauth2/get_token",
}
def refresh_oauth2_token(
hostname: str, client_id: str, client_secret: str, refresh_token: str
) -> str:
url = OAUTH2_REFRESH_URLS.get(hostname)
if not url:
raise ValueError("don't know where to refresh OAUTH2 token for %r" % hostname)
post = {
"client_id": client_id.encode("ascii"),
"client_secret": client_secret.encode("ascii"),
"refresh_token": refresh_token.encode("ascii"),
"grant_type": b"refresh_token",
}
with urllib.request.urlopen(
url, urllib.parse.urlencode(post).encode("ascii")
) as request:
response = request.read()
result = json.loads(response.decode("ascii"))["access_token"]
if TYPE_CHECKING:
assert isinstance(result, str)
return result
# Tokens are expensive to refresh so use the same one for the duration of the process.
_oauth2_cache: Dict[Tuple[str, str, str, str], str] = {}
def get_oauth2_token(
hostname: str, client_id: str, client_secret: str, refresh_token: str
) -> str:
cache_key = (hostname, client_id, client_secret, refresh_token)
token = _oauth2_cache.get(cache_key)
if token:
return token
token = refresh_oauth2_token(hostname, client_id, client_secret, refresh_token)
_oauth2_cache[cache_key] = token
return token
def create_client_from_config(
conf: argparse.Namespace, login: bool = True
) -> imapclient.IMAPClient:
assert conf.host, "missing host"
ssl_context = None
if conf.ssl:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = conf.ssl_check_hostname
if not conf.ssl_verify_cert:
ssl_context.verify_mode = ssl.CERT_NONE
if conf.ssl_ca_file:
ssl_context.load_verify_locations(cafile=conf.ssl_ca_file)
client = imapclient.IMAPClient(
conf.host,
port=conf.port,
ssl=conf.ssl,
ssl_context=ssl_context,
stream=conf.stream,
timeout=conf.timeout,
)
if not login:
return client
try:
if conf.starttls:
client.starttls()
if conf.oauth2:
assert conf.oauth2_client_id, "missing oauth2 id"
assert conf.oauth2_client_secret, "missing oauth2 secret"
assert conf.oauth2_refresh_token, "missing oauth2 refresh token"
access_token = get_oauth2_token(
conf.host,
conf.oauth2_client_id,
conf.oauth2_client_secret,
conf.oauth2_refresh_token,
)
client.oauth2_login(conf.username, access_token)
elif not conf.stream:
assert conf.username, "missing username"
assert conf.password, "missing password"
client.login(conf.username, conf.password)
return client
except: # noqa: E722
client.shutdown()
raise

View File

@@ -1,70 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
import re
from datetime import datetime
from email.utils import parsedate_tz
from .fixed_offset import FixedOffset
_SHORT_MONTHS = " Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split(" ")
def parse_to_datetime(timestamp: bytes, normalise: bool = True) -> datetime:
"""Convert an IMAP datetime string to a datetime.
If normalise is True (the default), then the returned datetime
will be timezone-naive but adjusted to the local time.
If normalise is False, then the returned datetime will be
unadjusted but will contain timezone information as per the input.
"""
time_tuple = parsedate_tz(_munge(timestamp))
if time_tuple is None:
raise ValueError("couldn't parse datetime %r" % timestamp)
tz_offset_seconds = time_tuple[-1]
tz = None
if tz_offset_seconds is not None:
tz = FixedOffset(tz_offset_seconds / 60)
dt = datetime(*time_tuple[:6], tzinfo=tz)
if normalise and tz:
dt = datetime_to_native(dt)
return dt
def datetime_to_native(dt: datetime) -> datetime:
return dt.astimezone(FixedOffset.for_system()).replace(tzinfo=None)
def datetime_to_INTERNALDATE(dt: datetime) -> str:
"""Convert a datetime instance to a IMAP INTERNALDATE string.
If timezone information is missing the current system
timezone is used.
"""
if not dt.tzinfo:
dt = dt.replace(tzinfo=FixedOffset.for_system())
fmt = "%d-" + _SHORT_MONTHS[dt.month] + "-%Y %H:%M:%S %z"
return dt.strftime(fmt)
# Matches timestamp strings where the time separator is a dot (see
# issue #154). For example: 'Sat, 8 May 2010 16.03.09 +0200'
_rfc822_dotted_time = re.compile(r"\w+, ?\d{1,2} \w+ \d\d(\d\d)? \d\d?\.\d\d?\.\d\d?.*")
def _munge(timestamp: bytes) -> str:
s = timestamp.decode("latin-1") # parsedate_tz only works with strings
if _rfc822_dotted_time.match(s):
return s.replace(".", ":")
return s
def format_criteria_date(dt: datetime) -> bytes:
"""Format a date or datetime instance for use in IMAP search criteria."""
out = "%02d-%s-%d" % (dt.day, _SHORT_MONTHS[dt.month], dt.year)
return out.encode("ascii")

View File

@@ -1,29 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
from __future__ import unicode_literals
from imapclient import IMAPClient
HOST = 'imap.host.com'
USERNAME = 'someuser'
PASSWORD = 'secret'
ssl = False
server = IMAPClient(HOST, use_uid=True, ssl=ssl)
server.login(USERNAME, PASSWORD)
select_info = server.select_folder('INBOX')
print('%d messages in INBOX' % select_info[b'EXISTS'])
messages = server.search(['NOT', 'DELETED'])
print("%d messages that aren't deleted" % len(messages))
print()
print("Messages:")
response = server.fetch(messages, ['FLAGS', 'RFC822.SIZE'])
for msgid, data in response.items():
print(' ID %d: %d bytes, flags=%s' % (msgid,
data[b'RFC822.SIZE'],
data[b'FLAGS']))

View File

@@ -1,29 +0,0 @@
# This example is a lot more interesting if you have an active client
# connected to the same IMAP account!
from __future__ import unicode_literals
from imapclient import IMAPClient
HOST = 'imap.host.com'
USERNAME = 'someuser'
PASSWORD = 'password'
ssl = True
server = IMAPClient(HOST, use_uid=True, ssl=ssl)
server.login(USERNAME, PASSWORD)
server.select_folder('INBOX')
# Start IDLE mode
server.idle()
# Wait for up to 30 seconds for an IDLE response
responses = server.idle_check(timeout=30)
print(responses)
# Come out of IDLE mode
text, responses = server.idle_done()
print('IDLE done. Server said %r' % text)
print('Final responses: ', responses)
print(server.logout())

View File

@@ -1,21 +0,0 @@
from __future__ import unicode_literals
from imapclient import IMAPClient
# Populate these with actual values
OAUTH2_USER = '...'
OAUTH2_ACCESS_TOKEN = '...'
HOST = 'imap.host.com'
URL = "https://somedomain.com/someuser/imap/"
ssl = True
server = IMAPClient(HOST, use_uid=True, ssl=ssl)
resp = server.oauth2_login(URL, OAUTH2_USER, OAUTH2_ACCESS_TOKEN)
print(resp)
select_info = server.select_folder('INBOX')
print(select_info)
server.logout()

View File

@@ -1,16 +0,0 @@
# Copyright (c) 2015, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
from __future__ import unicode_literals
import imapclient
HOST = 'imap.host.com'
USERNAME = 'someuser'
PASSWORD = 'secret'
context = imapclient.create_default_context(cafile="/path/to/cacert.pem")
server = imapclient.IMAPClient(HOST, use_uid=True, ssl=True, ssl_context=context)
server.login(USERNAME, PASSWORD)
# ...

View File

@@ -1,23 +0,0 @@
# Copyright (c) 2015, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
from __future__ import unicode_literals
import imapclient
from backports import ssl
HOST = 'imap.host.com'
USERNAME = 'someuser'
PASSWORD = 'secret'
context = imapclient.create_default_context()
# don't check if certificate hostname doesn't match target hostname
context.check_hostname = False
# don't check if the certificate is trusted by a certificate authority
context.verify_mode = ssl.CERT_NONE
server = imapclient.IMAPClient(HOST, use_uid=True, ssl=True, ssl_context=context)
server.login(USERNAME, PASSWORD)
# ...

View File

@@ -1,43 +0,0 @@
import imaplib
# Base class allowing to catch any IMAPClient related exceptions
# To ensure backward compatibility, we "rename" the imaplib general
# exception class, so we can catch its exceptions without having to
# deal with it in IMAPClient codebase
IMAPClientError = imaplib.IMAP4.error
IMAPClientAbortError = imaplib.IMAP4.abort
IMAPClientReadOnlyError = imaplib.IMAP4.readonly
class CapabilityError(IMAPClientError):
"""
The command tried by the user needs a capability not installed
on the IMAP server
"""
class LoginError(IMAPClientError):
"""
A connection has been established with the server but an error
occurred during the authentication.
"""
class IllegalStateError(IMAPClientError):
"""
The command tried needs a different state to be executed. This
means the user is not logged in or the command needs a folder to
be selected.
"""
class InvalidCriteriaError(IMAPClientError):
"""
A command using a search criteria failed, probably due to a syntax
error in the criteria string.
"""
class ProtocolError(IMAPClientError):
"""The server replied with a response that violates the IMAP protocol."""

View File

@@ -1,45 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
import datetime
import time
from typing import Optional
ZERO = datetime.timedelta(0)
class FixedOffset(datetime.tzinfo):
"""
This class describes fixed timezone offsets in hours and minutes
east from UTC
"""
def __init__(self, minutes: float) -> None:
self.__offset = datetime.timedelta(minutes=minutes)
sign = "+"
if minutes < 0:
sign = "-"
hours, remaining_mins = divmod(abs(minutes), 60)
self.__name = "%s%02d%02d" % (sign, hours, remaining_mins)
def utcoffset(self, _: Optional[datetime.datetime]) -> datetime.timedelta:
return self.__offset
def tzname(self, _: Optional[datetime.datetime]) -> str:
return self.__name
def dst(self, _: Optional[datetime.datetime]) -> datetime.timedelta:
return ZERO
@classmethod
def for_system(cls) -> "FixedOffset":
"""Return a FixedOffset instance for the current working timezone and
DST conditions.
"""
if time.localtime().tm_isdst and time.daylight:
offset = time.altzone
else:
offset = time.timezone
return cls(-offset // 60)

View File

@@ -1,27 +0,0 @@
# Copyright (c) 2015, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
import imaplib
import socket
from typing import Optional
class IMAP4WithTimeout(imaplib.IMAP4):
def __init__(self, address: str, port: int, timeout: Optional[float]) -> None:
self._timeout = timeout
imaplib.IMAP4.__init__(self, address, port)
def open(
self, host: str = "", port: int = 143, timeout: Optional[float] = None
) -> None:
# This is overridden to make it consistent across Python versions.
self.host = host
self.port = port
self.sock = self._create_socket(timeout)
self.file = self.sock.makefile("rb")
def _create_socket(self, timeout: Optional[float] = None) -> socket.socket:
return socket.create_connection(
(self.host, self.port), timeout if timeout is not None else self._timeout
)

View File

@@ -1,108 +0,0 @@
# This file contains two main methods used to encode and decode UTF-7
# string, described in the RFC 3501. There are some variations specific
# to IMAP4rev1, so the built-in Python UTF-7 codec can't be used instead.
#
# The main difference is the shift character (used to switch from ASCII to
# base64 encoding context), which is & in this modified UTF-7 convention,
# since + is considered as mainly used in mailbox names.
# Other variations and examples can be found in the RFC 3501, section 5.1.3.
import binascii
from typing import List, Union
def encode(s: Union[str, bytes]) -> bytes:
"""Encode a folder name using IMAP modified UTF-7 encoding.
Input is unicode; output is bytes (Python 3) or str (Python 2). If
non-unicode input is provided, the input is returned unchanged.
"""
if not isinstance(s, str):
return s
res = bytearray()
b64_buffer: List[str] = []
def consume_b64_buffer(buf: List[str]) -> None:
"""
Consume the buffer by encoding it into a modified base 64 representation
and surround it with shift characters & and -
"""
if buf:
res.extend(b"&" + base64_utf7_encode(buf) + b"-")
del buf[:]
for c in s:
# printable ascii case should not be modified
o = ord(c)
if 0x20 <= o <= 0x7E:
consume_b64_buffer(b64_buffer)
# Special case: & is used as shift character so we need to escape it in ASCII
if o == 0x26: # & = 0x26
res.extend(b"&-")
else:
res.append(o)
# Bufferize characters that will be encoded in base64 and append them later
# in the result, when iterating over ASCII character or the end of string
else:
b64_buffer.append(c)
# Consume the remaining buffer if the string finish with non-ASCII characters
consume_b64_buffer(b64_buffer)
return bytes(res)
AMPERSAND_ORD = ord("&")
DASH_ORD = ord("-")
def decode(s: Union[bytes, str]) -> str:
"""Decode a folder name from IMAP modified UTF-7 encoding to unicode.
Input is bytes (Python 3) or str (Python 2); output is always
unicode. If non-bytes/str input is provided, the input is returned
unchanged.
"""
if not isinstance(s, bytes):
return s
res = []
# Store base64 substring that will be decoded once stepping on end shift character
b64_buffer = bytearray()
for c in s:
# Shift character without anything in buffer -> starts storing base64 substring
if c == AMPERSAND_ORD and not b64_buffer:
b64_buffer.append(c)
# End shift char. -> append the decoded buffer to the result and reset it
elif c == DASH_ORD and b64_buffer:
# Special case &-, representing "&" escaped
if len(b64_buffer) == 1:
res.append("&")
else:
res.append(base64_utf7_decode(b64_buffer[1:]))
b64_buffer = bytearray()
# Still buffering between the shift character and the shift back to ASCII
elif b64_buffer:
b64_buffer.append(c)
# No buffer initialized yet, should be an ASCII printable char
else:
res.append(chr(c))
# Decode the remaining buffer if any
if b64_buffer:
res.append(base64_utf7_decode(b64_buffer[1:]))
return "".join(res)
def base64_utf7_encode(buffer: List[str]) -> bytes:
s = "".join(buffer).encode("utf-16be")
return binascii.b2a_base64(s).rstrip(b"\n=").replace(b"/", b",")
def base64_utf7_decode(s: bytearray) -> str:
s_utf7 = b"+" + s.replace(b",", b"/") + b"-"
return s_utf7.decode("utf-7")

View File

@@ -1,54 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
"""
Work-around for Python Issue 5943 (http://bugs.python.org/issue5949).
This will patch imaplib's IMAP4_SSL.readline method with the fixed
verion in Python versions that are known to have the problem.
The problem definitely exists in Python 2.5 and 2.6 up until but not
including 2.6.5. It was also fixed in Python 2.7 alpha 2 so no attempt
is made to patch 2.7 versions.
Please let me know if there's more Python versions that should be
patched.
Efforts are made to only perform the patch once.
"""
from __future__ import unicode_literals
import sys
import imaplib
def _is_affected_version(sys_version):
sys_version = sys_version[:3]
if sys_version < (2, 5, 0):
# Not sure whether these old versions are affected so being
# conservative and not patching.
return False
elif sys_version < (2, 6, 5):
return True
return False
def _fixed_readline(self):
"""Read line from remote."""
line = []
while True:
char = self.sslobj.read(1)
line.append(char)
if char in ("\n", ""):
return ''.join(line)
_fixed_readline.patched = True # Marker to indicate patched version
ssl_class = imaplib.IMAP4_SSL
def apply_patch():
if _is_affected_version(sys.version_info) and not hasattr(ssl_class.readline, 'patched'):
ssl_class.readline = _fixed_readline

View File

@@ -1,155 +0,0 @@
#!/usr/bin/python
# Copyright (c) 2020, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
import argparse
from getpass import getpass
from . import imapclient
from .config import create_client_from_config, get_config_defaults, parse_config_file
def command_line() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"-H", "--host", dest="host", action="store", help="IMAP host connect to"
)
parser.add_argument(
"-u",
"--username",
dest="username",
action="store",
help="Username to login with",
)
parser.add_argument(
"-p",
"--password",
dest="password",
action="store",
help="Password to login with",
)
parser.add_argument(
"-P",
"--port",
dest="port",
action="store",
type=int,
default=None,
help="IMAP port to use (default is 993 for TLS, or 143 otherwise)",
)
ssl_group = parser.add_mutually_exclusive_group()
ssl_group.add_argument(
"-s",
"--ssl",
dest="ssl",
action="store_true",
default=None,
help="Use SSL/TLS connection (default)",
)
ssl_group.add_argument(
"--insecure",
dest="insecure",
action="store_true",
default=False,
help="Use insecure connection (i.e. without SSL/TLS)",
)
parser.add_argument(
"-f",
"--file",
dest="file",
action="store",
default=None,
help="Config file (same as livetest)",
)
args = parser.parse_args()
if args.file:
if (
args.host
or args.username
or args.password
or args.port
or args.ssl
or args.insecure
):
parser.error("If -f/--file is given no other options can be used")
# Use the options in the config file
args = parse_config_file(args.file)
return args
args.ssl = not args.insecure
# Scan through arguments, filling in defaults and prompting when
# a compulsory argument wasn't provided.
compulsory_args = ("host", "username", "password")
for name, default_value in get_config_defaults().items():
value = getattr(args, name, default_value)
if name in compulsory_args and value is None:
value = getpass(name + ": ")
setattr(args, name, value)
return args
def main() -> int:
args = command_line()
print("Connecting...")
client = create_client_from_config(args)
print("Connected.")
banner = '\nIMAPClient instance is "c"'
def ptpython(c: imapclient.IMAPClient) -> None:
from ptpython.repl import embed # type: ignore[import-not-found]
embed(globals(), locals())
def ipython_400(c: imapclient.IMAPClient) -> None:
from IPython.terminal.embed import ( # type: ignore[import-not-found]
InteractiveShellEmbed,
)
ipshell = InteractiveShellEmbed(banner1=banner)
ipshell("")
def ipython_011(c: imapclient.IMAPClient) -> None:
from IPython.frontend.terminal.embed import ( # type: ignore[import-not-found]
InteractiveShellEmbed,
)
ipshell = InteractiveShellEmbed(banner1=banner)
ipshell("")
def ipython_010(c: imapclient.IMAPClient) -> None:
from IPython.Shell import IPShellEmbed # type: ignore[import-not-found]
IPShellEmbed("", banner=banner)()
def builtin(c: imapclient.IMAPClient) -> None:
import code
code.interact(banner, local={"c": c})
shell_attempts = (
ptpython,
ipython_400,
ipython_011,
ipython_010,
builtin,
)
for shell in shell_attempts:
try:
shell(client)
except ImportError:
pass
else:
break
return 0
if __name__ == "__main__":
main()

View File

@@ -1,174 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
"""
A lexical analyzer class for IMAP responses.
Although Lexer does all the work, TokenSource is the class to use for
external callers.
"""
from typing import Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
from .util import assert_imap_protocol
__all__ = ["TokenSource"]
CTRL_CHARS = frozenset(c for c in range(32))
ALL_CHARS = frozenset(c for c in range(256))
SPECIALS = frozenset(c for c in b' ()%"[')
NON_SPECIALS = ALL_CHARS - SPECIALS - CTRL_CHARS
WHITESPACE = frozenset(c for c in b" \t\r\n")
BACKSLASH = ord("\\")
OPEN_SQUARE = ord("[")
CLOSE_SQUARE = ord("]")
DOUBLE_QUOTE = ord('"')
class TokenSource:
"""
A simple iterator for the Lexer class that also provides access to
the current IMAP literal.
"""
def __init__(self, text: List[bytes]):
self.lex = Lexer(text)
self.src = iter(self.lex)
@property
def current_literal(self) -> Optional[bytes]:
if TYPE_CHECKING:
assert self.lex.current_source is not None
return self.lex.current_source.literal
def __iter__(self) -> Iterator[bytes]:
return self.src
class Lexer:
"""
A lexical analyzer class for IMAP
"""
def __init__(self, text: List[bytes]):
self.sources = (LiteralHandlingIter(chunk) for chunk in text)
self.current_source: Optional[LiteralHandlingIter] = None
def read_until(
self, stream_i: "PushableIterator", end_char: int, escape: bool = True
) -> bytearray:
token = bytearray()
try:
for nextchar in stream_i:
if escape and nextchar == BACKSLASH:
escaper = nextchar
nextchar = next(stream_i)
if nextchar not in (escaper, end_char):
token.append(escaper) # Don't touch invalid escaping
elif nextchar == end_char:
break
token.append(nextchar)
else:
raise ValueError("No closing '%s'" % chr(end_char))
except StopIteration:
raise ValueError("No closing '%s'" % chr(end_char))
token.append(end_char)
return token
def read_token_stream(self, stream_i: "PushableIterator") -> Iterator[bytearray]:
whitespace = WHITESPACE
wordchars = NON_SPECIALS
read_until = self.read_until
while True:
# Whitespace
for nextchar in stream_i:
if nextchar not in whitespace:
stream_i.push(nextchar)
break # done skipping over the whitespace
# Non-whitespace
token = bytearray()
for nextchar in stream_i:
if nextchar in wordchars:
token.append(nextchar)
elif nextchar == OPEN_SQUARE:
token.append(nextchar)
token.extend(read_until(stream_i, CLOSE_SQUARE, escape=False))
else:
if nextchar in whitespace:
yield token
elif nextchar == DOUBLE_QUOTE:
assert_imap_protocol(not token)
token.append(nextchar)
token.extend(read_until(stream_i, nextchar))
yield token
else:
# Other punctuation, eg. "(". This ends the current token.
if token:
yield token
yield bytearray([nextchar])
break
else:
if token:
yield token
break
def __iter__(self) -> Iterator[bytes]:
for source in self.sources:
self.current_source = source
for tok in self.read_token_stream(iter(source)):
yield bytes(tok)
# imaplib has poor handling of 'literals' - it both fails to remove the
# {size} marker, and fails to keep responses grouped into the same logical
# 'line'. What we end up with is a list of response 'records', where each
# record is either a simple string, or tuple of (str_with_lit, literal) -
# where str_with_lit is a string with the {xxx} marker at its end. Note
# that each element of this list does *not* correspond 1:1 with the
# untagged responses.
# (http://bugs.python.org/issue5045 also has comments about this)
# So: we have a special object for each of these records. When a
# string literal is processed, we peek into this object to grab the
# literal.
class LiteralHandlingIter:
def __init__(self, resp_record: Union[Tuple[bytes, bytes], bytes]):
self.literal: Optional[bytes]
if isinstance(resp_record, tuple):
# A 'record' with a string which includes a literal marker, and
# the literal itself.
self.src_text = resp_record[0]
assert_imap_protocol(self.src_text.endswith(b"}"), self.src_text)
self.literal = resp_record[1]
else:
# just a line with no literals.
self.src_text = resp_record
self.literal = None
def __iter__(self) -> "PushableIterator":
return PushableIterator(self.src_text)
class PushableIterator:
NO_MORE = object()
def __init__(self, it: bytes):
self.it = iter(it)
self.pushed: List[int] = []
def __iter__(self) -> "PushableIterator":
return self
def __next__(self) -> int:
if self.pushed:
return self.pushed.pop()
return next(self.it)
# For Python 2 compatibility
next = __next__
def push(self, item: int) -> None:
self.pushed.append(item)

View File

@@ -1,287 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
"""
Parsing for IMAP command responses with focus on FETCH responses as
returned by imaplib.
Initially inspired by http://effbot.org/zone/simple-iterator-parser.htm
"""
# TODO more exact error reporting
import datetime
import re
import sys
from collections import defaultdict
from typing import cast, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING, Union
from .datetime_util import parse_to_datetime
from .exceptions import ProtocolError
from .response_lexer import TokenSource
from .response_types import Address, BodyData, Envelope, SearchIds
from .typing_imapclient import _Atom
__all__ = ["parse_response", "parse_message_list"]
def parse_response(data: List[bytes]) -> Tuple[_Atom, ...]:
"""Pull apart IMAP command responses.
Returns nested tuples of appropriately typed objects.
"""
if data == [None]:
return tuple()
return tuple(gen_parsed_response(data))
_msg_id_pattern = re.compile(r"(\d+(?: +\d+)*)")
def parse_message_list(data: List[Union[bytes, str]]) -> SearchIds:
"""Parse a list of message ids and return them as a list.
parse_response is also capable of doing this but this is
faster. This also has special handling of the optional MODSEQ part
of a SEARCH response.
The returned list is a SearchIds instance which has a *modseq*
attribute which contains the MODSEQ response (if returned by the
server).
"""
if len(data) != 1:
raise ValueError("unexpected message list data")
message_data = data[0]
if not message_data:
return SearchIds()
if isinstance(message_data, bytes):
message_data = message_data.decode("ascii")
m = _msg_id_pattern.match(message_data)
if not m:
raise ValueError("unexpected message list format")
ids = SearchIds(int(n) for n in m.group(1).split())
# Parse any non-numeric part on the end using parse_response (this
# is likely to be the MODSEQ section).
extra = message_data[m.end(1) :]
if extra:
for item in parse_response([extra.encode("ascii")]):
if (
isinstance(item, tuple)
and len(item) == 2
and cast(bytes, item[0]).lower() == b"modseq"
):
if TYPE_CHECKING:
assert isinstance(item[1], int)
ids.modseq = item[1]
elif isinstance(item, int):
ids.append(item)
return ids
def gen_parsed_response(text: List[bytes]) -> Iterator[_Atom]:
if not text:
return
src = TokenSource(text)
token = None
try:
for token in src:
yield atom(src, token)
except ProtocolError:
raise
except ValueError:
_, err, _ = sys.exc_info()
raise ProtocolError("%s: %r" % (str(err), token))
_ParseFetchResponseInnerDict = Dict[
bytes, Optional[Union[datetime.datetime, int, BodyData, Envelope, _Atom]]
]
def parse_fetch_response(
text: List[bytes], normalise_times: bool = True, uid_is_key: bool = True
) -> "defaultdict[int, _ParseFetchResponseInnerDict]":
"""Pull apart IMAP FETCH responses as returned by imaplib.
Returns a dictionary, keyed by message ID. Each value a dictionary
keyed by FETCH field type (eg."RFC822").
"""
if text == [None]:
return defaultdict()
response = gen_parsed_response(text)
parsed_response: "defaultdict[int, _ParseFetchResponseInnerDict]" = defaultdict(
dict
)
while True:
try:
msg_id = seq = _int_or_error(next(response), "invalid message ID")
except StopIteration:
break
try:
msg_response = next(response)
except StopIteration:
raise ProtocolError("unexpected EOF")
if not isinstance(msg_response, tuple):
raise ProtocolError("bad response type: %s" % repr(msg_response))
if len(msg_response) % 2:
raise ProtocolError(
"uneven number of response items: %s" % repr(msg_response)
)
# always return the sequence of the message, so it is available
# even if we return keyed by UID.
msg_data: _ParseFetchResponseInnerDict = {b"SEQ": seq}
for i in range(0, len(msg_response), 2):
msg_attribute = msg_response[i]
if TYPE_CHECKING:
assert isinstance(msg_attribute, bytes)
word = msg_attribute.upper()
value = msg_response[i + 1]
if word == b"UID":
uid = _int_or_error(value, "invalid UID")
if uid_is_key:
msg_id = uid
else:
msg_data[word] = uid
elif word == b"INTERNALDATE":
msg_data[word] = _convert_INTERNALDATE(value, normalise_times)
elif word == b"ENVELOPE":
msg_data[word] = _convert_ENVELOPE(value, normalise_times)
elif word in (b"BODY", b"BODYSTRUCTURE"):
if TYPE_CHECKING:
assert isinstance(value, tuple)
msg_data[word] = BodyData.create(value)
else:
msg_data[word] = value
parsed_response[msg_id].update(msg_data)
return parsed_response
def _int_or_error(value: _Atom, error_text: str) -> int:
try:
return int(value) # type: ignore[arg-type]
except (TypeError, ValueError):
raise ProtocolError("%s: %s" % (error_text, repr(value)))
def _convert_INTERNALDATE(
date_string: _Atom, normalise_times: bool = True
) -> Optional[datetime.datetime]:
if date_string is None:
return None
try:
if TYPE_CHECKING:
assert isinstance(date_string, bytes)
return parse_to_datetime(date_string, normalise=normalise_times)
except ValueError:
return None
def _convert_ENVELOPE(
envelope_response: _Atom, normalise_times: bool = True
) -> Envelope:
if TYPE_CHECKING:
assert isinstance(envelope_response, tuple)
dt = None
if envelope_response[0]:
try:
if TYPE_CHECKING:
assert isinstance(envelope_response[0], bytes)
dt = parse_to_datetime(
envelope_response[0],
normalise=normalise_times,
)
except ValueError:
pass
subject = envelope_response[1]
in_reply_to = envelope_response[8]
message_id = envelope_response[9]
if TYPE_CHECKING:
assert isinstance(subject, bytes)
assert isinstance(in_reply_to, bytes)
assert isinstance(message_id, bytes)
# addresses contains a tuple of addresses
# from, sender, reply_to, to, cc, bcc headers
addresses: List[Optional[Tuple[Address, ...]]] = []
for addr_list in envelope_response[2:8]:
addrs = []
if addr_list:
if TYPE_CHECKING:
assert isinstance(addr_list, tuple)
for addr_tuple in addr_list:
if TYPE_CHECKING:
assert isinstance(addr_tuple, tuple)
if addr_tuple:
if TYPE_CHECKING:
addr_tuple = cast(Tuple[bytes, bytes, bytes, bytes], addr_tuple)
addrs.append(Address(*addr_tuple))
addresses.append(tuple(addrs))
else:
addresses.append(None)
return Envelope(
date=dt,
subject=subject,
from_=addresses[0],
sender=addresses[1],
reply_to=addresses[2],
to=addresses[3],
cc=addresses[4],
bcc=addresses[5],
in_reply_to=in_reply_to,
message_id=message_id,
)
def atom(src: TokenSource, token: bytes) -> _Atom:
if token == b"(":
return parse_tuple(src)
if token == b"NIL":
return None
if token[:1] == b"{":
literal_len = int(token[1:-1])
literal_text = src.current_literal
if literal_text is None:
raise ProtocolError("No literal corresponds to %r" % token)
if len(literal_text) != literal_len:
raise ProtocolError(
"Expecting literal of size %d, got %d"
% (literal_len, len(literal_text))
)
return literal_text
if len(token) >= 2 and (token[:1] == token[-1:] == b'"'):
return token[1:-1]
if token.isdigit() and (token[:1] != b"0" or len(token) == 1):
# this prevents converting items like 0123 to 123
return int(token)
return token
def parse_tuple(src: TokenSource) -> _Atom:
out: List[_Atom] = []
for token in src:
if token == b")":
return tuple(out)
out.append(atom(src, token))
# no terminator
raise ProtocolError('Tuple incomplete before "(%s"' % _fmt_tuple(out))
def _fmt_tuple(t: List[_Atom]) -> str:
return " ".join(str(item) for item in t)

View File

@@ -1,148 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses
import dataclasses
import datetime
from email.utils import formataddr
from typing import Any, List, Optional, Tuple, TYPE_CHECKING, Union
from .typing_imapclient import _Atom
from .util import to_unicode
@dataclasses.dataclass
class Envelope:
r"""Represents envelope structures of messages. Returned when parsing
ENVELOPE responses.
:ivar date: A datetime instance that represents the "Date" header.
:ivar subject: A string that contains the "Subject" header.
:ivar from\_: A tuple of Address objects that represent one or more
addresses from the "From" header, or None if header does not exist.
:ivar sender: As for from\_ but represents the "Sender" header.
:ivar reply_to: As for from\_ but represents the "Reply-To" header.
:ivar to: As for from\_ but represents the "To" header.
:ivar cc: As for from\_ but represents the "Cc" header.
:ivar bcc: As for from\_ but represents the "Bcc" recipients.
:ivar in_reply_to: A string that contains the "In-Reply-To" header.
:ivar message_id: A string that contains the "Message-Id" header.
A particular issue to watch out for is IMAP's handling of "group
syntax" in address fields. This is often encountered as a
recipient header of the form::
undisclosed-recipients:;
but can also be expressed per this more general example::
A group: a@example.com, B <b@example.org>;
This example would yield the following Address tuples::
Address(name=None, route=None, mailbox=u'A group', host=None)
Address(name=None, route=None, mailbox=u'a', host=u'example.com')
Address(name=u'B', route=None, mailbox=u'b', host=u'example.org')
Address(name=None, route=None, mailbox=None, host=None)
The first Address, where ``host`` is ``None``, indicates the start
of the group. The ``mailbox`` field contains the group name. The
final Address, where both ``mailbox`` and ``host`` are ``None``,
indicates the end of the group.
See :rfc:`3501#section-7.4.2` and :rfc:`2822` for further details.
"""
date: Optional[datetime.datetime]
subject: bytes
from_: Optional[Tuple["Address", ...]]
sender: Optional[Tuple["Address", ...]]
reply_to: Optional[Tuple["Address", ...]]
to: Optional[Tuple["Address", ...]]
cc: Optional[Tuple["Address", ...]]
bcc: Optional[Tuple["Address", ...]]
in_reply_to: bytes
message_id: bytes
@dataclasses.dataclass
class Address:
"""Represents electronic mail addresses. Used to store addresses in
:py:class:`Envelope`.
:ivar name: The address "personal name".
:ivar route: SMTP source route (rarely used).
:ivar mailbox: Mailbox name (what comes just before the @ sign).
:ivar host: The host/domain name.
As an example, an address header that looks like::
Mary Smith <mary@foo.com>
would be represented as::
Address(name=u'Mary Smith', route=None, mailbox=u'mary', host=u'foo.com')
See :rfc:`2822` for more detail.
See also :py:class:`Envelope` for information about handling of
"group syntax".
"""
name: bytes
route: bytes
mailbox: bytes
host: bytes
def __str__(self) -> str:
if self.mailbox and self.host:
address = to_unicode(self.mailbox) + "@" + to_unicode(self.host)
else:
address = to_unicode(self.mailbox or self.host)
return formataddr((to_unicode(self.name), address))
class SearchIds(List[int]):
"""
Contains a list of message ids as returned by IMAPClient.search().
The *modseq* attribute will contain the MODSEQ value returned by
the server (only if the SEARCH command sent involved the MODSEQ
criteria). See :rfc:`4551` for more details.
"""
def __init__(self, *args: Any):
super().__init__(*args)
self.modseq: Optional[int] = None
_BodyDataType = Tuple[Union[bytes, int, "BodyData"], "_BodyDataType"]
class BodyData(_BodyDataType):
"""
Returned when parsing BODY and BODYSTRUCTURE responses.
"""
@classmethod
def create(cls, response: Tuple[_Atom, ...]) -> "BodyData":
# In case of multipart messages we will see at least 2 tuples
# at the start. Nest these in to a list so that the returned
# response tuple always has a consistent number of elements
# regardless of whether the message is multipart or not.
if isinstance(response[0], tuple):
# Multipart, find where the message part tuples stop
parts = []
for i, part in enumerate(response):
if isinstance(part, bytes):
break
if TYPE_CHECKING:
assert isinstance(part, tuple)
parts.append(part)
return cls(([cls.create(part) for part in parts],) + response[i:])
return cls(response)
@property
def is_multipart(self) -> bool:
return isinstance(self[0], list)

View File

@@ -1,3 +0,0 @@
# Copyright (c) 2014, Menno Smits
# Released subject to the New BSD License
# Please see http://en.wikipedia.org/wiki/BSD_licenses

Some files were not shown because too many files have changed in this diff Show More