2from purepyindi2
import device, properties, constants, messages
11from magaox.db import Telem, FileOrigin, UserLog
13from magaox.utils import parse_iso_datetime_as_utc, creation_time_from_filename
28from datetime
import timezone
39 def __init__(self, conn: psycopg.Connection, timeout_sec=30.0):
58 def __init__(self, host, events_queue, log_name):
65 origin_host=self.
host,
67 creation_time=creation_time_from_filename(
event.src_path, stat_result=stat_result),
77 except FileNotFoundError:
86 except FileNotFoundError:
91CREATE_CONNECTION_TIMEOUT_SEC = 2
99 args = logdump_args + (
'--dir='+logdump_dir,
'-J',
'-f', name)
100 log.debug(f
"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
107 raise RuntimeError(f
"{name} logdump exited with {p.returncode} ({repr(' '.join(args))})")
108 except Exception
as e:
109 glob_pattern = logdump_dir + f
"/{name}/*/{name}_*"
111 log.info(f
"Looks like {name} is a Python app; support is TODO")
116 log.info(f
"No files found for {name}, waiting for them to appear")
122 proclist : str =
xconf.field(default=
"/opt/MagAOX/config/proclist_%s.txt", help=
"Path to process list file, %s will be replaced with the value of $MAGAOX_ROLE (or an empty string if absent from the environment)")
123 query_timeout_sec : float =
xconf.field(default=30.0, help=
"Number of seconds after which to (attempt to) cancel an insert query under the assumption the connection's gone bad")
124 logdump_exe : str =
xconf.field(default=
"/opt/MagAOX/bin/logdump", help=
"logdump (a.k.a. teldump) executable to use")
127 config : dbIngestConfig
130 fs_observer : BaseObserverSubclassCallable
132 last_update_ts_sec : float
133 startup_ts_sec : float
134 records_since_startup : float
136 _connections_to_attempt : set[str]
143 telem_args = self.log.name +
'.' + dev,
'/opt/MagAOX/telem', (self.
config.logdump_exe,
'--ext=.bintel'), dev, self.
telem_queuetelem_queue, Telem
144 telem_thread =
threading.Thread(target=_run_logdump_thread, args=telem_args, daemon=
True)
146 self.log.
debug(f
"Watching {dev} for incoming telem")
149 if dev ==
"observers":
151 user_log_thread =
threading.Thread(target=_run_logdump_thread, args= ULog_args, daemon=
True)
153 self.log.
debug(f
"Watching {dev} for incoming user logs")
158 self.update_property(self.properties[
'last_update'])
161 self.update_property(self.properties[
'records'])
173 min=0.0, max=1e200, format=
'%f',
176 self.add_property(last_update)
182 min=0.0, max=1e200, format=
'%f',
186 name=
"since_startup",
188 min=0, max=1_000_000_000, format=
'%i',
191 self.add_property(records)
208 raise RuntimeError(f
"Got malformed proclist line: {repr(line)}")
216 for dev
in device_names:
224 for dirname
in self.
config.data_dirs:
225 dirpath = self.
config.common_path_prefix / dirname
227 self.log.
debug(f
"No {dirpath} to watch")
230 self.log.info(f
"Watching {dirpath} for changes")
235 if self.log_file_name.
encode(
'utf8')
not in line:
241 connections_to_reattempt = set()
249 self.log.info(f
"Connected to {configkey} db")
251 self.log.exception(f
"Failed to connect to {configkey} ({self.config.databases[configkey]})")
283 self.log.
debug(f
"Batching ingest for {connkey}")
289 except Exception
as e:
290 self.log.exception(f
"Caught exception {e} in batch telem ingest, reconnecting {connkey} on next loop()")
301 except Exception
as e:
302 self.log.exception(f
"Caught exception {e} in batch file origins ingest, reconnecting {connkey} on next loop()")
313 except Exception
as e:
314 self.log.exception(f
"Caught exception {e} in batch user log ingest, reconnecting {connkey} on next loop()")
__init__(self, host, events_queue, log_name)
construct_message(self, stat_result, event, is_new_file=False)
__init__(self, psycopg.Connection conn, timeout_sec=30.0)
float records_since_startup
set _connections_to_attempt
BaseObserverSubclassCallable fs_observer
launch_followers(self, dev)
_run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class)