I get TONS of emails: newsletter subscriptions, spam, unsolicited offers, receipts, server updates, website updates, server alerts, team updates, and actual conversations about work and personal matters. All of this in ONE place. I know this resonates with almost everyone, even if your stack looks different. I do implement a pile of filters that make this better and move messages into folders, but that functionality is limited, and spam filters are terrible at catching the dozen or so unsolicited offers I get each day. As for updates and alerts, it is difficult to pull out the important ones without reading the body of each email.
Last week I reached a tipping point where I just had to figure this out. I wondered if I could stand up a simple AI agent to do it for me. I did ask ChatGPT whether this qualifies as an AI agent, and indeed it does, albeit a simple one!
Defining the Problem
Here is what I wanted out of this tool.
- Privacy first! This must run on a local LLM without exposing my emails to a language model on the internet.
- Ability to read and sort my email without deleting messages or marking them read (unless I choose for that to happen).
- Updates need to not only happen in the inbox on my computer, but also on the mail server so that all of my devices are in sync.
- I need to be able to trust that it is making the correct judgment calls and is not moving important emails to a spam folder.
Why a Local Email AI Agent?
Most AI email filters or agents rely on cloud-based services that scan your emails, raising security and privacy concerns. By using Ollama, I can process emails entirely offline or on a private server, keeping data private while automating tedious email management. This solution runs locally on my computer for now, but I will move it to a computer/server in my office in the future. The setup is as follows:
- Uses FastMail’s IMAP & JMAP APIs to fetch emails and “contacts”.
- Runs Ollama (Mistral model) locally for AI-driven filtering.
- Detects solicitations & spam and moves them to a “Possible Spam” folder.
- Ensures 100% privacy by keeping all processing on my computer.
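The heart of the setup is a single prompt-and-answer round trip with the local model. A rough sketch of the idea (the prompt wording and the injectable `ask` parameter are my illustration here, not part of FastMail or Ollama):

```python
SPAM_PROMPT = (
    "Subject: {subject}\nFrom: {sender}\n\n{body}\n\n"
    "Respond with only YES if this is unsolicited commercial email, otherwise NO."
)

def classify(subject, sender, body, ask=None):
    """Ask a local model for a YES/NO spam verdict.

    `ask` takes a prompt string and returns the model's reply; by
    default it calls the Ollama Python client.
    """
    prompt = SPAM_PROMPT.format(subject=subject, sender=sender, body=body)
    if ask is None:
        import ollama  # local inference; nothing leaves the machine

        def ask(p):
            return ollama.chat(
                model="mistral",
                messages=[{"role": "user", "content": p}],
            )["message"]["content"]
    return ask(prompt).strip().upper().startswith("YES")
```

Everything else in the full script is plumbing around this call: fetching mail over IMAP, deciding which messages to ask about, and acting on the answer.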
How Should We Do It?
Below are the detailed steps I took, but this approach will only work for you if you use FastMail and have the same needs as I do. However, it does illustrate the core concept. After setting up the initial system, I added a whitelisting mechanism to ensure that messages from people I’ve emailed are never moved. I also fine-tuned the prompts after noticing instances where messages were incorrectly categorized. While I still rely on FastMail’s built-in manual blocking features, I’m exploring ways to let the AI automatically block senders who repeatedly end up in my possible spam folder.
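For reference, the whitelist check in the full script boils down to an exact-address match plus `*@domain` wildcard patterns; stripped of the file-handling code, the logic is roughly:

```python
def whitelisted(address, emails, wildcards):
    """Exact-match against known addresses, then fall back to
    *@domain wildcard patterns (all comparisons lowercased)."""
    address = address.lower()
    if address in emails:
        return True
    domain = address.split("@")[1] if "@" in address else ""
    return f"*@{domain}" in wildcards
```

Anyone I have ever replied to passes the first check; the wildcard form lets a whole trusted domain through without listing every colleague individually.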

How Did It Turn Out?
All in all, it turned out great! So far I think of it as an assistant that I am still training while I watch over its shoulder. As I continue to tune the prompts, I can see how this becomes much more capable. The experience has me extending it to my Gmail accounts next, and as part of that work we will add a feedback loop so the AI can learn from its mistakes. Stay tuned!
If you want to get into the details, keep reading. The source code for how this works is below. We are also now helping customers set up AI solutions. Just reach out!
Deep Dive
Step 1: Set Up Your FastMail API Token
To access FastMail’s APIs, you need an App Password:
- Log into FastMail.
- Go to Settings > Privacy & Security > App Passwords.
- Click Add App Password and enable:
- ✅ IMAP (Read & Write)
- ✅ JMAP (Read Only)
- Copy the password (you won’t see it again!).
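The script in Step 3 pulls its credentials and settings from a config.py via `from config import *`. That file is my own convention, so here is a sketch of what it might contain — only `imap.fastmail.com` is FastMail's actual IMAP host; every other value is a placeholder to adjust:

```python
# config.py - credentials and tunables for the email filter
IMAP_SERVER = "imap.fastmail.com"   # FastMail's IMAP host
IMAP_USER = "you@fastmail.com"      # your FastMail login
IMAP_PASS = "app-password-here"     # the App Password from Step 1

SPAM_FOLDER_NAME = "Possible Spam"  # where suspected solicitations go
ALERTS_FOLDER = "Alerts"            # folder your server alerts land in
HIGH_ALERT_FOLDER = "High Alerts"   # where critical alerts are moved

AI_MODEL = "mistral"                # Ollama model used for decisions
MAX_MESSAGES_PER_RUN = 25           # cap work per run
PROCESS_DELAY = 1                   # seconds between messages
```

Keep this file out of version control, since it holds your App Password in plain text.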
Step 2: Install Required Python Libraries
Ensure you have Python installed, then install the necessary packages:
pip install imapclient requests ollama
If you’re on a Mac M1/M2, ensure you install for the correct architecture:
arch -arm64 pip install imapclient requests ollama
Step 3: Python Script to Fetch & Filter Emails
The script does the following:
- Fetches unread emails from FastMail.
- Retrieves contacts via FastMail’s JMAP API.
- Uses Ollama’s AI model to detect spam & solicitations.
- Moves unwanted emails to a “Possible Spam” folder.
import requests
import json
import imapclient
import email
from email.policy import default
import ollama
from config import *  # Import credentials from config.py
import time
import logging
from datetime import datetime, timedelta
import ssl
from pathlib import Path
import argparse
import subprocess
import sys

# Add proper logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('email_filter.log'),
        logging.StreamHandler()
    ]
)
class WhiteList:
    def __init__(self, filename='whitelist.json', metadata_file='whitelist_metadata.json'):
        self.filename = filename
        self.metadata_file = metadata_file
        self.emails = set()
        self.wildcards = set()  # Wildcard patterns such as *@example.com
        self.metadata = {}
        self._load()

    def _load(self):
        # Load whitelist
        try:
            with open(self.filename, 'r') as f:
                entries = json.load(f)
            # Separate wildcards and regular emails
            self.emails = {e.lower() for e in entries if not e.startswith('*')}
            self.wildcards = {e.lower() for e in entries if e.startswith('*')}
        except FileNotFoundError:
            self.emails = set()
            self.wildcards = set()
        # Load metadata
        try:
            with open(self.metadata_file, 'r') as f:
                self.metadata = json.load(f)
        except FileNotFoundError:
            self.metadata = {
                "last_check": None,
                "total_processed": 0
            }

    def save(self):
        # Save whitelist, combining both regular emails and wildcards
        with open(self.filename, 'w') as f:
            json.dump(list(self.emails | self.wildcards), f, indent=4)
        # Save metadata
        with open(self.metadata_file, 'w') as f:
            json.dump(self.metadata, f, indent=4)

    def add(self, address):
        if address.startswith('*'):
            self.wildcards.add(address.lower())
        else:
            self.emails.add(address.lower())
        self.save()

    def contains(self, address):
        address = address.lower()
        # Check exact matches first
        if address in self.emails:
            return True
        # Then check wildcards
        domain = address.split('@')[1] if '@' in address else ''
        for wildcard in self.wildcards:
            if wildcard == f'*@{domain}':
                return True
        return False

    def build_from_sent_emails(self, days_back=30, force_full_rebuild=False):
        # Build whitelist from sent emails since the last check,
        # or within days_back if forcing a rebuild
        if force_full_rebuild:
            start_date = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
            logging.info(f"Rebuilding whitelist from sent emails (last {days_back} days)...")
        else:
            if self.metadata["last_check"]:
                start_date = datetime.fromisoformat(self.metadata["last_check"]).strftime("%d-%b-%Y")
                logging.info(f"Updating whitelist with emails sent since {start_date}")
            else:
                start_date = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
                logging.info(f"First run - checking last {days_back} days of sent emails")
        try:
            # Try to create a verified SSL context
            try:
                ssl_context = ssl.create_default_context()
                # On macOS, try to load certificates from certifi if available
                try:
                    import certifi
                    ssl_context.load_verify_locations(certifi.where())
                except ImportError:
                    logging.warning("certifi not found. Using system certificates.")
            except Exception as e:
                logging.error(f"Error creating SSL context: {e}")
                logging.warning("Falling back to unverified context - NOT RECOMMENDED FOR PRODUCTION")
                ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE
            with imapclient.IMAPClient(
                IMAP_SERVER,
                ssl_context=ssl_context,
                timeout=30
            ) as client:
                client.login(IMAP_USER, IMAP_PASS)
                client.select_folder("Sent")
                messages = client.search(['SINCE', start_date])
                if not messages:
                    logging.info("No new sent messages found")
                    self.metadata["last_check"] = datetime.now().isoformat()
                    self.save()
                    return
                initial_count = len(self.emails)
                processed_count = 0
                # Fetch messages in batches
                batch_size = 100
                for i in range(0, len(messages), batch_size):
                    batch = messages[i:i + batch_size]
                    response = client.fetch(batch, ['RFC822'])
                    for msg_id, data in response.items():
                        try:
                            email_message = email.message_from_bytes(data[b'RFC822'], policy=default)
                            # Extract all recipient addresses
                            recipients = []
                            for header in ['to', 'cc', 'bcc']:
                                if email_message[header]:
                                    recipients.extend(email_message[header].split(','))
                            # Process each recipient
                            for recipient in recipients:
                                try:
                                    email_address = extract_email_address(recipient)
                                    if email_address and '@' in email_address:
                                        self.add(email_address)
                                except Exception as e:
                                    logging.error(f"Error processing recipient {recipient}: {e}")
                                    continue
                            processed_count += 1
                        except Exception as e:
                            logging.error(f"Error processing message {msg_id}: {e}")
                            continue
                new_addresses = len(self.emails) - initial_count
                self.metadata["last_check"] = datetime.now().isoformat()
                self.metadata["total_processed"] = self.metadata.get("total_processed", 0) + processed_count
                self.save()
                logging.info(f"Processed {processed_count} emails")
                logging.info(f"Added {new_addresses} new addresses to whitelist")
                logging.info(f"Whitelist now contains {len(self.emails)} addresses")
        except Exception as e:
            logging.error(f"Error building whitelist from sent emails: {e}")
# Connect to FastMail IMAP, filter emails, and move spam
def fetch_and_filter_emails(whitelist):
    logging.info("Starting email processing run")
    try:
        # Try to create a verified SSL context
        try:
            ssl_context = ssl.create_default_context()
            # On macOS, try to load certificates from certifi if available
            try:
                import certifi
                ssl_context.load_verify_locations(certifi.where())
            except ImportError:
                logging.warning("certifi not found. Using system certificates.")
        except Exception as e:
            logging.error(f"Error creating SSL context: {e}")
            logging.warning("Falling back to unverified context - NOT RECOMMENDED FOR PRODUCTION")
            ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
        with imapclient.IMAPClient(
            IMAP_SERVER,
            ssl_context=ssl_context,
            timeout=30
        ) as client:
            client.login(IMAP_USER, IMAP_PASS)
            # Check if spam folder exists, create if not
            folders = [f[2] for f in client.list_folders()]
            if SPAM_FOLDER_NAME not in folders:
                client.create_folder(SPAM_FOLDER_NAME)
            client.select_folder("INBOX")
            messages = client.search(["UNSEEN"])[:MAX_MESSAGES_PER_RUN]
            if not messages:
                logging.info("No new messages to process")
                return
            # Use BODY.PEEK to fetch without setting the \Seen flag
            response = client.fetch(messages, ["BODY.PEEK[]"])
            for msg_id, data in response.items():
                try:
                    msg = email.message_from_bytes(data[b"BODY[]"], policy=default)
                    subject = msg["subject"]
                    sender = msg["from"]
                    body = extract_email_body(msg)
                    email_address = extract_email_address(sender)
                    logging.info(f"📧 Checking Email from {sender} - {subject}...")
                    if whitelist.contains(email_address):
                        logging.info("✅ Sender is whitelisted. Keeping in Inbox.")
                        continue
                    ai_response = get_ai_decision(subject, sender, body)
                    logging.info(f"🔹 AI Decision: {ai_response}")
                    if ai_response == "YES":
                        logging.info(f"🚨 Moving email from {sender} to '{SPAM_FOLDER_NAME}'...")
                        # MOVE preserves flags (RFC 6851), and BODY.PEEK never
                        # set \Seen, so the message arrives in the spam
                        # folder still unread
                        client.move([msg_id], SPAM_FOLDER_NAME)
                    else:
                        logging.info("✅ Email seems normal. Keeping in Inbox.")
                    time.sleep(PROCESS_DELAY)
                except Exception as e:
                    logging.error(f"Error processing message {msg_id}: {e}")
                    continue
    except Exception as e:
        logging.error(f"Error connecting to mail server: {e}")
# Extract plain text from email
def extract_email_body(msg):
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                try:
                    # Try different encodings if utf-8 fails
                    content = part.get_payload(decode=True)
                    for encoding in ['utf-8', 'latin1', 'ascii', 'iso-8859-1']:
                        try:
                            return content.decode(encoding)
                        except UnicodeDecodeError:
                            continue
                    # If all encodings fail, return a sanitized version
                    return content.decode('utf-8', errors='ignore')
                except Exception as e:
                    logging.error(f"Error decoding email body: {e}")
                    return ""
        # No text/plain part found (e.g. HTML-only email)
        return ""
    else:
        try:
            content = msg.get_payload(decode=True)
            return content.decode('utf-8', errors='ignore')
        except Exception as e:
            logging.error(f"Error decoding email body: {e}")
            return ""


# Extract the actual email address from the sender field
def extract_email_address(sender):
    if "<" in sender and ">" in sender:
        return sender.split("<")[1].split(">")[0].strip()
    return sender.strip()
def ensure_model_exists(model_name):
    """Ensure the AI model is downloaded and available"""
    try:
        logging.info(f"Checking if {model_name} model is available...")
        # Pull the model; this is cheap if it is already downloaded
        subprocess.run(['ollama', 'pull', model_name],
                       check=True,
                       capture_output=True)
        logging.info(f"Model {model_name} is ready")
        return True
    except Exception as e:
        logging.error(f"Failed to ensure model availability: {e}")
        return False


def get_ai_decision(subject, sender, body):
    # Model availability is verified once at startup in __main__,
    # so there is no need to re-check it for every message
    prompt = f"""Analyze this email for signs of unsolicited commercial email or cold outreach.
Consider these factors:
- Generic greetings
- Mass marketing language
- Cold sales tactics
- Unsolicited business proposals
- Lack of personal connection
- Offering construction services
Email details:
Subject: {subject}
From: {sender}
Body: {body}
Respond with only 'YES' if this is clearly an unsolicited commercial email or cold outreach.
Respond with 'NO' if this appears to be legitimate correspondence.
"""
    try:
        response = ollama.chat(
            model=AI_MODEL,
            messages=[{"role": "user", "content": prompt}]
        )
        return response['message']['content'].strip().upper()
    except Exception as e:
        logging.error(f"AI processing error: {e}")
        return "NO"  # Default to keeping email on error
def get_vulnerability_decision(subject, sender, body):
    prompt = f"""Analyze this alert for critical security vulnerabilities or system issues.
Consider these factors for security:
- Severity level mentioned (Critical, High, Medium, Low)
- CVE numbers
- CVSS scores (7.0+ is High, 9.0+ is Critical)
- Words like "critical vulnerability", "high-risk", "zero-day"
- Immediate action required language
Consider these CRITICAL system issues:
- Server down/offline/unreachable messages (ALWAYS HIGH PRIORITY)
- Messages containing "server is down" or "server down"
- Messages from monitoring systems about downtime
- SSL/TLS certificate expiration/failure
- Database connection failures
- Authentication system problems
- Critical system errors
- Disk space warnings
- Resource exhaustion
- Messages from system bots (CloudwaysBot, monitoring systems, etc.)
Alert details:
Subject: {subject}
From: {sender}
Body: {body}
IMPORTANT: Server down notifications should ALWAYS be marked as HIGH priority!
Respond with only 'HIGH' if this is either:
1. A critical/high severity security vulnerability
2. A critical system issue requiring immediate attention (especially server down alerts)
Respond with 'NO' if this is a lower severity alert or non-critical issue.
"""
    try:
        response = ollama.chat(
            model=AI_MODEL,
            messages=[{"role": "user", "content": prompt}]
        )
        return response['message']['content'].strip().upper()
    except Exception as e:
        logging.error(f"AI processing error: {e}")
        return "NO"
def process_alerts(whitelist):
    logging.info("Starting alerts processing run")
    try:
        # SSL context setup (same approach as fetch_and_filter_emails)
        ssl_context = ssl.create_default_context()
        try:
            import certifi
            ssl_context.load_verify_locations(certifi.where())
        except ImportError:
            logging.warning("certifi not found. Using system certificates.")
        with imapclient.IMAPClient(
            IMAP_SERVER,
            ssl_context=ssl_context,
            timeout=30
        ) as client:
            client.login(IMAP_USER, IMAP_PASS)
            # Check if the high alert folder exists, create if not
            folders = [f[2] for f in client.list_folders()]
            if HIGH_ALERT_FOLDER not in folders:
                client.create_folder(HIGH_ALERT_FOLDER)
            client.select_folder(ALERTS_FOLDER)
            # Only search for unread messages
            messages = client.search(["UNSEEN"])[:MAX_MESSAGES_PER_RUN]
            if not messages:
                logging.info("No new alerts to process")
                return
            response = client.fetch(messages, ["BODY.PEEK[]"])
            for msg_id, data in response.items():
                try:
                    msg = email.message_from_bytes(data[b"BODY[]"], policy=default)
                    subject = msg["subject"]
                    sender = msg["from"]
                    body = extract_email_body(msg)
                    logging.info(f"🔔 Checking Alert from {sender} - {subject}...")
                    ai_response = get_vulnerability_decision(subject, sender, body)
                    logging.info(f"🔹 AI Decision: {ai_response}")
                    if ai_response == "HIGH":
                        logging.info(f"⚠️ Moving high severity alert to '{HIGH_ALERT_FOLDER}'...")
                        # BODY.PEEK never set \Seen and MOVE preserves flags
                        # (RFC 6851), so the alert stays unread after the move
                        client.move([msg_id], HIGH_ALERT_FOLDER)
                    else:
                        logging.info("📥 Normal severity alert - marking as read")
                        # Mark as read in the current folder
                        client.add_flags([msg_id], [b'\\Seen'])
                    time.sleep(PROCESS_DELAY)
                except Exception as e:
                    logging.error(f"Error processing alert {msg_id}: {e}")
                    continue
    except Exception as e:
        logging.error(f"Error connecting to mail server: {e}")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='FastMail AI Filter')
    parser.add_argument('--rebuild-whitelist', action='store_true',
                        help='Force full rebuild of whitelist from sent emails')
    parser.add_argument('--days', type=int, default=30,
                        help='Number of days of sent emails to process for full rebuild')
    parser.add_argument('--skip-spam', action='store_true',
                        help='Skip spam filtering')
    parser.add_argument('--skip-alerts', action='store_true',
                        help='Skip alert processing')
    args = parser.parse_args()

    # Ensure the AI model is available
    if not ensure_model_exists(AI_MODEL):
        logging.error("Could not ensure AI model availability. Exiting.")
        sys.exit(1)

    whitelist = WhiteList()
    # Update whitelist
    whitelist.build_from_sent_emails(
        days_back=args.days,
        force_full_rebuild=args.rebuild_whitelist
    )
    logging.info(f"Loaded whitelist with {len(whitelist.emails)} entries")

    # Run spam filter
    if not args.skip_spam:
        fetch_and_filter_emails(whitelist)

    # Run alert processor
    if not args.skip_alerts:
        process_alerts(whitelist)
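Assuming the script is saved as email_filter.py (the filename is my choice here; anything works), typical invocations look like this:

```shell
# Normal run: update the whitelist incrementally, then filter spam and alerts
python3 email_filter.py

# Rebuild the whitelist from the last 90 days of sent mail
python3 email_filter.py --rebuild-whitelist --days 90

# Only process alerts, skipping the spam pass
python3 email_filter.py --skip-spam
```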
Step 4: Automate the Script (Coming Soon)
Once I get this script running perfectly, I will move it to one of our servers and set it up to run automatically:
- Mac/Linux: Use a cron job
- Windows: Use Task Scheduler
Example cron job to run the script every 10 minutes:
*/10 * * * * /usr/bin/python3 /path/to/script.py
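Cron runs with a minimal environment, so in practice I would also capture the script's output for debugging; a slightly more defensive variant of the same entry (paths are placeholders):

```shell
*/10 * * * * /usr/bin/python3 /path/to/script.py >> /path/to/email_filter_cron.log 2>&1
```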