speedtest-to-influxdb2/main.py

172 lines
6.2 KiB
Python
Raw Normal View History

import time
import json
import subprocess
import os
from influxdb import InfluxDBClient
func getEnv(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
# InfluxDB Settings
NAMESPACE = getEnv('NAMESPACE', '')
DB_ADDRESS = getEnv('INFLUX_DB_ADDRESS', 'influxdb')
DB_PORT = int(getEnv('INFLUX_DB_PORT', '8086'))
DB_USER = getEnv('INFLUX_DB_USER', '')
DB_PASSWORD = getEnv('INFLUX_DB_PASSWORD', '')
DB_DATABASE = getEnv('INFLUX_DB_DATABASE', 'speedtests')
DB_TAGS = getEnv('INFLUX_DB_TAGS', '')
# Speedtest Settings
2020-07-31 05:10:51 +00:00
# Time between tests (in minutes, converts to seconds).
TEST_INTERVAL = int(getEnv('SPEEDTEST_INTERVAL', '5')) * 60
2020-07-31 05:10:51 +00:00
# Time before retrying a failed Speedtest (in minutes, converts to seconds).
TEST_FAIL_INTERVAL = int(getEnv('SPEEDTEST_FAIL_INTERVAL', '5')) * 60
2020-10-16 13:30:08 +00:00
# Specific server ID
SERVER_ID = getEnv('SPEEDTEST_SERVER_ID', '')
influxdb_client = InfluxDBClient(
DB_ADDRESS, DB_PORT, DB_USER, DB_PASSWORD, None)
def init_db():
databases = influxdb_client.get_list_database()
if len(list(filter(lambda x: x['name'] == DB_DATABASE, databases))) == 0:
influxdb_client.create_database(
DB_DATABASE) # Create if does not exist.
else:
2020-07-31 03:45:23 +00:00
# Switch to if does exist.
influxdb_client.switch_database(DB_DATABASE)
def pkt_loss(data):
if 'packetLoss' in data.keys():
2020-07-31 18:53:44 +00:00
return int(data['packetLoss'])
2020-07-31 03:45:23 +00:00
else:
return 0
2020-07-31 03:45:23 +00:00
def tag_selection(data):
tags = DB_TAGS
2020-07-31 04:13:14 +00:00
if tags is None:
return None
# tag_switch takes in _data and attaches CLIoutput to more readable ids
2020-07-31 03:45:23 +00:00
tag_switch = {
'namespace': NAMESPACE,
2020-07-31 03:45:23 +00:00
'isp': data['isp'],
'interface': data['interface']['name'],
'internal_ip': data['interface']['internalIp'],
'interface_mac': data['interface']['macAddr'],
'vpn_enabled': (False if data['interface']['isVpn'] == 'false' else True),
'external_ip': data['interface']['externalIp'],
'server_id': data['server']['id'],
'server_name': data['server']['name'],
'server_location': data['server']['location'],
'server_country': data['server']['country'],
'server_host': data['server']['host'],
'server_port': data['server']['port'],
'server_ip': data['server']['ip'],
'speedtest_id': data['result']['id'],
'speedtest_url': data['result']['url']
}
2020-07-31 04:13:14 +00:00
2020-07-31 03:45:23 +00:00
options = {}
tags = 'namespace, ' + tags
2020-07-31 04:13:14 +00:00
tags = tags.split(',')
for tag in tags:
# split the tag string, strip and add selected tags to {options} with corresponding tag_switch data
tag = tag.strip()
options[tag] = tag_switch[tag]
return options
2020-07-31 03:45:23 +00:00
2020-10-16 15:20:32 +00:00
def format_for_influx(data):
# There is additional data in the speedtest-cli output but it is likely not necessary to store.
influx_data = [
{
'measurement': 'ping',
'time': data['timestamp'],
'fields': {
'namespace': NAMESPACE,
'jitter': data['ping']['jitter'],
'latency': data['ping']['latency']
}
},
{
'measurement': 'download',
'time': data['timestamp'],
'fields': {
'namespace': NAMESPACE,
# Byte to Megabit
'bandwidth': data['download']['bandwidth'] / 125000,
'bytes': data['download']['bytes'],
'elapsed': data['download']['elapsed']
}
},
{
'measurement': 'upload',
'time': data['timestamp'],
'fields': {
'namespace': NAMESPACE,
# Byte to Megabit
'bandwidth': data['upload']['bandwidth'] / 125000,
'bytes': data['upload']['bytes'],
'elapsed': data['upload']['elapsed']
}
},
{
'measurement': 'packetLoss',
'time': data['timestamp'],
'fields': {
'namespace': NAMESPACE,
'packetLoss': pkt_loss(data)
}
}
]
2020-07-31 04:13:14 +00:00
tags = tag_selection(data)
if tags is None:
return influx_data
else:
for measurement in influx_data:
measurement['tags'] = tags
return influx_data
def main():
init_db() # Setup the database if it does not already exist.
while (1): # Run a Speedtest and send the results to influxDB indefinitely.
2020-10-16 13:30:08 +00:00
server_id = SERVER_ID
if not server_id:
speedtest = subprocess.run(
["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"], capture_output=True)
2020-10-16 13:30:08 +00:00
print("Automatic server choice")
2020-10-16 15:20:32 +00:00
else:
2020-10-16 13:30:08 +00:00
speedtest = subprocess.run(
["speedtest", "--accept-license", "--accept-gdpr", "-f", "json", "--server-id=" + SERVER_ID], capture_output=True)
2020-10-16 15:20:32 +00:00
print("Manual server choice : ID = " + SERVER_ID)
if speedtest.returncode == 0: # Speedtest was successful.
2020-10-16 15:20:09 +00:00
print("Speedtest Successful :")
2020-10-16 15:20:32 +00:00
data_json = json.loads(speedtest.stdout)
print("time: " + str(data_json['timestamp']) + " - ping: " + str(data_json['ping']['latency']) + " ms - download: " + str(data_json['download']['bandwidth']/125000) + " Mb/s - upload: " + str(data_json['upload']['bandwidth'] / 125000) + " Mb/s - isp: " + data_json['isp'] + " - ext. IP: " + data_json['interface']['externalIp'] + " - server id: " + str(data_json['server']['id']) + " (" + data_json['server']['name'] + " @ " + data_json['server']['location'] + ")")
data = format_for_influx(data_json)
if influxdb_client.write_points(data) == True:
print("Data written to DB successfully")
time.sleep(TEST_INTERVAL)
else: # Speedtest failed.
2020-10-16 15:20:09 +00:00
print("Speedtest Failed :")
print(speedtest.stderr)
print(speedtest.stdout)
time.sleep(TEST_FAIL_INTERVAL)
if __name__ == '__main__':
2020-10-16 15:20:09 +00:00
print('Speedtest CLI data logger to InfluxDB started...')
main()