From 2d27de83aef2a79138ae41ef6ba68e52570fcdfc Mon Sep 17 00:00:00 2001 From: Leigh Phillips Date: Tue, 19 Jan 2021 23:04:06 -0800 Subject: [PATCH] Update main.py Add uptime ping test (separate interval), unify speed tests under new measurement. --- main.py | 120 +++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 88 insertions(+), 32 deletions(-) diff --git a/main.py b/main.py index 99fd4d2..3eb60e4 100755 --- a/main.py +++ b/main.py @@ -1,7 +1,9 @@ +import os import time import json +import datetime import subprocess -import os +from pythonping import ping from influxdb import InfluxDBClient # InfluxDB Settings @@ -12,6 +14,7 @@ DB_USER = os.getenv('INFLUX_DB_USER', '') DB_PASSWORD = os.getenv('INFLUX_DB_PASSWORD', '') DB_DATABASE = os.getenv('INFLUX_DB_DATABASE', 'speedtests') DB_TAGS = os.getenv('INFLUX_DB_TAGS', None) +PING_TARGETS = os.getenv('PING_TARGETS', '1.1.1.1, 8.8.8.8') # Speedtest Settings # Time between tests (in minutes, converts to seconds). @@ -20,6 +23,8 @@ TEST_INTERVAL = int(os.getenv('SPEEDTEST_INTERVAL', '5')) * 60 TEST_FAIL_INTERVAL = int(os.getenv('SPEEDTEST_FAIL_INTERVAL', '5')) * 60 # Specific server ID SERVER_ID = os.getenv('SPEEDTEST_SERVER_ID', '') +# Time between ping tests (in seconds). +PING_INTERVAL = int(os.getenv('PING_INTERVAL', '5')) influxdb_client = InfluxDBClient( DB_ADDRESS, DB_PORT, DB_USER, DB_PASSWORD, None) @@ -50,7 +55,7 @@ def tag_selection(data): # tag_switch takes in _data and attaches CLIoutput to more readable ids tag_switch = { 'namespace': NAMESPACE, - 'isp': data['isp'], + 'isp': data['isp'], 'interface': data['interface']['name'], 'internal_ip': data['interface']['internalIp'], 'interface_mac': data['interface']['macAddr'], @@ -66,7 +71,7 @@ def tag_selection(data): 'speedtest_id': data['result']['id'], 'speedtest_url': data['result']['url'] } - + if tags is None: tags = 'namespace' elif '*' in tags: @@ -79,12 +84,12 @@ def tag_selection(data): # split the tag string, strip and add selected tags to {options} with corresponding tag_switch data tag = tag.strip() options[tag] = tag_switch[tag] - + return options def format_for_influx(data): - + # There is additional data in the speedtest-cli output but it is likely not necessary to store. influx_data = [ { @@ -121,45 +126,96 @@ def format_for_influx(data): 'fields': { 'packetLoss': pkt_loss(data) } + }, + { + 'measurement': 'speeds', + 'time': data['timestamp'], + 'fields': { + 'jitter': data['ping']['jitter'], + 'latency': data['ping']['latency'], + 'packetLoss': pkt_loss(data), + # Byte to Megabit + 'bandwidth_down': data['download']['bandwidth'] / 125000, + 'bytes_down': data['download']['bytes'], + 'elapsed_down': data['download']['elapsed'], + # Byte to Megabit + 'bandwidth_up': data['upload']['bandwidth'] / 125000, + 'bytes_up': data['upload']['bytes'], + 'elapsed_up': data['upload']['elapsed'] + } } ] tags = tag_selection(data) - if tags is None: - return influx_data - else: + if tags is not None: for measurement in influx_data: measurement['tags'] = tags - return influx_data + return influx_data + + +def speedtest(): + if not SERVER_ID: + speedtest = subprocess.run( + ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"], capture_output=True) + print("Automatic server choice") + else: + speedtest = subprocess.run( + ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json", "--server-id=" + SERVER_ID], capture_output=True) + print("Manual server choice : ID = " + SERVER_ID) + + if speedtest.returncode == 0: # Speedtest was successful. + print("Speedtest Successful :") + data_json = json.loads(speedtest.stdout) + print("time: " + str(data_json['timestamp']) + " - ping: " + str(data_json['ping']['latency']) + " ms - download: " + str(data_json['download']['bandwidth']/125000) + " Mb/s - upload: " + str(data_json['upload']['bandwidth'] / 125000) + " Mb/s - isp: " + data_json['isp'] + " - ext. IP: " + data_json['interface']['externalIp'] + " - server id: " + str(data_json['server']['id']) + " (" + data_json['server']['name'] + " @ " + data_json['server']['location'] + ")") + data = format_for_influx(data_json) + if influxdb_client.write_points(data) == True: + print("Data written to DB successfully") + else: # Speedtest failed. + print("Speedtest Failed :") + print(speedtest.stderr) + print(speedtest.stdout) +# time.sleep(TEST_FAIL_INTERVAL) + + +def pingtest(): + timestamp = datetime.datetime.utcnow() + for target in PING_TARGETS.split(','): + target = target.strip() + pingtest = ping(target, verbose=False, timeout=1, count=1, size=128) + data = [ + { + 'measurement': 'pings', + 'time': timestamp, + 'tags': { + 'target' : target + }, + 'fields': { + 'success' : pingtest.success(), + 'rtt': pingtest.rtt_avg_ms + } + } + ] + if influxdb_client.write_points(data) == True: + print("Ping data written to DB successfully") + else: # Speedtest failed. + print("Ping Failed.") def main(): init_db() # Setup the database if it does not already exist. + loopcount = 1 while (1): # Run a Speedtest and send the results to influxDB indefinitely. - server_id = SERVER_ID - if not server_id: - speedtest = subprocess.run( - ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"], capture_output=True) - print("Automatic server choice") - else: - speedtest = subprocess.run( - ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json", "--server-id=" + SERVER_ID], capture_output=True) - print("Manual server choice : ID = " + SERVER_ID) - - if speedtest.returncode == 0: # Speedtest was successful. - print("Speedtest Successful :") - data_json = json.loads(speedtest.stdout) - print("time: " + str(data_json['timestamp']) + " - ping: " + str(data_json['ping']['latency']) + " ms - download: " + str(data_json['download']['bandwidth']/125000) + " Mb/s - upload: " + str(data_json['upload']['bandwidth'] / 125000) + " Mb/s - isp: " + data_json['isp'] + " - ext. IP: " + data_json['interface']['externalIp'] + " - server id: " + str(data_json['server']['id']) + " (" + data_json['server']['name'] + " @ " + data_json['server']['location'] + ")") - data = format_for_influx(data_json) - if influxdb_client.write_points(data) == True: - print("Data written to DB successfully") - time.sleep(TEST_INTERVAL) - else: # Speedtest failed. - print("Speedtest Failed :") - print(speedtest.stderr) - print(speedtest.stdout) - time.sleep(TEST_FAIL_INTERVAL) + if loopcount % PING_INTERVAL == 0: + pingtest() + if loopcount % TEST_INTERVAL == 0: + speedtest() + + if loopcount % ( PING_INTERVAL * TEST_INTERVAL ) == 0: + loopcount = 0 + + time.sleep(1) + loopcount += 1 if __name__ == '__main__': print('Speedtest CLI data logger to InfluxDB started...')