heartrate


1 # heartrate
# Console bootstrap.
# Imported as a module: reuse the console object exported by logging_config.
# Run as a script: build a console here and hand it to configure_logging().
if __name__ != "__main__":
    from logging_config import RICH_CONSOLE
else:
    from sys import platform

    from logging_config import configure_logging
    from rich.console import Console

    running_on_windows = platform == "win32"

    # On Windows the width is left as None and log timestamps are enabled;
    # everywhere else the width is pinned to 175 columns with timestamps off.
    RICH_CONSOLE = Console(
        width=None if running_on_windows else 175,
        log_time=running_on_windows,
    )

    configure_logging(RICH_CONSOLE)
13
14
15 from asyncio.queues import Queue
16 from datetime import date
17 from logging import getLogger
18 from re import compile
19 from ssl import create_default_context
20 from time import sleep
21
22 from environment_init_vars import FATAL_EVENT, SETTINGS
23 from err_handling import handle_fatal_exc_sync
24 from imap_tools import A, MailBox, MailMessage, UidRange
25
# Module-level logger, named after this module.
logger = getLogger(__name__)


# Lower bound passed as ``date_gte`` to every mailbox search in this module,
# so that the backlog of older emails is never picked up.
STATIC_DATE_FILTER = date(year=2026, month=4, day=8)


# Parses an untagged server line of the form ``* <number> <WORD>...``.
# The number is captured as ``uid`` and the upper-case word as ``resp``.
RESPONSE_UID_PATTERN = compile(r"^\* (?P<uid>\d+) (?P<resp>[A-Z]+).*$")
33
34
def _retry_previously_unfound(mailbox: MailBox, queue: Queue[MailMessage], exists_but_unfound: set[int]) -> None:
    """Re-run the report search for every id remembered from earlier IDLE notifications.

    Each matching message is flagged via ``flag_as_seen``, put on *queue*, and its
    own UID is discarded from *exists_but_unfound* (which is mutated in place).
    """
    mailbox.folder.set("Inbox")
    logger.info(f"Attempting to fetch previously unfound emails with UIDs: {exists_but_unfound}")
    for uid in exists_but_unfound.copy():  # Snapshot: the set is mutated inside the loop
        logger.info(f" Attempting direct UID fetch for previously unfound email with UID: {uid}...")
        fetch_found = False
        for msg in mailbox.fetch(
            A(
                uid=UidRange(str(uid), "*"),
                from_="emails@mailing.goftx.com",
                date_gte=STATIC_DATE_FILTER,
                text="report contents",
                no_keyword="AutoMon_Seen",
            )
        ):
            fetch_found = True
            if msg.uid is not None:
                flag_as_seen(msg, mailbox)
            logger.info(f" Previously unfound email found with UID: {msg.uid}, subject: {msg.subject}. Adding to processing queue.")
            queue.put_nowait(msg)
            if msg.uid is not None:
                logger.info(f" Email with UID {msg.uid} found and added to queue. Removing from unfound list.")
                exists_but_unfound.discard(int(msg.uid))
        if not fetch_found:
            logger.info(f" Email with UID {uid} still not found. Will check again on next IDLE response.")


def _queue_new_report_emails(mailbox: MailBox, queue: Queue[MailMessage], exists_but_unfound: set[int]) -> bool:
    """Fetch every not-yet-flagged report email, flag it and put it on *queue*.

    Returns True when at least one message matched the search, False otherwise.
    UIDs of the messages found are discarded from *exists_but_unfound*.
    """
    fetch_found = False
    for msg in mailbox.fetch(
        A(
            from_="emails@mailing.goftx.com",
            date_gte=STATIC_DATE_FILTER,
            text="report contents",
            no_keyword="AutoMon_Seen",
        ),
    ):
        fetch_found = True
        if msg.uid is not None:
            flag_as_seen(msg, mailbox)
            exists_but_unfound.discard(int(msg.uid))  # No error if it was not in the set
        logger.info(f" New email found with UID: {msg.uid}, subject: {msg.subject}. Adding to processing queue.")
        queue.put_nowait(msg)
    return fetch_found


@handle_fatal_exc_sync
def start_imap_email_monitoring(queue: Queue[MailMessage]) -> None:
    """Start the IMAP email monitoring. Runs in a separate thread.

    Loops forever: connects to the configured mailbox, retries ids that earlier
    notifications could not resolve, waits in IMAP IDLE for up to
    ``SETTINGS.watch_polling_timeout_sec`` seconds, and on a server notification
    fetches the matching report emails and puts them on *queue*. The loop ends
    when ``FATAL_EVENT`` is set after a poll returns.

    Args:
        queue: Destination for every matching ``MailMessage``.
    """
    # NOTE(review): asyncio queues are documented as not thread-safe, yet this function
    # is meant to run in its own thread and calls queue.put_nowait() - confirm how the
    # consumer is woken (e.g. loop.call_soon_threadsafe) or whether a thread-safe queue fits better.
    from heartrate import trace

    # NOTE(review): when __debug__ is False (python -O) the heartrate server binds to
    # 0.0.0.0 - confirm that exposing the trace view on all interfaces is intended.
    trace(
        port=9999,
        host="127.0.0.1" if __debug__ else "0.0.0.0",
        browser=__debug__,
        daemon=True,
    )

    ssl_context = create_default_context()
    # Ids taken from notifications for which the search returned nothing (yet).
    exists_but_unfound: set[int] = set()

    while True:
        logger.info(f"Emails currently in processing queue: {queue.qsize()}")
        sleep(0)  # Yield control to allow the main thread to run

        logger.info(f"Connecting to IMAP server {SETTINGS.watch_imap_server}:{SETTINGS.watch_imap_port}")
        logger.info(f" Using email: {SETTINGS.watch_email}")
        # A fresh connection is opened on every pass of the loop.
        with MailBox(
            host=SETTINGS.watch_imap_server,
            port=SETTINGS.watch_imap_port,
            ssl_context=ssl_context,
        ).login(SETTINGS.watch_email, SETTINGS.watch_email_pwd, "Inbox") as mailbox:
            # Attempt the leftovers from previous notifications before polling for new ones
            if exists_but_unfound:
                _retry_previously_unfound(mailbox, queue, exists_but_unfound)

            logger.info("Entering IMAP IDLE mode to wait for new emails...")
            with mailbox.idle as idle:
                logger.info("Polling for new emails...")
                responses = idle.poll(SETTINGS.watch_polling_timeout_sec)

            if FATAL_EVENT.is_set():
                break

            if not responses:
                logger.info(f"no updates in {SETTINGS.watch_polling_timeout_sec} sec\n")
                continue

            logger.info(f" IMAP IDLE response received: {responses}. Refreshing mailbox")
            mailbox.folder.set("Inbox")

            # Only the first response line is inspected.
            match = RESPONSE_UID_PATTERN.match(responses[0].decode())
            if match is None:
                logger.error(f" Received IMAP response did not match expected pattern: {responses[0].decode()}.")
                continue

            logger.info(
                " Attempting fetch for emails with the following criteria:\n"
                " From: emails@mailing.goftx.com\n"
                f" Date >= {STATIC_DATE_FILTER}\n"
                " Text contains: 'report contents'\n"
                " Does not have keyword: 'AutoMon_Seen'"
            )

            if _queue_new_report_emails(mailbox, queue, exists_but_unfound):
                logger.info(" Finished processing IMAP IDLE response.\n")
            else:
                logger.info(" No matching unseen emails found. Will check again on next IDLE response\n")
                # NOTE(review): per RFC 3501 the number in "* n EXISTS" is the mailbox message
                # count, not a UID - confirm that using it as the start of a UID range is intended.
                exists_but_unfound.add(int(match.group("uid")))
140
141
def flag_as_seen(msg: MailMessage, mailbox: MailBox):
    """Set the custom ``AutoMon_Seen`` keyword on *msg* so later searches skip it.

    Only this keyword is set here; the standard ``\\Seen`` flag is not touched.
    Logs before and after the flag call.
    """
    uid = msg.uid
    # Callers check for a missing uid first; the assert documents that invariant.
    assert uid is not None, "This is impossible."

    logger.info(f" Flagging {uid} as seen")
    mailbox.flag(uid, "AutoMon_Seen", True)
    logger.info(f" {uid} flagged as seen")
148
149
# Manual test entry point: run the monitor directly against a throwaway queue.
if __name__ == "__main__":
    start_imap_email_monitoring(Queue())

Stack trace: