Update for influxdb2

Updated environment options and readme
This commit is contained in:
Marty Oehme 2021-05-16 13:50:03 +02:00
parent 42d0ea02a3
commit 0def56d219
Signed by: Marty
GPG Key ID: B7538B8F50A1C800
3 changed files with 164 additions and 155 deletions

View File

@ -11,7 +11,7 @@ apt-get update && \
apt-get -q -y install --no-install-recommends apt-utils gnupg1 apt-transport-https dirmngr && \
\
# Install Python packages
pip3 install pythonping influxdb && \
pip3 install pythonping influxdb-client && \
\
# Clean up
apt-get -q -y autoremove && apt-get -q -y clean && \

View File

@ -1,20 +1,25 @@
# Speedtest to InfluxDB
# Speedtest to InfluxDBv2
---
This is a Python script that will continuously run the official Speedtest CLI application by Ookla, takes input from environment variables, formats data and writes it to an InfluxDB database.
This is a Python script that will continuously run the official Speedtest CLI application by Ookla, takes input from environment variables, formats data and writes it to an InfluxDB v2 database.
This script will allow you to measure your internet connections speed and consistency over time. It uses env variables as configuration. It's as easy to use as telling your Docker server a 1 line command and you'll be set. Using Grafana you can start exploring this data easily.
This script will allow you to measure your internet connections speed and consistency over time. It uses env variables as configuration. It's as easy to use as telling your Docker server a 1 line command and you'll be set. Using Grafana you can start exploring this data easily.
I built a grafana dashboard for this data that can be found at https://grafana.com/grafana/dashboards/13053. Additionally, other contributors have modified this dash and included a JSON file of those modifications. Use `GrafanaDash-SpeedTests.json` to import that dash into Grafana.
Breadlysm built a grafana dashboard for this data that can be found at https://grafana.com/grafana/dashboards/13053. Additionally, other contributors have modified this dash and included a JSON file of those modifications. Use `GrafanaDash-SpeedTests.json` to import that dash into Grafana.
![OriginalDash](https://user-images.githubusercontent.com/3665468/116284820-8038ca00-a75b-11eb-9b30-4a9d26434f8d.png)
![Variant](https://user-images.githubusercontent.com/945191/105287048-46f52a80-5b6c-11eb-9e57-038d63b67efb.png)
There are some added features to allow some additional details that Ookla provides as tags on your data. Some examples are your current ISP, the interface being used, the server who hosted the test. Overtime, you could identify if some serers are performing better than others.
There are some added features to allow some additional details that Ookla provides as tags on your data. Some examples are your current ISP, the interface being used, the server who hosted the test. Overtime, you could identify if some serers are performing better than others.
## Differences to InfluxDB v1 program:
* Slightly different environment variables to set up InfluxDB connection
* Speeds will be returned in bytes without transforming into MBit/s, just like the speedtest does
* The measure duplication of `speeds>bandwith_up/down` and `download>bandwith` and `upload>bandwith` has been removed, leaving only the `speeds` fields.
* Setting a ping/speedtest interval of 0 will turn the respective test off
* By default only failed tests will be printed to stdout/stderr, this can be controlled through `SPEEDTEST_DEBUG`
## Configuring the script
@ -22,25 +27,26 @@ The InfluxDB connection settings are controlled by environment variables.
The variables available are:
- NAMESPACE = default - None
- INFLUX_DB_ADDRESS = default - influxdb
- INFLUX_DB_PORT = default - 8086
- INFLUX_DB_USER = default - {blank}
- INFLUX_DB_PASSWORD = default - {blank}
- INFLUX_DB_DATABASE = default - speedtests
- INFLUX_DB_TAGS = default - None * See below for options, '*' widcard for all *
- INFLUX_DB_ADDRESS = default - influxdb:8086
- INFLUX_DB_TOKEN = no default, *required* for any operation
- INFLUX_DB_ORG = default - speed-org
- INFLUX_DB_BUCKET = default - speedtests
- INFLUX_DB_VERIFY_SSL = default - True
- INFLUX_DB_TAGS = default - None * See below for options, `'*'` widcard for all *
- SPEEDTEST_INTERVAL = default - 5 (minutes)
- SPEEDTEST_SERVER_ID = default - {blank} * id from https://c.speedtest.net/speedtest-servers-static.php *
- PING_INTERVAL = default - 5 (seconds)
- PING_TARGETS = default - 1.1.1.1, 8.8.8.8 (csv of hosts to ping)
- SPEEDTEST_DEBUG = default - False
### Variable Notes
- Intervals are in minutes. *Script will convert it to seconds.*
- If any variables are not needed, don't declare them. Functions will operate with or without most variables.
- If any variables are not needed, don't declare them. Functions will operate with or without most variables.
- Tags should be input without quotes. *INFLUX_DB_TAGS = isp, interface, external_ip, server_name, speedtest_url*
- NAMESPACE is used to collect data from multiple instances of the container into one database and select which you wish to view in Grafana. i.e. I have one monitoring my Starlink, the other my TELUS connection.
- NAMESPACE is used to collect data from multiple instances of the container into one database and select which you wish to view in Grafana. i.e. Breadlysm has one monitoring their Starlink, the other their TELUS connection.
### Tag Options
The Ookla speedtest app provides a nice set of data beyond the upload and download speed. The list is below.
The Ookla speedtest app provides a nice set of data beyond the upload and download speed. The list is below.
| Tag Name | Description |
|- |- |
@ -65,33 +71,30 @@ Be aware that this script will automatically accept the license and GDPR stateme
## Running the Script
### Ideal option, run as a Docker container.
### Ideal option, run as a Docker container.
1. Build the container.
`docker build -t breadlysm/speedtest-to-influxdb ./`
`docker build -t speedtest-to-influxdbv2 ./`
2. Run the container.
```
docker run -d -t --name speedflux \
-e 'NAMESPACE'='None' \
-e 'INFLUX_DB_ADDRESS'='influxdb' \
-e 'INFLUX_DB_PORT'='8086' \
-e 'INFLUX_DB_USER'='_influx_user_' \
-e 'INFLUX_DB_PASSWORD'='_influx_pass_' \
-e 'INFLUX_DB_DATABASE'='speedtests' \
-e 'INFLUX_DB_ADDRESS'='influxdb:8086' \
-e 'INFLUX_DB_ORG'='my-awesome-org' \
-e 'INFLUX_DB_TOKEN'='1NCR3D1BLY-L0NG-T0K3N-ST71NG' \
-e 'INFLUX_DB_BUCKET'='speedtests' \
-e 'SPEEDTEST_INTERVAL'='5' \
-e 'SPEEDTEST_FAIL_INTERVAL'='5' \
-e 'SPEEDTEST_SERVER_ID'='12746' \
breadlysm/speedtest-to-influxdb
speedtest-to-influxdbv2
```
<br>
<br>
As of now, the container is also reachable through this repository's container registry:
<sup><sub>**Pull Requests**</sub></sup>
`docker run <FLAGS> registry.gitlab.com/marty-oehme/speedtest-to-influxdb2:master`
<sub><sup>I will accept pull requests as long as core functionality and settings remain the same. Changes should be in addition to corefunctionality. I don't want a situation where a script auto-updates and ruins months/years of data or causes other headaches. Feel free to add yourself as contributing but I ask that links to containers do not change.</sub></sup>
---

254
main.py
View File

@ -4,47 +4,48 @@ import json
import datetime
import subprocess
from pythonping import ping
from influxdb import InfluxDBClient
from multiprocessing import Process
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
# InfluxDB Settings
NAMESPACE = os.getenv('NAMESPACE', 'None')
DB_ADDRESS = os.getenv('INFLUX_DB_ADDRESS', 'influxdb')
DB_PORT = int(os.getenv('INFLUX_DB_PORT', '8086'))
DB_USER = os.getenv('INFLUX_DB_USER', '')
DB_PASSWORD = os.getenv('INFLUX_DB_PASSWORD', '')
DB_DATABASE = os.getenv('INFLUX_DB_DATABASE', 'speedtests')
DB_TAGS = os.getenv('INFLUX_DB_TAGS', None)
PING_TARGETS = os.getenv('PING_TARGETS', '1.1.1.1, 8.8.8.8')
NAMESPACE = os.getenv("NAMESPACE", "None")
DB_ADDRESS = os.getenv("INFLUX_DB_ADDRESS", "influxdb:8086")
DB_TOKEN = os.getenv("INFLUX_DB_TOKEN", "")
DB_ORG = os.getenv("INFLUX_DB_ORG", "speed-org")
DB_BUCKET = os.getenv("INFLUX_DB_BUCKET", "speedtests")
DB_TAGS = os.getenv("INFLUX_DB_TAGS", None)
DB_VERIFY_SSL = os.getenv("INFLUX_DB_VERIFY_SSL", True)
# Speedtest Settings
# Time between tests (in minutes, converts to seconds).
TEST_INTERVAL = int(os.getenv('SPEEDTEST_INTERVAL', '5')) * 60
TEST_INTERVAL = int(os.getenv("SPEEDTEST_INTERVAL", "5")) * 60
# Time before retrying a failed Speedtest (in minutes, converts to seconds).
TEST_FAIL_INTERVAL = int(os.getenv('SPEEDTEST_FAIL_INTERVAL', '5')) * 60
TEST_FAIL_INTERVAL = int(os.getenv("SPEEDTEST_FAIL_INTERVAL", "5")) * 60
# Specific server ID
SERVER_ID = os.getenv('SPEEDTEST_SERVER_ID', '')
SERVER_ID = os.getenv("SPEEDTEST_SERVER_ID", "")
# Time between ping tests (in seconds).
PING_INTERVAL = int(os.getenv('PING_INTERVAL', '5'))
PING_INTERVAL = int(os.getenv("PING_INTERVAL", "5"))
# Addresses to ping
PING_TARGETS = os.getenv("PING_TARGETS", "1.1.1.1, 8.8.8.8")
influxdb_client = InfluxDBClient(
DB_ADDRESS, DB_PORT, DB_USER, DB_PASSWORD, None)
# Logging settings
DEBUG = os.getenv("SPEEDTEST_DEBUG", False)
client = InfluxDBClient(
url=DB_ADDRESS, token=DB_TOKEN, org=DB_ORG, verify_ssl=DB_VERIFY_SSL
)
write_api = client.write_api(write_options=SYNCHRONOUS)
def init_db():
databases = influxdb_client.get_list_database()
if len(list(filter(lambda x: x['name'] == DB_DATABASE, databases))) == 0:
influxdb_client.create_database(
DB_DATABASE) # Create if does not exist.
else:
# Switch to if does exist.
influxdb_client.switch_database(DB_DATABASE)
def verbose_print(statement):
if DEBUG:
print(statement)
def pkt_loss(data):
if 'packetLoss' in data.keys():
return int(data['packetLoss'])
if "packetLoss" in data.keys():
return int(data["packetLoss"])
else:
return 0
@ -55,32 +56,32 @@ def tag_selection(data):
# tag_switch takes in _data and attaches CLIoutput to more readable ids
tag_switch = {
'namespace': NAMESPACE,
'isp': data['isp'],
'interface': data['interface']['name'],
'internal_ip': data['interface']['internalIp'],
'interface_mac': data['interface']['macAddr'],
'vpn_enabled': (False if data['interface']['isVpn'] == 'false' else True),
'external_ip': data['interface']['externalIp'],
'server_id': data['server']['id'],
'server_name': data['server']['name'],
'server_location': data['server']['location'],
'server_country': data['server']['country'],
'server_host': data['server']['host'],
'server_port': data['server']['port'],
'server_ip': data['server']['ip'],
'speedtest_id': data['result']['id'],
'speedtest_url': data['result']['url']
"namespace": NAMESPACE,
"isp": data["isp"],
"interface": data["interface"]["name"],
"internal_ip": data["interface"]["internalIp"],
"interface_mac": data["interface"]["macAddr"],
"vpn_enabled": (False if data["interface"]["isVpn"] == "false" else True),
"external_ip": data["interface"]["externalIp"],
"server_id": data["server"]["id"],
"server_name": data["server"]["name"],
"server_location": data["server"]["location"],
"server_country": data["server"]["country"],
"server_host": data["server"]["host"],
"server_port": data["server"]["port"],
"server_ip": data["server"]["ip"],
"speedtest_id": data["result"]["id"],
"speedtest_url": data["result"]["url"],
}
if tags is None:
tags = 'namespace'
elif '*' in tags:
tags = "namespace"
elif "*" in tags:
return tag_switch
else:
tags = 'namespace, ' + tags
tags = "namespace, " + tags
tags = tags.split(',')
tags = tags.split(",")
for tag in tags:
# split the tag string, strip and add selected tags to {options} with corresponding tag_switch data
tag = tag.strip()
@ -94,62 +95,38 @@ def format_for_influx(data):
# There is additional data in the speedtest-cli output but it is likely not necessary to store.
influx_data = [
{
'measurement': 'ping',
'time': data['timestamp'],
'fields': {
'jitter': data['ping']['jitter'],
'latency': data['ping']['latency']
}
"measurement": "ping",
"time": data["timestamp"],
"fields": {
"jitter": data["ping"]["jitter"],
"latency": data["ping"]["latency"],
},
},
{
'measurement': 'download',
'time': data['timestamp'],
'fields': {
# Byte to Megabit
'bandwidth': data['download']['bandwidth'] / 125000,
'bytes': data['download']['bytes'],
'elapsed': data['download']['elapsed']
}
"measurement": "packetLoss",
"time": data["timestamp"],
"fields": {"packetLoss": pkt_loss(data)},
},
{
'measurement': 'upload',
'time': data['timestamp'],
'fields': {
# Byte to Megabit
'bandwidth': data['upload']['bandwidth'] / 125000,
'bytes': data['upload']['bytes'],
'elapsed': data['upload']['elapsed']
}
"measurement": "speeds",
"time": data["timestamp"],
"fields": {
"jitter": data["ping"]["jitter"],
"latency": data["ping"]["latency"],
"packetLoss": pkt_loss(data),
"bandwidth_down": data["download"]["bandwidth"],
"bytes_down": data["download"]["bytes"],
"elapsed_down": data["download"]["elapsed"],
"bandwidth_up": data["upload"]["bandwidth"],
"bytes_up": data["upload"]["bytes"],
"elapsed_up": data["upload"]["elapsed"],
},
},
{
'measurement': 'packetLoss',
'time': data['timestamp'],
'fields': {
'packetLoss': pkt_loss(data)
}
},
{
'measurement': 'speeds',
'time': data['timestamp'],
'fields': {
'jitter': data['ping']['jitter'],
'latency': data['ping']['latency'],
'packetLoss': pkt_loss(data),
# Byte to Megabit
'bandwidth_down': data['download']['bandwidth'] / 125000,
'bytes_down': data['download']['bytes'],
'elapsed_down': data['download']['elapsed'],
# Byte to Megabit
'bandwidth_up': data['upload']['bandwidth'] / 125000,
'bytes_up': data['upload']['bytes'],
'elapsed_up': data['upload']['elapsed']
}
}
]
tags = tag_selection(data)
if tags is not None:
for measurement in influx_data:
measurement['tags'] = tags
measurement["tags"] = tags
return influx_data
@ -157,77 +134,106 @@ def format_for_influx(data):
def speedtest():
if not SERVER_ID:
speedtest = subprocess.run(
["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"], capture_output=True)
print("Automatic server choice")
["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"],
capture_output=True,
)
verbose_print("Automatic server choice")
else:
speedtest = subprocess.run(
["speedtest", "--accept-license", "--accept-gdpr", "-f", "json", "--server-id=" + SERVER_ID], capture_output=True)
print("Manual server choice : ID = " + SERVER_ID)
[
"speedtest",
"--accept-license",
"--accept-gdpr",
"-f",
"json",
"--server-id=" + SERVER_ID,
],
capture_output=True,
)
verbose_print("Manual server choice : ID = " + SERVER_ID)
if speedtest.returncode == 0: # Speedtest was successful.
print("Speedtest Successful :")
verbose_print("Speedtest Successful :")
data_json = json.loads(speedtest.stdout)
print("time: " + str(data_json['timestamp']) + " - ping: " + str(data_json['ping']['latency']) + " ms - download: " + str(data_json['download']['bandwidth']/125000) + " Mb/s - upload: " + str(data_json['upload']['bandwidth'] / 125000) + " Mb/s - isp: " + data_json['isp'] + " - ext. IP: " + data_json['interface']['externalIp'] + " - server id: " + str(data_json['server']['id']) + " (" + data_json['server']['name'] + " @ " + data_json['server']['location'] + ")")
verbose_print(
"time: "
+ str(data_json["timestamp"])
+ " - ping: "
+ str(data_json["ping"]["latency"])
+ " ms - download: "
+ str(data_json["download"]["bandwidth"])
+ " Mb/s - upload: "
+ str(data_json["upload"]["bandwidth"])
+ " Mb/s - isp: "
+ data_json["isp"]
+ " - ext. IP: "
+ data_json["interface"]["externalIp"]
+ " - server id: "
+ str(data_json["server"]["id"])
+ " ("
+ data_json["server"]["name"]
+ " @ "
+ data_json["server"]["location"]
+ ")"
)
data = format_for_influx(data_json)
if influxdb_client.write_points(data) == True:
print("Data written to DB successfully")
write_api.write(bucket=DB_BUCKET, record=data)
verbose_print("Data written to DB")
else: # Speedtest failed.
print("Speedtest Failed :")
print(speedtest.stderr)
print(speedtest.stdout)
# time.sleep(TEST_FAIL_INTERVAL)
def pingtest():
timestamp = datetime.datetime.utcnow()
for target in PING_TARGETS.split(','):
for target in PING_TARGETS.split(","):
target = target.strip()
pingtest = ping(target, verbose=False, timeout=1, count=1, size=128)
data = [
{
'measurement': 'pings',
'time': timestamp,
'tags': {
'namespace': NAMESPACE,
'target' : target
"measurement": "pings",
"time": timestamp,
"tags": {"namespace": NAMESPACE, "target": target},
"fields": {
"success": int(pingtest._responses[0].error_message is None),
"rtt": float(
0
if pingtest._responses[0].error_message is not None
else pingtest.rtt_avg_ms
),
},
'fields': {
'success' : int(pingtest._responses[0].error_message is None),
'rtt': float(0 if pingtest._responses[0].error_message is not None else pingtest.rtt_avg_ms)
}
}
]
if influxdb_client.write_points(data) == True:
print("Ping data written to DB successfully")
else: # Speedtest failed.
print("Ping Failed.")
write_api.write(bucket=DB_BUCKET, record=data)
def main():
pPing = Process(target=pingtest)
pSpeed = Process(target=speedtest)
init_db() # Setup the database if it does not already exist.
loopcount = 0
while (1): # Run a Speedtest and send the results to influxDB indefinitely.
if loopcount == 0 or loopcount % PING_INTERVAL == 0:
while 1: # Run a Speedtest and send the results to influxDB indefinitely.
if PING_INTERVAL != 0 and (loopcount == 0 or loopcount % PING_INTERVAL == 0):
if pPing.is_alive():
pPing.terminate()
pPing = Process(target=pingtest)
pPing.start()
if loopcount == 0 or loopcount % TEST_INTERVAL == 0:
if TEST_INTERVAL != 0 and (loopcount == 0 or loopcount % TEST_INTERVAL == 0):
if pSpeed.is_alive():
pSpeed.terminate()
pSpeed = Process(target=speedtest)
pSpeed.start()
if loopcount % ( PING_INTERVAL * TEST_INTERVAL ) == 0:
if loopcount % (PING_INTERVAL * TEST_INTERVAL) == 0:
loopcount = 0
time.sleep(1)
loopcount += 1
if __name__ == '__main__':
print('Speedtest CLI data logger to InfluxDB started...')
if __name__ == "__main__":
print("Speedtest CLI data logger to InfluxDB started...")
main()
if client then client.close()