607 lines
17 KiB
Python
Executable File
607 lines
17 KiB
Python
Executable File
#! /usr/bin/python3
|
|
|
|
import os
|
|
from sys import platform
|
|
|
|
from math import *
|
|
from gps import *
|
|
from datetime import datetime
|
|
from time import time, strftime
|
|
import simpleaudio as sa
|
|
|
|
import gpxpy
|
|
import csv
|
|
import yaml
|
|
|
|
from PySide6.QtGui import *
|
|
from PySide6.QtWidgets import QApplication, QMainWindow, QFileDialog
|
|
from PySide6.QtCore import QObject, Slot, Signal, QThread, QDir
|
|
|
|
from MainWindow import Ui_MainWindow
|
|
|
|
from utils.configs import *
|
|
|
|
##############################################################################
|
|
# audio stuff
|
|
##############################################################################
|
|
|
|
vc_path = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
def play_sound(file):
|
|
wave_obj = sa.WaveObject.from_wave_file(vc_path + "/" + file)
|
|
play_obj = wave_obj.play()
|
|
play_obj.wait_done()
|
|
|
|
##############################################################################
|
|
# gps stuff
|
|
##############################################################################
|
|
|
|
# https://ozzmaker.com/using-python-with-a-gps-receiver-on-a-raspberry-pi/
|
|
|
|
#
|
|
# GPS receiver is ublox from V2V-MD at 10x seconds
|
|
#
|
|
|
|
# http://www.movable-type.co.uk/scripts/latlong.html
|
|
|
|
earth_flatening = 1.0/298.257223563
|
|
earth_radius = 6378137.0
|
|
|
|
def roydistance(lat1, lon1, lat2, lon2):
|
|
"""
|
|
Calculate the great circle distance between two points
|
|
on the earth (specified in decimal degrees)
|
|
"""
|
|
# convert decimal degrees to radians
|
|
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
|
|
|
|
# Roy's method
|
|
f1 = (1.0 - earth_flatening)
|
|
|
|
top = (pow((lon2-lon1), 2) * pow(cos(lat1), 2)) + pow(lat2-lat1, 2)
|
|
bot = pow(sin(lat1), 2) + (pow(f1, 2) * pow(cos(lat1), 2))
|
|
|
|
return f1 * earth_radius * sqrt(top/bot)
|
|
|
|
##############################################################################
|
|
# log
|
|
##############################################################################
|
|
|
|
cfg = ConfigurationFile(__file__)
|
|
|
|
if not os.path.isdir("logs"):
|
|
os.mkdir("logs")
|
|
|
|
##############################################################################
|
|
# virtual cones
|
|
##############################################################################
|
|
|
|
# node from csv (each line is lat, long, distance, next code# (optional)):
|
|
# 42.522040, -83.237594, 5, 1
|
|
|
|
##############################################################################
|
|
# class
|
|
##############################################################################
|
|
|
|
class VirtualCones():
|
|
def __init__(self):
|
|
self.filename = ""
|
|
self.cones = []
|
|
self.num_cones = 0
|
|
self.current_cone = 0
|
|
self.distance = 0.0
|
|
|
|
def clr_cones(self):
|
|
self.cones = []
|
|
self.num_cones = 0
|
|
self.current_cone = 0
|
|
|
|
def add_cone(self, lat, lon, rad):
|
|
self.cones.append( [float(lat), float(lon), float(rad)] )
|
|
self.num_cones += 1
|
|
|
|
def read_csv(self, csv_file):
|
|
file_csv = open(csv_file)
|
|
data_csv = csv.reader(file_csv)
|
|
return list(data_csv)
|
|
|
|
def read_list(self, file):
|
|
self.filename = file
|
|
self.clr_cones()
|
|
self.cones = self.read_csv(file)
|
|
if len(self.cones) == 0:
|
|
return
|
|
for i in range(0, len(self.cones)):
|
|
for j in range(0, 3):
|
|
self.cones[i][j] = float(self.cones[i][j])
|
|
self.num_cones = len(self.cones)
|
|
return self.cones
|
|
|
|
def save_list(self, filename):
|
|
with open(filename, "w") as file:
|
|
for i in range(0, self.num_cones):
|
|
file.write("%11.7f, %11.7f, %4.1f\n" % (self.cones[i][0], self.cones[i][1], self.cones[i][2]))
|
|
file.close()
|
|
|
|
def check_list(self, lat, lon, heading, speed, rng):
|
|
trig = 0
|
|
if self.num_cones == 0:
|
|
return (0, 999.0, 0)
|
|
|
|
# find node in list
|
|
for i in range(self.num_cones):
|
|
# node from csv (each line is lat, long, distance, next code#)
|
|
row = self.cones[i]
|
|
cone_lat = row[0]
|
|
cone_lon = row[1]
|
|
cone_dist = row[2]
|
|
|
|
# distance between vehicle and cone
|
|
self.distance = roydistance(lat, lon, cone_lat, cone_lon)
|
|
|
|
# check distance trigger
|
|
if self.distance < rng:
|
|
# out = "Waypoint %s:%d reached: %f, %f, %f, %d" % (self.filename, self.current_cone, lat, lon, heading, speed)
|
|
# print(out)
|
|
self.current_cone = i
|
|
trig = 1
|
|
if self.buzz_no < self.max_buzz:
|
|
play_sound("alert.wav")
|
|
self.buzz_no = self.buzz_no + 1
|
|
break
|
|
else:
|
|
self.buzz_no = 0
|
|
|
|
return (self.current_cone, self.distance, trig)
|
|
|
|
# def check_list(self, lat, lon, heading, speed, rng):
|
|
# trig = 0
|
|
# if self.num_cones == 0:
|
|
# return (0, 999.0, 0)
|
|
|
|
# # end of list?
|
|
# if self.current_cone >= self.num_cones:
|
|
# self.current_cone = 0
|
|
|
|
# check first node for reset
|
|
# if self.current_cone:
|
|
# node from csv (each line is lat, long, distance)
|
|
# row = self.cones[0]
|
|
# cone_lat = row[0]
|
|
# cone_lon = row[1]
|
|
# cone_dist = row[2]
|
|
# dist = roydistance(lat, lon, cone_lat, cone_lon)
|
|
# if dist < rng:
|
|
# self.current_cone = 0
|
|
|
|
# node from csv (each line is lat, long, distance, next code#)
|
|
# row = self.cones[self.current_cone]
|
|
# cone_lat = row[0]
|
|
# cone_lon = row[1]
|
|
# cone_dist = row[2]
|
|
|
|
# next cone
|
|
# cone_next = self.current_cone + 1
|
|
# try:
|
|
# cone_next = int(row[3])
|
|
# except:
|
|
# pass
|
|
|
|
# distance between vehicle and cone
|
|
# self.distance = roydistance(lat, lon, cone_lat, cone_lon)
|
|
|
|
# check distance trigger
|
|
# if self.distance < rng:
|
|
# out = "Waypoint %s:%d reached: %f, %f, %f, %d" % (self.filename, self.current_cone, lat, lon, heading, speed)
|
|
# print(out)
|
|
# play_sound("alert.wav")
|
|
# self.current_cone = cone_next
|
|
# trig = 1
|
|
|
|
# return (self.current_cone, self.distance, trig)
|
|
|
|
##############################################################################
|
|
# thread
|
|
##############################################################################
|
|
|
|
gpsd_host = "localhost"
|
|
gpsd_report = ()
|
|
|
|
class DataSignal(QObject):
|
|
sig = Signal(int)
|
|
|
|
class DataThread(QThread):
|
|
def __init__(self, parent=None):
|
|
super(DataThread, self).__init__(parent)
|
|
self.new_data = DataSignal()
|
|
# self.new_data.sig.connect(parent.update_data)
|
|
|
|
def run(self):
|
|
global gpsd_report
|
|
gpsd = gps(host=gpsd_host, mode=WATCH_ENABLE|WATCH_NEWSTYLE)
|
|
|
|
while not self.isInterruptionRequested():
|
|
gpsd_report = gpsd.next() # wait for next packet
|
|
self.new_data.sig.emit(1)
|
|
|
|
def stop(self):
|
|
self.requestInterruption()
|
|
self.wait()
|
|
|
|
##############################################################################
|
|
# main
|
|
##############################################################################
|
|
|
|
class MainWindow(QMainWindow, Ui_MainWindow):
|
|
def __init__(self, *args, obj=None, **kwargs):
|
|
super(MainWindow, self).__init__(*args, **kwargs)
|
|
self.setupUi(self)
|
|
|
|
class ConesWindow(MainWindow):
|
|
def __init__(self, *args, obj=None, **kwargs):
|
|
super(ConesWindow, self).__init__(*args, **kwargs)
|
|
|
|
self.setWindowTitle("NTCNA GPS Virtual Cones V2.2")
|
|
|
|
self.vlist1 = VirtualCones()
|
|
self.vlist2 = VirtualCones()
|
|
self.vlist3 = VirtualCones()
|
|
self.vlist4 = VirtualCones()
|
|
self.vlist5 = VirtualCones()
|
|
self.vlist6 = VirtualCones()
|
|
self.vlists = [self.vlist1, self.vlist2, self.vlist3, self.vlist4, self.vlist5, self.vlist6]
|
|
|
|
# button connections
|
|
self.openFiles.clicked.connect(self.read_dialog)
|
|
self.exitButton.clicked.connect(self.exit_button)
|
|
self.testSound.clicked.connect(self.test_sound)
|
|
self.saveConfig.clicked.connect(self.save_config)
|
|
|
|
self.resetList1.clicked.connect(self.reset_list1)
|
|
self.addPoint1.clicked.connect(self.add_point1)
|
|
self.delPoint1.clicked.connect(self.del_point1)
|
|
self.insPoint1.clicked.connect(self.ins_point1)
|
|
self.saveList1.clicked.connect(self.save_list1)
|
|
|
|
self.resetList2.clicked.connect(self.reset_list2)
|
|
self.addPoint2.clicked.connect(self.add_point2)
|
|
self.delPoint2.clicked.connect(self.del_point2)
|
|
self.insPoint2.clicked.connect(self.ins_point2)
|
|
self.saveList2.clicked.connect(self.save_list2)
|
|
|
|
self.resetList3.clicked.connect(self.reset_list3)
|
|
self.addPoint3.clicked.connect(self.add_point3)
|
|
self.delPoint3.clicked.connect(self.del_point3)
|
|
self.insPoint3.clicked.connect(self.ins_point3)
|
|
self.saveList3.clicked.connect(self.save_list3)
|
|
|
|
self.resetList4.clicked.connect(self.reset_list4)
|
|
self.addPoint4.clicked.connect(self.add_point4)
|
|
self.delPoint4.clicked.connect(self.del_point4)
|
|
self.insPoint4.clicked.connect(self.ins_point4)
|
|
self.saveList4.clicked.connect(self.save_list4)
|
|
|
|
self.resetList5.clicked.connect(self.reset_list5)
|
|
self.addPoint5.clicked.connect(self.add_point5)
|
|
self.delPoint5.clicked.connect(self.del_point5)
|
|
self.insPoint5.clicked.connect(self.ins_point5)
|
|
self.saveList5.clicked.connect(self.save_list5)
|
|
|
|
self.resetList6.clicked.connect(self.reset_list6)
|
|
self.addPoint6.clicked.connect(self.add_point6)
|
|
self.delPoint6.clicked.connect(self.del_point6)
|
|
self.insPoint6.clicked.connect(self.ins_point6)
|
|
self.saveList6.clicked.connect(self.save_list6)
|
|
|
|
# makes life easier, avoid redundant code
|
|
self.labels = [self.lblList1, self.lblList2, self.lblList3, self.lblList4, self.lblList5, self.lblList6]
|
|
self.lists = [self.listCones1, self.listCones2, self.listCones3, self.listCones4, self.listCones5, self.listCones6]
|
|
self.dists = [self.txtDist1, self.txtDist2, self.txtDist3, self.txtDist4, self.txtDist5, self.txtDist6]
|
|
self.saves = [self.saveList1, self.saveList2, self.saveList3, self.saveList4, self.saveList5, self.saveList6]
|
|
|
|
for i in range(0, 6):
|
|
self.labels[i].setText("")
|
|
self.saves[i].hide()
|
|
|
|
# txtLatitude
|
|
self.latitude = 0.0
|
|
self.longitude = 0.0
|
|
|
|
# GPS log
|
|
self.gpx = gpxpy.gpx.GPX()
|
|
# Create first track in our GPX:
|
|
self.gpx_track = gpxpy.gpx.GPXTrack()
|
|
self.gpx.tracks.append(self.gpx_track)
|
|
# Create first segment in our GPX track:
|
|
self.gpx_segment = gpxpy.gpx.GPXTrackSegment()
|
|
self.gpx_track.segments.append(self.gpx_segment)
|
|
|
|
self.num_points = 0
|
|
self.my_thread = DataThread()
|
|
|
|
def test_sound(self):
|
|
play_sound('alert.wav')
|
|
|
|
def read_dialog(self):
|
|
filedialog = QFileDialog(self)
|
|
filedialog.setDirectory(QDir.currentPath())
|
|
filedialog.setDefaultSuffix("csv")
|
|
filedialog.setNameFilter("CSV (*.csv);; all (*.*)")
|
|
filedialog.setFileMode(QFileDialog.ExistingFiles)
|
|
selected = filedialog.exec()
|
|
if selected:
|
|
self.load_cones( filedialog.selectedFiles()[0:6] )
|
|
|
|
def write_dialog(self):
|
|
filedialog = QFileDialog(self)
|
|
filedialog.setDirectory(QDir.currentPath())
|
|
filedialog.setDefaultSuffix("csv")
|
|
filedialog.setNameFilter("CSV (*.csv);; all (*.*)")
|
|
selected = filedialog.exec()
|
|
if selected:
|
|
filename = filedialog.selectedFiles()[0]
|
|
return filename
|
|
return ""
|
|
|
|
# slurp in files for each list
|
|
def load_cones(self, args):
|
|
count = 0
|
|
for i in range(0, 6):
|
|
self.labels[i].setText("")
|
|
self.lists[i].clear()
|
|
self.saves[i].hide()
|
|
self.vlists[i].clr_cones()
|
|
|
|
for arg in args:
|
|
try:
|
|
cones = VirtualCones()
|
|
cones.read_list(arg)
|
|
self.vlists[count] = cones
|
|
self.labels[count].setText(os.path.basename(arg))
|
|
self.lists[count].clear()
|
|
|
|
for i in range(0, cones.num_cones):
|
|
self.lists[count].addItem("%11.7f, %11.7f" % (cones.cones[i][0], cones.cones[i][1]))
|
|
self.lists[count].setCurrentRow(0)
|
|
count += 1
|
|
self.show()
|
|
except Exception as e:
|
|
print(e)
|
|
continue
|
|
|
|
def start_thread(self):
|
|
if not self.my_thread.isRunning():
|
|
self.my_thread.new_data.sig.connect(self.update_data)
|
|
self.my_thread.start()
|
|
self.my_thread.setTerminationEnabled(True)
|
|
|
|
def stop_thread(self):
|
|
self.my_thread.stop()
|
|
self.my_thread.wait(3)
|
|
self.save_log()
|
|
|
|
@Slot(list)
|
|
def update_data(self, x):
|
|
global gpsd_report
|
|
report = gpsd_report
|
|
|
|
if report['class'] == 'TPV':
|
|
status = getattr(report, 'status', 0)
|
|
mode = getattr(report, 'mode', 0)
|
|
time = getattr(report, 'time', 0)
|
|
lat = getattr(report, 'lat', 0.0)
|
|
lon = getattr(report, 'lon', 0.0)
|
|
elev = getattr(report, 'altHAE', 0.0) # meters
|
|
heading = getattr(report, 'track', 0.0)
|
|
speed = getattr(report, 'speed', 0.0) # m/sec
|
|
self.latitude = lat
|
|
self.longitude = lon
|
|
|
|
# self.txtStatus.setText(fix_status[status])
|
|
# self.txtMode.setText(fix_mode[mode])
|
|
self.txtTime.setText(time)
|
|
self.txtLatitude.setText("%10.7f" %(lat))
|
|
self.txtLongitude.setText("%10.7f" % (lon))
|
|
# self.txtAltitude.setText("%7.2f" % (elev))
|
|
self.txtHeading.setText("%7.2f" % (heading))
|
|
self.txtSpeed.setText("%7.2f" % (speed))
|
|
|
|
rng = self.txtRange.text()
|
|
if rng == "":
|
|
rng = 3.0
|
|
else:
|
|
rng = float(rng)
|
|
|
|
for i in range(0, 6):
|
|
if self.lists[i].count() == 0:
|
|
continue
|
|
(cone, dist, trig) = self.vlists[i].check_list(lat, lon, heading, speed, rng)
|
|
if trig:
|
|
self.lists[i].setCurrentRow(cone)
|
|
if dist < 10000.0:
|
|
self.dists[i].setText("%6.1f" % (dist))
|
|
else:
|
|
self.dists[i].setText("OOR")
|
|
|
|
if self.logEnabled.isChecked():
|
|
point = gpxpy.gpx.GPXTrackPoint(lat, lon, elevation=elev)
|
|
point.time = datetime.now()
|
|
self.gpx_segment.points.append(point)
|
|
self.num_points += 1
|
|
|
|
self.show()
|
|
|
|
def reset_list1(self):
|
|
self.listCones1.current_cone = 0
|
|
self.lists[0].setCurrentRow(0)
|
|
|
|
def reset_list2(self):
|
|
self.listCones2.current_cone = 0
|
|
self.lists[1].setCurrentRow(0)
|
|
|
|
def reset_list3(self):
|
|
self.listCones3.current_cone = 0
|
|
self.lists[2].setCurrentRow(0)
|
|
|
|
def reset_list4(self):
|
|
self.listCones4.current_cone = 0
|
|
self.lists[3].setCurrentRow(0)
|
|
|
|
def reset_list5(self):
|
|
self.listCones5.current_cone = 0
|
|
self.lists[4].setCurrentRow(0)
|
|
|
|
def reset_list6(self):
|
|
self.listCones6.current_cone = 0
|
|
self.lists[5].setCurrentRow(0)
|
|
|
|
def update_list(self, list):
|
|
self.vlists[list].clr_cones()
|
|
for i in range(0, self.lists[list].count()):
|
|
item = self.lists[list].item(i)
|
|
txt = item.text()
|
|
lat, lon = txt.split(", ")
|
|
self.vlists[list].add_cone(lat, lon, float(self.txtRange.text()))
|
|
|
|
def add_point(self, list, add):
|
|
self.lists[list].insertItem(self.lists[list].currentRow()+add, "%11.7f, %11.7f" % (self.latitude, self.longitude))
|
|
self.saves[list].show()
|
|
self.update_list(list)
|
|
|
|
def del_point(self, list):
|
|
listItems = self.lists[list].selectedItems()
|
|
if not listItems: return
|
|
for item in listItems:
|
|
self.lists[list].takeItem(self.lists[list].row(item))
|
|
self.saves[list].show()
|
|
self.update_list(list)
|
|
|
|
def save_list(self, list):
|
|
filename = self.labels[list].text()
|
|
if filename == "":
|
|
filename = self.write_dialog()
|
|
if filename == "":
|
|
return
|
|
filename = os.path.basename(filename)
|
|
self.labels[list].setText(filename)
|
|
self.vlists[list].save_list(filename)
|
|
self.saves[list].hide()
|
|
|
|
def add_point1(self):
|
|
self.add_point(0, 1)
|
|
|
|
def del_point1(self):
|
|
self.del_point(0)
|
|
|
|
def ins_point1(self):
|
|
self.add_point(0, 0)
|
|
|
|
def save_list1(self):
|
|
self.save_list(0)
|
|
|
|
def add_point2(self):
|
|
self.add_point(1, 1)
|
|
|
|
def del_point2(self):
|
|
self.del_point(1)
|
|
|
|
def ins_point2(self):
|
|
self.add_point(1, 0)
|
|
|
|
def save_list2(self):
|
|
self.save_list(1)
|
|
|
|
def add_point3(self):
|
|
self.add_point(2, 1)
|
|
|
|
def del_point3(self):
|
|
self.del_point(2)
|
|
|
|
def ins_point3(self):
|
|
self.add_point(2, 0)
|
|
|
|
def save_list3(self):
|
|
self.save_list(2)
|
|
|
|
def add_point4(self):
|
|
self.add_point(3, 1)
|
|
|
|
def del_point4(self):
|
|
self.del_point(3)
|
|
|
|
def ins_point4(self):
|
|
self.add_point(3, 0)
|
|
|
|
def save_list4(self):
|
|
self.save_list(3)
|
|
|
|
def add_point5(self):
|
|
self.add_point(4, 1)
|
|
|
|
def del_point5(self):
|
|
self.del_point(4)
|
|
|
|
def ins_point5(self):
|
|
self.add_point(4, 0)
|
|
|
|
def save_list5(self):
|
|
self.save_list(4)
|
|
|
|
def add_point6(self):
|
|
self.add_point(5, 1)
|
|
|
|
def del_point6(self):
|
|
self.del_point(5)
|
|
|
|
def ins_point6(self):
|
|
self.add_point(5, 0)
|
|
|
|
def save_list6(self):
|
|
self.save_list(5)
|
|
|
|
def save_config(self):
|
|
config = {}
|
|
if gpsd_host != "localhost":
|
|
config['gpsd_host'] = gpsd_host
|
|
config['lists'] = []
|
|
for i in range(0, 6):
|
|
if self.labels[i].text() != "":
|
|
self.save_list(i)
|
|
config['lists'].append( self.labels[i].text() )
|
|
cfg.write_config(config)
|
|
|
|
def save_log(self):
|
|
if self.num_points:
|
|
log = open(strftime("logs/way-%Y%m%d-%H%M.gpx"), 'w')
|
|
log.write(self.gpx.to_xml())
|
|
log.close()
|
|
|
|
def closeEvent(self, event):
|
|
self.stop_thread()
|
|
event.accept()
|
|
sys.exit(0)
|
|
|
|
def exit_button(self):
|
|
self.stop_thread()
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
app = QApplication(sys.argv)
|
|
|
|
window = ConesWindow()
|
|
window.show()
|
|
|
|
config = cfg.read_config()
|
|
if 'gpsd_host' in config:
|
|
gpsd_host = config['gpsd_host']
|
|
|
|
if len(sys.argv[1:]):
|
|
window.load_cones(sys.argv[1:])
|
|
elif 'lists' in config:
|
|
window.load_cones(config['lists'])
|
|
|
|
window.start_thread()
|
|
ret = app.exec()
|
|
sys.exit(ret)
|