from juham.base import Base
from juham.web.rthread import RThread, IWorkerThread
from influxdb_client_3 import Point
import json
import math
import time
import atexit
class RTrackerThread(IWorkerThread):
"""A tracker simulation thread generating and publishing geographic
coordinates.
Args:
IWorkerThread (class): super class
"""
_class_id = "RTrackerThread"
def __init__(self, client=None):
super().__init__(client)
self.sensor_topic = None
self.topic = None
self.interval = 60
self.lon = 25.63
self.lat = 60.95
self.rad = 3
def update(self):
super().update()
self.update_track(1, "fixed", 0.5, 1.0, 0, 0)
self.update_track(2, "rotary", 0.7, 1.8, -0.9, 0)
def update_track(self, id, type, radLon, radLat, offLon, offLat):
epoc = time.time()
rec = {
"ts": epoc,
"lon": math.sin(epoc / 360000.0) * math.sin(epoc * 0.001) * radLon
+ self.lon
+ offLon,
"lat": 0.5 * math.cos(epoc / 360000.0) * math.sin(epoc * 0.001) * radLat
+ self.lat
+ offLat,
"alt": math.cos(epoc / 360000.0) * (10 * id) + 100,
"fom": math.cos(epoc / 360000.0) * (0.1 * id) * 10 + 100,
"type": type,
"id": str(id),
}
self.mqtt_client.publish(self.topic, json.dumps(rec), qos=1, retain=False)
self.debug("Track " + str(id) + " moved")
@classmethod
def register(cls):
Base.register_class(cls._class_id, cls)
[docs]
class RTracker(RThread):
"""A tracker automation object. Spawns async thread to generate geographic
coordinates at specific rate, and writes them to time series database.
Args:
RThread (class): super class
"""
_class_id = None
workerThreadId = RTrackerThread.get_class_id()
lon = 25.636786
lat = 60.968117
rad = 3
update_interval = 60
topic = Base.mqtt_root_topic + "/tracks"
def __init__(self, name="rtracker"):
super().__init__(name)
[docs]
def on_connect(self, client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
if rc == 0:
self.subscribe(self.topic)
[docs]
def on_message(self, client, userdata, msg):
if msg.topic == self.topic:
em = json.loads(msg.payload.decode())
self.on_sensor(em)
else:
super().on_message(client, userdata, msg)
[docs]
def on_sensor(self, msg):
point = (
Point("track")
.tag("id", msg["id"])
.field("lon", msg["lon"])
.field("lat", msg["lat"])
.field("alt", msg["alt"])
.field("type", msg["type"])
.field("fom", msg["fom"])
.time(self.epoc2utc(msg["ts"]))
)
self.write(point)
self.debug(
f"Track {msg['type']} {msg['lat']} {msg['lon']} recorded to timeseries"
)
[docs]
def run(self):
self.worker = Base.instantiate(RTracker.workerThreadId)
self.worker.lon = self.lon
self.worker.lat = self.lat
self.worker.rad = self.rad
self.worker.interval = self.update_interval
self.worker.topic = self.topic
super().run()
[docs]
@classmethod
def register(cls):
if cls._class_id is None:
RThread.register()
RTrackerThread.register()
cls.initialize_class()