diff --git a/pg_view/__init__.py b/pg_view/__init__.py index 07422ea..b7d7ba6 100644 --- a/pg_view/__init__.py +++ b/pg_view/__init__.py @@ -12,11 +12,11 @@ from pg_view import consts from pg_view import flags -from pg_view.collectors.host_collector import HostStatCollector -from pg_view.collectors.memory_collector import MemoryStatCollector +from pg_view.collectors.host_collector import HostStatCollector, RemoteHostDataSource +from pg_view.collectors.memory_collector import MemoryStatCollector, RemoteMemoryDataSource from pg_view.collectors.partition_collector import PartitionStatCollector, DetachedDiskStatCollector from pg_view.collectors.pg_collector import PgstatCollector -from pg_view.collectors.system_collector import SystemStatCollector +from pg_view.collectors.system_collector import SystemStatCollector, RemoteSystemDataSource from pg_view.loggers import logger, enable_logging_to_stderr, disable_logging_to_stderr from pg_view.models.consumers import DiskCollectorConsumer from pg_view.models.db_client import build_connection, detect_db_connection_arguments, \ @@ -258,15 +258,25 @@ def main(): collector.start() consumer = DiskCollectorConsumer(q) - collectors.append(HostStatCollector()) - collectors.append(SystemStatCollector()) - collectors.append(MemoryStatCollector()) + use_local_data = not(options.host) or options.host.startswith('/') + if use_local_data: + collectors.append(HostStatCollector()) + collectors.append(SystemStatCollector()) + collectors.append(MemoryStatCollector()) + else: + pgcon = clusters[0]['pgcon'] + collectors.append(HostStatCollector(RemoteHostDataSource(pgcon))) + collectors.append(SystemStatCollector(RemoteSystemDataSource(pgcon))) + collectors.append(MemoryStatCollector(RemoteMemoryDataSource(pgcon))) + for cl in clusters: - part = PartitionStatCollector(cl['name'], cl['ver'], cl['wd'], consumer) pg = PgstatCollector(cl['pgcon'], cl['reconnect'], cl['pid'], cl['name'], cl['ver'], options.pid) groupname = cl['wd'] - groups[groupname] = {'pg': pg, 'partitions': part} - collectors.append(part) + groups[groupname] = {'pg': pg} + if use_local_data: + part = PartitionStatCollector(cl['name'], cl['ver'], cl['wd'], consumer) + groups[groupname]['partitions'] = part + collectors.append(part) collectors.append(pg) # we don't want to mix diagnostics messages with useful output, so we log the former into a file. diff --git a/pg_view/collectors/host_collector.py b/pg_view/collectors/host_collector.py index b3c4d5d..297ff1a 100644 --- a/pg_view/collectors/host_collector.py +++ b/pg_view/collectors/host_collector.py @@ -3,20 +3,83 @@ from datetime import timedelta from multiprocessing import cpu_count +import psycopg2 + from pg_view.collectors.base_collector import StatCollector from pg_view.loggers import logger from pg_view.models.outputs import COLSTATUS, COLHEADER +UPTIME_FILENAME = '/proc/uptime' + +class LocalHostDataSource(object): + def __int__(self): + pass + + def __call__(self): + try: + with open(UPTIME_FILENAME, 'rU') as f: + uptime = f.read().split() + except: + uptime = 0 + try: + ncpus = cpu_count() + except: + logger.error('multiprocessing does not support cpu_count') + ncpus = 0 + return { + 'uptime': uptime, + 'loadavg': os.getloadavg(), + 'hostname': socket.gethostname(), + 'uname': os.uname(), + 'ncpus': ncpus + } + + +class RemoteHostDataSource(object): + def __init__(self, pgcon): + self.pgcon = pgcon + + def __call__(self): + """ +CREATE OR REPLACE FUNCTION pgview.get_host_info(OUT uptime double precision[], OUT loadavg double precision[], OUT hostname text, OUT uname text[], OUT ncpus integer) + RETURNS record + LANGUAGE plpythonu +AS $function$ + import os + import socket + from multiprocessing import cpu_count + try: + with open('/proc/uptime', 'rU') as f: + uptime = f.read().split() + except: + uptime = 0 + try: + ncpus = cpu_count() + except: + ncpus = 0 + return (uptime, os.getloadavg(), socket.gethostname(), os.uname(), ncpus) +$function$ + """ + cur = self.pgcon.cursor(cursor_factory=psycopg2.extras.RealDictCursor) + cur.execute("SELECT * FROM pgview.get_host_info()") + res = cur.fetchone() + cur.close() + self.pgcon.commit() + return res + + class HostStatCollector(StatCollector): """ General system-wide statistics """ - UPTIME_FILE = '/proc/uptime' + data = {} - def __init__(self): + def __init__(self, data_source=LocalHostDataSource()): super(HostStatCollector, self).__init__(produce_diffs=False) + self.data_source = data_source + self.transform_list_data = [{'out': 'loadavg', 'infn': self._concat_load_avg}] self.transform_uptime_data = [{'out': 'uptime', 'in': 0, 'fn': self._uptime_to_str}] self.transform_uname_data = [{'out': 'sysname', 'infn': self._construct_sysname}] @@ -65,6 +128,8 @@ def __init__(self): self.postinit() def refresh(self): + self.data = self.data_source() + raw_result = {} raw_result.update(self._read_uptime()) raw_result.update(self._read_load_average()) @@ -74,7 +139,7 @@ def refresh(self): self._do_refresh([raw_result]) def _read_load_average(self): - return self._transform_list(os.getloadavg()) + return self._transform_list(self.data['loadavg']) def _load_avg_state(self, row, col): state = {} @@ -111,15 +176,8 @@ def _load_avg_status(self, row, col, val, bound): return True return False - @staticmethod - def _read_cpus(): - cpus = 0 - try: - cpus = cpu_count() - except: - logger.error('multiprocessing does not support cpu_count') - pass - return {'cores': cpus} + def _read_cpus(self): + return {'cores': self.data['ncpus']} def _construct_sysname(self, attname, row, optional): if len(row) < 3: @@ -127,28 +185,17 @@ def _construct_sysname(self, attname, row, optional): return '{0} {1}'.format(row[0], row[2]) def _read_uptime(self): - fp = None - raw_result = [] - try: - fp = open(HostStatCollector.UPTIME_FILE, 'rU') - raw_result = fp.read().split() - except: - logger.error('Unable to read uptime from {0}'.format(HostStatCollector.UPTIME_FILE)) - finally: - fp and fp.close() - return self._transform_input(raw_result, self.transform_uptime_data) + return self._transform_input(self.data['uptime'], self.transform_uptime_data) @staticmethod def _uptime_to_str(uptime): return str(timedelta(seconds=int(float(uptime)))) - @staticmethod - def _read_hostname(): - return {'hostname': socket.gethostname()} + def _read_hostname(self): + return {'hostname': self.data['hostname']} def _read_uname(self): - uname_row = os.uname() - return self._transform_input(uname_row, self.transform_uname_data) + return self._transform_input(self.data['uname'], self.transform_uname_data) def output(self, method): return super(self.__class__, self).output(method, before_string='Host statistics', after_string='\n') diff --git a/pg_view/collectors/memory_collector.py b/pg_view/collectors/memory_collector.py index 8e32270..52a1415 100644 --- a/pg_view/collectors/memory_collector.py +++ b/pg_view/collectors/memory_collector.py @@ -1,14 +1,58 @@ +import psycopg2 + from pg_view.collectors.base_collector import StatCollector from pg_view.loggers import logger +MEMORY_STAT_FILENAME = '/proc/meminfo' + +class LocalMemoryDataSource(object): + def __init__(self): + pass + + def __call__(self): + try: + with open(MEMORY_STAT_FILENAME, 'rU') as f: + for line in f: + yield line.strip() + except IOError: + logger.error('Unable to read {0} memory statistics. Check your permissions'.format(MEMORY_STAT_FILENAME)) + + +class RemoteMemoryDataSource(object): + def __init__(self, pgcon): + self.pgcon = pgcon + + def __call__(self): + """ +CREATE OR REPLACE FUNCTION pgview.get_memory_info(OUT results text) + RETURNS SETOF text + LANGUAGE plpythonu +AS $function$ + try: + with open('/proc/meminfo', 'rU') as f: + for line in f: + yield line.strip() + except: + pass +$function$ + """ + cur = self.pgcon.cursor() + cur.execute("SELECT * FROM pgview.get_memory_info()") + res = [row[0] for row in cur.fetchall()] + cur.close() + self.pgcon.commit() + return res + + class MemoryStatCollector(StatCollector): """ Collect memory-related statistics """ - MEMORY_STAT_FILE = '/proc/meminfo' - - def __init__(self): + def __init__(self, data_source=LocalMemoryDataSource()): super(MemoryStatCollector, self).__init__(produce_diffs=False) + + self.data_source = data_source + self.transform_dict_data = [ {'in': 'MemTotal', 'out': 'total', 'fn': int}, {'in': 'MemFree', 'out': 'free', 'fn': int}, @@ -122,29 +166,22 @@ def _read_memory_data(self): MemTotal, MemFree, Buffers, Cached, Dirty, CommitLimit, Committed_AS """ result = {} - try: - fp = open(MemoryStatCollector.MEMORY_STAT_FILE, 'rU') - for l in fp: - vals = l.strip().split() - if len(vals) >= 2: - name, val = vals[:2] - # if we have units of measurement different from kB - transform the result - if len(vals) == 3 and vals[2] in ('mB', 'gB'): - if vals[2] == 'mB': - val = val + '0' * 3 - if vals[2] == 'gB': - val = val + '0' * 6 - if len(str(name)) > 1: - result[str(name)[:-1]] = val - else: - logger.error('name is too short: {0}'.format(str(name))) + for line in self.data_source(): + vals = line.split() + if len(vals) >= 2: + name, val = vals[:2] + # if we have units of measurement different from kB - transform the result + if len(vals) == 3 and vals[2] in ('mB', 'gB'): + if vals[2] == 'mB': + val = val + '0' * 3 + if vals[2] == 'gB': + val = val + '0' * 6 + if len(str(name)) > 1: + result[str(name)[:-1]] = val else: - logger.error('/proc/meminfo string is not name value: {0}'.format(vals)) - except: - logger.error('Unable to read /proc/meminfo memory statistics. Check your permissions') - return result - finally: - fp.close() + logger.error('name is too short: {0}'.format(str(name))) + else: + logger.error('/proc/meminfo string is not name value: {0}'.format(vals)) return result def calculate_kb_left_until_limit(self, colname, row, optional): diff --git a/pg_view/collectors/system_collector.py b/pg_view/collectors/system_collector.py index 410a87a..219e7cc 100644 --- a/pg_view/collectors/system_collector.py +++ b/pg_view/collectors/system_collector.py @@ -1,16 +1,60 @@ +import psycopg2 + from pg_view.collectors.base_collector import StatCollector from pg_view.loggers import logger +PROC_STAT_FILENAME = '/proc/stat' + +class LocalSystemDataSource(object): + def __init__(self): + pass + + def __call__(self): + try: + # split /proc/stat into the name - value pairs + with open(PROC_STAT_FILENAME, 'rU') as f: + for line in f: + yield line.strip() + except IOError: + logger.error('Unable to read {0}, global data will be unavailable'.format(PROC_STAT_FILENAME)) + + +class RemoteSystemDataSource(object): + def __init__(self, pgcon): + self.pgcon = pgcon + + def __call__(self): + """ +CREATE OR REPLACE FUNCTION pgview.get_system_info(OUT results text) + RETURNS SETOF text + LANGUAGE plpythonu +AS $function$ + try: + with open('/proc/stat', 'rU') as f: + for line in f: + yield line.strip() + except: + pass +$function$ + """ + cur = self.pgcon.cursor() + cur.execute("SELECT * FROM pgview.get_system_info()") + res = [row[0] for row in cur.fetchall()] + cur.close() + self.pgcon.commit() + return res + + class SystemStatCollector(StatCollector): """ Collect global system statistics, i.e. CPU/IO usage, not including memory. """ - PROC_STAT_FILENAME = '/proc/stat' - - def __init__(self): + def __init__(self, data_source=LocalSystemDataSource()): super(SystemStatCollector, self).__init__() + self.data_source = data_source + self.transform_list_data = [ {'out': 'utime', 'in': 0, 'fn': float}, {'out': 'stime', 'in': 2, 'fn': float}, @@ -173,10 +217,8 @@ def _read_proc_stat(self): raw_result = {} result = {} try: - fp = open(SystemStatCollector.PROC_STAT_FILENAME, 'rU') - # split /proc/stat into the name - value pairs - for line in fp: - elements = line.strip().split() + for line in self.data_source(): + elements = line.split() if len(elements) > 2: raw_result[elements[0]] = elements[1:] elif len(elements) > 1: diff --git a/pg_view/utils.py b/pg_view/utils.py index 6d7f3d5..e5ce220 100644 --- a/pg_view/utils.py +++ b/pg_view/utils.py @@ -91,9 +91,10 @@ def process_single_collector(st): def process_groups(groups): for name in groups: - part = groups[name]['partitions'] - pg = groups[name]['pg'] - part.ncurses_set_prefix(pg.ncurses_produce_prefix()) + part = groups[name].get('partitions') + if part: + pg = groups[name]['pg'] + part.ncurses_set_prefix(pg.ncurses_produce_prefix()) def dbversion_as_float(pgcon):