wide-midnight-78598
04/15/2023, 9:40 PMwide-midnight-78598
04/15/2023, 9:41 PMnarrow-vegetable-37489
04/15/2023, 10:00 PMwide-midnight-78598
04/15/2023, 10:13 PMwide-midnight-78598
04/15/2023, 10:14 PMwide-midnight-78598
04/15/2023, 10:14 PMwide-midnight-78598
04/15/2023, 10:18 PMnarrow-vegetable-37489
04/15/2023, 10:39 PMwide-midnight-78598
04/15/2023, 11:09 PMbroad-processor-92400
04/15/2023, 11:12 PMwide-midnight-78598
04/15/2023, 11:21 PMpython3 -m smtpd -c DebuggingServer -n localhost:2525
And that runs an SMTP server that prints to stdout. Pretty trivial to either pipe that to a shared file, or launch a FastAPI process that wraps the incoming messages into an API.
Thanks for the help guys!wide-midnight-78598
04/16/2023, 12:05 AMfrom contextlib import asynccontextmanager
from threading import Lock
from fastapi import FastAPI
from aiosmtpd.controller import Controller
# Arguably the SMTP server should be either an asyncio task, or a separate process - but this is just for low-load e2e testing
# TODO: If the e2e tests run via multi-process, maybe it's worth making this a queue and being a bit smarter about it
lock = Lock()
mailbox: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
handler = CustomHandler()
controller = Controller(handler, hostname="localhost", port=2525)
controller.start()
yield
controller.stop()
class CustomHandler:
async def handle_DATA(self, server, session, envelope):
mail_from = envelope.mail_from
rcpt_tos = envelope.rcpt_tos
data = envelope.content # type: bytes
print(f"{envelope}, {mail_from}, {rcpt_tos}, {data}")
with lock:
mailbox[mail_from] = {
"from": mail_from,
"to": rcpt_tos,
"body": data.decode("utf-8"),
}
return "250 OK"
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def root():
with lock:
return mailbox