import os import time import json import datetime import subprocess from pythonping import ping from multiprocessing import Process from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS # Categorization Settings NAMESPACE = os.getenv("NAMESPACE", "None") HOSTNAME = os.getenv("HOSTNAME") # InfluxDB Settings DB_ADDRESS = os.getenv("INFLUX_DB_ADDRESS", "influxdb:8086") DB_TOKEN = os.getenv("INFLUX_DB_TOKEN", "") DB_ORG = os.getenv("INFLUX_DB_ORG", "speed-org") DB_BUCKET = os.getenv("INFLUX_DB_BUCKET", "speedtests") DB_TAGS = os.getenv("INFLUX_DB_TAGS", None) DB_VERIFY_SSL = os.getenv("INFLUX_DB_VERIFY_SSL", True) # Speedtest Settings # Time between tests (in minutes, converts to seconds). TEST_INTERVAL = int(os.getenv("SPEEDTEST_INTERVAL", "5")) * 60 # Time before retrying a failed Speedtest (in minutes, converts to seconds). TEST_FAIL_INTERVAL = int(os.getenv("SPEEDTEST_FAIL_INTERVAL", "5")) * 60 # Specific server ID SERVER_ID = os.getenv("SPEEDTEST_SERVER_ID", "") # Time between ping tests (in seconds). PING_INTERVAL = int(os.getenv("PING_INTERVAL", "5")) # Addresses to ping PING_TARGETS = os.getenv("PING_TARGETS", "1.1.1.1, 8.8.8.8") # Logging settings DEBUG = os.getenv("SPEEDTEST_DEBUG", False) client = InfluxDBClient( url=DB_ADDRESS, token=DB_TOKEN, org=DB_ORG, verify_ssl=DB_VERIFY_SSL ) write_api = client.write_api(write_options=SYNCHRONOUS) def verbose_print(statement): if DEBUG: print(statement) def pkt_loss(data): if "packetLoss" in data.keys(): return int(data["packetLoss"]) else: return 0 def tag_selection(data): tags = DB_TAGS options = {} # tag_switch takes in _data and attaches CLIoutput to more readable ids tag_switch = { "host": HOSTNAME, "namespace": NAMESPACE, "isp": data["isp"], "interface": data["interface"]["name"], "internal_ip": data["interface"]["internalIp"], "interface_mac": data["interface"]["macAddr"], "vpn_enabled": (False if data["interface"]["isVpn"] == "false" else True), "external_ip": data["interface"]["externalIp"], "server_id": data["server"]["id"], "server_name": data["server"]["name"], "server_location": data["server"]["location"], "server_country": data["server"]["country"], "server_host": data["server"]["host"], "server_port": data["server"]["port"], "server_ip": data["server"]["ip"], "speedtest_id": data["result"]["id"], "speedtest_url": data["result"]["url"], } if tags is None: tags = "namespace" elif "*" in tags: return tag_switch else: tags = "namespace, " + tags tags = tags.split(",") for tag in tags: # split the tag string, strip and add selected tags to {options} with corresponding tag_switch data tag = tag.strip() options[tag] = tag_switch[tag] return options def format_for_influx(data): # There is additional data in the speedtest-cli output but it is likely not necessary to store. influx_data = [ { "measurement": "packetLoss", "time": data["timestamp"], "fields": {"packetLoss": pkt_loss(data)}, }, { "measurement": "speeds", "time": data["timestamp"], "fields": { "ping_jitter": data["ping"]["jitter"], "ping_latency": data["ping"]["latency"], "packet_loss": pkt_loss(data), "bandwidth_down": data["download"]["bandwidth"], "bytes_down": data["download"]["bytes"], "elapsed_down": data["download"]["elapsed"], "bandwidth_up": data["upload"]["bandwidth"], "bytes_up": data["upload"]["bytes"], "elapsed_up": data["upload"]["elapsed"], }, }, ] tags = tag_selection(data) if tags is not None: for measurement in influx_data: measurement["tags"] = tags return influx_data def speedtest(): if not SERVER_ID: speedtest = subprocess.run( ["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"], capture_output=True, ) verbose_print("Automatic server choice") else: speedtest = subprocess.run( [ "speedtest", "--accept-license", "--accept-gdpr", "-f", "json", "--server-id=" + SERVER_ID, ], capture_output=True, ) verbose_print("Manual server choice : ID = " + SERVER_ID) if speedtest.returncode == 0: # Speedtest was successful. verbose_print("Speedtest Successful :") data_json = json.loads(speedtest.stdout) verbose_print( "time: " + str(data_json["timestamp"]) + " - ping: " + str(data_json["ping"]["latency"]) + " ms - download: " + str(data_json["download"]["bandwidth"]) + " b/s - upload: " + str(data_json["upload"]["bandwidth"]) + " b/s - isp: " + data_json["isp"] + " - ext. IP: " + data_json["interface"]["externalIp"] + " - server id: " + str(data_json["server"]["id"]) + " (" + data_json["server"]["name"] + " @ " + data_json["server"]["location"] + ")" ) data = format_for_influx(data_json) write_api.write(bucket=DB_BUCKET, record=data) verbose_print("Data written to DB") else: # Speedtest failed. print("Speedtest Failed :") print(speedtest.stderr) print(speedtest.stdout) def pingtest(): timestamp = datetime.datetime.utcnow() for target in PING_TARGETS.split(","): target = target.strip() pingtest = ping(target, verbose=False, timeout=1, count=1, size=128) data = [ { "measurement": "pings", "time": timestamp, "tags": {"namespace": NAMESPACE, "target": target}, "fields": { "success": int(pingtest._responses[0].error_message is None), "rtt": float( 0 if pingtest._responses[0].error_message is not None else pingtest.rtt_avg_ms ), }, } ] write_api.write(bucket=DB_BUCKET, record=data) def main(): pPing = Process(target=pingtest) pSpeed = Process(target=speedtest) loopcount = 0 while 1: # Run a Speedtest and send the results to influxDB indefinitely. if PING_INTERVAL != 0 and (loopcount == 0 or loopcount % PING_INTERVAL == 0): if pPing.is_alive(): pPing.terminate() pPing = Process(target=pingtest) pPing.start() if TEST_INTERVAL != 0 and (loopcount == 0 or loopcount % TEST_INTERVAL == 0): if pSpeed.is_alive(): pSpeed.terminate() pSpeed = Process(target=speedtest) pSpeed.start() if loopcount % (PING_INTERVAL * TEST_INTERVAL) == 0: loopcount = 0 time.sleep(1) loopcount += 1 if __name__ == "__main__": print("Speedtest CLI data logger to InfluxDB started...") main() client.close()