API
 
Loading...
Searching...
No Matches
dbIngest.py
Go to the documentation of this file.
1import glob
2from purepyindi2 import device, properties, constants, messages
3from purepyindi2.messages import DefNumber, DefSwitch, DefText
4import sys
5import logging
6import xconf
7import psycopg
8from magaox.indi.device import XDevice
9from magaox.constants import DEFAULT_PREFIX
10from magaox.db.config import BaseDbDeviceConfig
11from magaox.db import Telem, FileOrigin, UserLog
12from magaox.db import ingest
13from magaox.utils import parse_iso_datetime_as_utc, creation_time_from_filename
14
15import json
16import orjson
17import xconf
18import subprocess
19import queue
20import socket
21import threading
22import pathlib
23import time
24import os.path
25import os
26import sys
27import datetime
28from datetime import timezone
29from watchdog.observers import Observer, BaseObserverSubclassCallable
30from watchdog.events import FileSystemEventHandler
31
32log = logging.getLogger(__name__)
33
class TimeoutError(Exception):
    """Raised when a watched database query exceeds its allotted time.

    NOTE(review): this shadows the builtin ``TimeoutError``; kept with the
    same name because callers may catch it by this module-qualified name.
    """
36
class QueryCancellationWatchdog(threading.Thread):
    """Watchdog thread that cancels an in-flight query if it runs too long.

    Usage: construct with an open connection, ``start()`` just before issuing
    a query, then call ``complete()`` once the query returns. If ``complete()``
    is not called within ``timeout_sec`` seconds, the connection's current
    query is cancelled via ``cancel_safe()`` and TimeoutError is raised
    (inside this watchdog thread).

    NOTE(review): the ``class`` header line was lost in extraction and has
    been reconstructed from usage elsewhere in the file.
    """
    # Polling interval while waiting for complete() to be called.
    dt_sec : float = 0.1

    def __init__(self, conn, timeout_sec: float = 30.0):
        """conn is a psycopg.Connection (or anything with cancel_safe())."""
        self.exit = False
        self.conn = conn
        # Bug fix: previously hard-coded to 30.0, silently ignoring the
        # timeout_sec argument passed by callers.
        self.timeout_sec = timeout_sec
        super().__init__(target=self.run)

    def complete(self):
        """Signal that the guarded query finished; the watchdog exits quietly."""
        self.exit = True

    def run(self):
        total_wait_sec = 0.0
        while not self.exit and total_wait_sec < self.timeout_sec:
            time.sleep(self.dt_sec)  # reconstructed line lost in extraction
            total_wait_sec += self.dt_sec
        if self.exit:
            return
        else:
            # Timed out: cancel whatever is running on this connection.
            self.conn.cancel_safe()
            raise TimeoutError()
56
class NewXFilesHandler(FileSystemEventHandler):
    """Watchdog handler converting file create/modify events into FileOrigin records.

    Each record is placed on ``events_queue`` for the main device loop to
    batch-ingest into the database.
    """
    def __init__(self, host, events_queue, log_name):
        self.host = host  # hostname recorded as the file's origin
        self.events_queue = events_queue
        self.log = logging.getLogger(log_name)

    def construct_message(self, stat_result, event, is_new_file=False):
        """Build a FileOrigin record for the file referenced by ``event``."""
        return FileOrigin(
            origin_host=self.host,
            origin_path=event.src_path,
            creation_time=creation_time_from_filename(event.src_path, stat_result=stat_result),
            # NOTE(review): this field was lost in extraction and is
            # reconstructed — confirm name/format against magaox.db.FileOrigin.
            modification_time=datetime.datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc),
            size_bytes=stat_result.st_size,
        )

    def on_created(self, event):
        # Directories generate events too; only files are ingested.
        # (guard reconstructed: the original 'return' was orphaned by extraction)
        if event.is_directory:
            return
        try:
            stat_result = os.stat(event.src_path)
        except FileNotFoundError:
            # File vanished between the event and our stat(); nothing to record.
            return
        self.events_queue.put(self.construct_message(stat_result, event, is_new_file=True))

    def on_modified(self, event):
        # (guard reconstructed, as in on_created)
        if event.is_directory:
            return
        try:
            stat_result = os.stat(event.src_path)
        except FileNotFoundError:
            return
        self.events_queue.put(self.construct_message(stat_result, event, is_new_file=False))
89
# Seconds to sleep between re-globbing for log/telem files that don't exist yet
# (see _run_logdump_thread).
RETRY_WAIT_SEC = 2
# NOTE(review): the two constants below are not referenced in this chunk —
# presumably used elsewhere in the module; confirm before removing.
CREATE_CONNECTION_TIMEOUT_SEC = 2
EXIT_TIMEOUT_SEC = 2
93
def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class):
    """Follow a device's binary log/telem stream via ``logdump -J -f`` and enqueue records.

    Intended to run forever in a daemon thread: launches logdump in
    JSON/follow mode, parses each output line into ``record_class``, and puts
    the result on ``message_queue``. On any failure it either gives up (the
    device looks like a Python app), logs and relaunches, or waits for the
    device's files to appear before retrying.

    Parameters
    ----------
    logger_name : str
        Name for this follower's logger.
    logdump_dir : str
        Directory tree logdump should read (e.g. /opt/MagAOX/telem).
    logdump_args : tuple of str
        Executable plus fixed arguments, e.g. (logdump_exe, '--ext=.bintel').
    name : str
        Device name; also used to glob for its on-disk files.
    message_queue : queue.Queue
        Destination for parsed records.
    record_class : type
        Class providing ``from_json(name, line)`` (e.g. Telem or UserLog).
    """
    # filter what content from user_logs gets put into db
    log = logging.getLogger(logger_name)
    while True:
        try:
            args = logdump_args + ('--dir='+logdump_dir, '-J', '-f', name)
            log.debug(f"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stdin=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=True)
            for line in p.stdout:
                message = record_class.from_json(name, line)
                message_queue.put(message)
            p.wait() # stdout is over when the process exits
            if p.returncode != 0:
                raise RuntimeError(f"{name} logdump exited with {p.returncode} ({repr(' '.join(args))})")
        except Exception:  # fix: exception variable was bound but never used
            glob_pattern = logdump_dir + f"/{name}/*/{name}_*"
            if len(glob.glob(glob_pattern + ".ndjson.gz")):
                log.info(f"Looks like {name} is a Python app; support is TODO")
                return
            if len(glob.glob(glob_pattern)):
                # Files exist, so the failure was unexpected; log it and retry.
                log.exception(f"Exception in log/telem follower for {name}")
            else:
                log.info(f"No files found for {name}, waiting for them to appear")
                while not len(glob.glob(glob_pattern)):
                    time.sleep(RETRY_WAIT_SEC)
119
@xconf.config
class dbIngestConfig(BaseDbDeviceConfig):
    """Configuration for the dbIngest device: process list location, query
    timeout, and the logdump executable used to follow telem/log streams."""
    proclist : str = xconf.field(default="/opt/MagAOX/config/proclist_%s.txt", help="Path to process list file, %s will be replaced with the value of $MAGAOX_ROLE (or an empty string if absent from the environment)")
    query_timeout_sec : float = xconf.field(default=30.0, help="Number of seconds after which to (attempt to) cancel an insert query under the assumption the connection's gone bad")
    logdump_exe : str = xconf.field(default="/opt/MagAOX/bin/logdump", help="logdump (a.k.a. teldump) executable to use")
125
class dbIngest(XDevice):
    """INDI device that ingests MagAOX telemetry, user logs, and file events into databases.

    Followers (one daemon thread per device) tail logdump output into queues;
    a watchdog Observer feeds filesystem events into another queue; loop()
    drains all queues and batch-inserts into every configured database.

    NOTE(review): several lines of this class were lost in extraction
    (initialization in setup(), thread/observer starts, watchdog start/complete
    calls, retry-set updates). They are reconstructed below and marked —
    confirm against the upstream source.
    """
    config : dbIngestConfig
    telem_threads : list[tuple[str, threading.Thread]]  # (device name, follower thread)
    telem_queue : queue.Queue
    fs_observer : BaseObserverSubclassCallable
    fs_queue : queue.Queue
    last_update_ts_sec : float   # wall-clock time of the last completed ingest cycle
    startup_ts_sec : float
    records_since_startup : float
    _connections : dict[str, psycopg.Connection]
    _connections_to_attempt : set[str]  # db config keys to (re)connect on next _ensure_connected()

    #add user_log support here
    user_log_threads : list[tuple[str, threading.Thread]]
    user_log_queue : queue.Queue

    def launch_followers(self, dev):
        """Start a daemon thread tailing telemetry for ``dev``; for the
        'observers' device, also tail its user-log stream."""
        telem_args = self.log.name + '.' + dev, '/opt/MagAOX/telem', (self.config.logdump_exe, '--ext=.bintel'), dev, self.telem_queue, Telem
        telem_thread = threading.Thread(target=_run_logdump_thread, args=telem_args, daemon=True)
        telem_thread.start()  # reconstructed: thread must be started
        self.log.debug(f"Watching {dev} for incoming telem")
        self.telem_threads.append((dev, telem_thread))

        if dev == "observers":
            # User logs are produced by the 'observers' device only.
            user_log_args = self.log.name + '.' + dev, '/opt/MagAOX/logs', (self.config.logdump_exe, '--ext=.binlog'), dev, self.user_log_queue, UserLog
            user_log_thread = threading.Thread(target=_run_logdump_thread, args=user_log_args, daemon=True)
            user_log_thread.start()  # reconstructed
            self.log.debug(f"Watching {dev} for incoming user logs")
            self.user_log_threads.append((dev, user_log_thread))

    def refresh_properties(self):
        """Publish last-update timestamp and record-rate counters to INDI clients."""
        self.properties['last_update']['timestamp'] = self.last_update_ts_sec
        self.update_property(self.properties['last_update'])
        self.properties['records']['since_startup'] = self.records_since_startup
        self.properties['records']['per_sec'] = self.records_since_startup / (time.time() - self.startup_ts_sec)
        self.update_property(self.properties['records'])

    def setup(self):
        """Define INDI properties, launch per-device followers, and start the
        filesystem observer. Raises RuntimeError if the process list is
        missing or malformed."""
        # --- reconstructed initialization (lost in extraction) — confirm upstream ---
        self.startup_ts_sec = time.time()
        self.last_update_ts_sec = time.time()
        self.records_since_startup = 0
        self._connections = {}
        # Attempt every configured database on the first loop().
        self._connections_to_attempt = set(self.config.databases.keys())
        # ---------------------------------------------------------------------------

        last_update = properties.NumberVector(name="last_update", perm=constants.PropertyPerm.READ_ONLY)
        last_update.add_element(DefNumber(
            name="timestamp",
            _value=0.0,  # reconstructed (mirrors the 'records' elements below)
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        self.add_property(last_update)

        records = properties.NumberVector(name="records", perm=constants.PropertyPerm.READ_ONLY)
        records.add_element(DefNumber(
            name="per_sec",
            _value=0.0,
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        records.add_element(DefNumber(
            name="since_startup",
            _value=0,
            min=0, max=1_000_000_000, format='%i',
            step=1,
        ))
        self.add_property(records)

        role = os.environ.get('MAGAOX_ROLE', '')
        proclist = pathlib.Path(self.config.proclist.replace('%s', role))
        if not proclist.exists():
            raise RuntimeError(f"No process list at {proclist}")

        device_names = set()

        with proclist.open() as fh:
            for line in fh:
                if not line.strip():
                    continue
                if line.strip()[0] == '#':
                    continue
                parts = line.split(None, 1)
                if len(parts) != 2:
                    raise RuntimeError(f"Got malformed proclist line: {repr(line)}")
                device_names.add(parts[0])

        # Reconstructed: queues and bookkeeping used by launch_followers().
        self.telem_queue = queue.Queue()
        self.telem_threads = []
        self.user_log_queue = queue.Queue()
        self.user_log_threads = []

        for dev in device_names:
            self.launch_followers(dev)

        self.fs_queue = queue.Queue()  # reconstructed
        event_handler = NewXFilesHandler(self.config.hostname, self.fs_queue, self.log.name + '.fs_observer')
        self.fs_observer = Observer()
        for dirname in self.config.data_dirs:
            dirpath = self.config.common_path_prefix / dirname
            if not dirpath.exists():
                self.log.debug(f"No {dirpath} to watch")
                continue
            self.fs_observer.schedule(event_handler, dirpath, recursive=True)
            self.log.info(f"Watching {dirpath} for changes")
        self.fs_observer.start()  # reconstructed: observer must be started to deliver events

    def ingest_line(self, line):
        """Debug-log one line of follower output, skipping lines that mention
        our own log file (writing to the log modifies the log file, which
        would generate another filesystem event — an infinite loop)."""
        # avoid infinite loop of modifying log file and getting notified of the modification
        if self.log_file_name.encode('utf8') not in line:
            self.log.debug(line)

    def _ensure_connected(self):
        """(Re)open any database connections that are pending retry or closed."""
        # Fix: accumulate failures across the whole scan. Originally this set
        # was (re)created inside the per-key 'if', discarding earlier failures
        # from the retry set.
        connections_to_reattempt = set()
        for configkey in self.config.databases.keys():
            if configkey in self._connections_to_attempt or self._connections[configkey].closed:
                try:
                    self._connections[configkey].close()
                except Exception:
                    pass  # best-effort close of a possibly-broken/absent connection
                try:
                    self._connections[configkey] = self.config.databases[configkey].connect()
                    self.log.info(f"Connected to {configkey} db")
                except Exception:
                    self.log.exception(f"Failed to connect to {configkey} ({self.config.databases[configkey]})")
                    connections_to_reattempt.add(configkey)  # reconstructed
        self._connections_to_attempt = connections_to_reattempt

    def loop(self):
        """One ingest cycle: reconnect if needed, drain all queues, then
        batch-insert into every open database connection."""
        self._ensure_connected()
        telems = []
        try:
            while rec := self.telem_queue.get(timeout=0.1):
                telems.append(rec)
                self.records_since_startup += 1  # reconstructed counter update
        except queue.Empty:
            pass

        fs_events = []
        try:
            while rec := self.fs_queue.get(timeout=0.1):
                fs_events.append(rec)  # reconstructed
                self.records_since_startup += 1  # reconstructed
        except queue.Empty:
            pass

        user_logs = []
        try:
            while rec := self.user_log_queue.get(timeout=0.1):
                user_logs.append(rec)  # reconstructed
                self.records_since_startup += 1  # reconstructed
        except queue.Empty:
            pass

        for connkey in self._connections:
            conn = self._connections[connkey]
            self.log.debug(f"Batching ingest for {connkey}")
            # Each batch runs under a watchdog that cancels the query if it hangs.
            query_timeout = QueryCancellationWatchdog(conn, timeout_sec=self.config.query_timeout_sec)
            try:
                query_timeout.start()  # reconstructed
                ingest.batch_telem(conn, telems)
            except Exception as e:
                self.log.exception(f"Caught exception {e} in batch telem ingest, reconnecting {connkey} on next loop()")
                self._connections_to_attempt.add(connkey)  # reconstructed
                continue
            finally:
                query_timeout.complete()  # reconstructed

            query_timeout = QueryCancellationWatchdog(conn, timeout_sec=self.config.query_timeout_sec)
            try:
                query_timeout.start()  # reconstructed
                ingest.batch_file_origins(conn, fs_events)
            except Exception as e:
                self.log.exception(f"Caught exception {e} in batch file origins ingest, reconnecting {connkey} on next loop()")
                self._connections_to_attempt.add(connkey)  # reconstructed
                continue
            finally:
                query_timeout.complete()  # reconstructed

            query_timeout = QueryCancellationWatchdog(conn, timeout_sec=self.config.query_timeout_sec)
            try:
                query_timeout.start()  # reconstructed
                ingest.batch_user_log(conn, user_logs)
            except Exception as e:
                self.log.exception(f"Caught exception {e} in batch user log ingest, reconnecting {connkey} on next loop()")
                self._connections_to_attempt.add(connkey)  # reconstructed
                continue
            finally:
                query_timeout.complete()  # reconstructed

        this_ts_sec = time.time()
        self.last_update_ts_sec = this_ts_sec
        self.refresh_properties()
323
__init__(self, host, events_queue, log_name)
Definition dbIngest.py:58
on_modified(self, event)
Definition dbIngest.py:81
on_created(self, event)
Definition dbIngest.py:72
construct_message(self, stat_result, event, is_new_file=False)
Definition dbIngest.py:63
__init__(self, psycopg.Connection conn, timeout_sec=30.0)
Definition dbIngest.py:39
float records_since_startup
Definition dbIngest.py:134
float last_update_ts_sec
Definition dbIngest.py:132
set _connections_to_attempt
Definition dbIngest.py:136
BaseObserverSubclassCallable fs_observer
Definition dbIngest.py:130
launch_followers(self, dev)
Definition dbIngest.py:142
ingest_line(self, line)
Definition dbIngest.py:233
_ensure_connected(self)
Definition dbIngest.py:238
dbIngestConfig config
Definition dbIngest.py:127
refresh_properties(self)
Definition dbIngest.py:156
_run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class)
Definition dbIngest.py:94