#! /usr/bin/python3 import os from sys import platform from math import * from gps import * from datetime import datetime from time import time, strftime import simpleaudio as sa import gpxpy import csv import yaml from PySide6.QtGui import * from PySide6.QtWidgets import QApplication, QMainWindow, QFileDialog from PySide6.QtCore import QObject, Slot, Signal, QThread, QDir from MainWindow import Ui_MainWindow from utils.configs import * ############################################################################## # audio stuff ############################################################################## vc_path = os.path.dirname(os.path.realpath(__file__)) def play_sound(file): wave_obj = sa.WaveObject.from_wave_file(vc_path + "/" + file) play_obj = wave_obj.play() play_obj.wait_done() ############################################################################## # gps stuff ############################################################################## # https://ozzmaker.com/using-python-with-a-gps-receiver-on-a-raspberry-pi/ # # GPS receiver is ublox from V2V-MD at 10x seconds # # http://www.movable-type.co.uk/scripts/latlong.html earth_flatening = 1.0/298.257223563 earth_radius = 6378137.0 def roydistance(lat1, lon1, lat2, lon2): """ Calculate the great circle distance between two points on the earth (specified in decimal degrees) """ # convert decimal degrees to radians lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2]) # Roy's method f1 = (1.0 - earth_flatening) top = (pow((lon2-lon1), 2) * pow(cos(lat1), 2)) + pow(lat2-lat1, 2) bot = pow(sin(lat1), 2) + (pow(f1, 2) * pow(cos(lat1), 2)) return f1 * earth_radius * sqrt(top/bot) ############################################################################## # log ############################################################################## cfg = ConfigurationFile(__file__) if not os.path.isdir("logs"): os.mkdir("logs") ############################################################################## # virtual cones ############################################################################## # node from csv (each line is lat, long, distance, next code# (optional)): # 42.522040, -83.237594, 5, 1 ############################################################################## # class ############################################################################## class VirtualCones(): def __init__(self): self.filename = "" self.cones = [] self.num_cones = 0 self.current_cone = 0 self.distance = 0.0 def clr_cones(self): self.cones = [] self.num_cones = 0 self.current_cone = 0 def add_cone(self, lat, lon, rad): self.cones.append( [float(lat), float(lon), float(rad)] ) self.num_cones += 1 def read_csv(self, csv_file): file_csv = open(csv_file) data_csv = csv.reader(file_csv) return list(data_csv) def read_list(self, file): self.filename = file self.clr_cones() self.cones = self.read_csv(file) if len(self.cones) == 0: return for i in range(0, len(self.cones)): for j in range(0, 3): self.cones[i][j] = float(self.cones[i][j]) self.num_cones = len(self.cones) return self.cones def save_list(self, filename): with open(filename, "w") as file: for i in range(0, self.num_cones): file.write("%11.7f, %11.7f, %4.1f\n" % (self.cones[i][0], self.cones[i][1], self.cones[i][2])) file.close() def check_list(self, lat, lon, heading, speed, rng): trig = 0 if self.num_cones == 0: return (0, 999.0, 0) # find node in list for i in range(self.num_cones): # node from csv (each line is lat, long, distance, next code#) row = self.cones[i] cone_lat = row[0] cone_lon = row[1] cone_dist = row[2] # distance between vehicle and cone self.distance = roydistance(lat, lon, cone_lat, cone_lon) # check distance trigger if self.distance < rng: # out = "Waypoint %s:%d reached: %f, %f, %f, %d" % (self.filename, self.current_cone, lat, lon, heading, speed) # print(out) self.current_cone = i trig = 1 if self.buzz_no < self.max_buzz: play_sound("alert.wav") self.buzz_no = self.buzz_no + 1 break else: self.buzz_no = 0 return (self.current_cone, self.distance, trig) # def check_list(self, lat, lon, heading, speed, rng): # trig = 0 # if self.num_cones == 0: # return (0, 999.0, 0) # # end of list? # if self.current_cone >= self.num_cones: # self.current_cone = 0 # check first node for reset # if self.current_cone: # node from csv (each line is lat, long, distance) # row = self.cones[0] # cone_lat = row[0] # cone_lon = row[1] # cone_dist = row[2] # dist = roydistance(lat, lon, cone_lat, cone_lon) # if dist < rng: # self.current_cone = 0 # node from csv (each line is lat, long, distance, next code#) # row = self.cones[self.current_cone] # cone_lat = row[0] # cone_lon = row[1] # cone_dist = row[2] # next cone # cone_next = self.current_cone + 1 # try: # cone_next = int(row[3]) # except: # pass # distance between vehicle and cone # self.distance = roydistance(lat, lon, cone_lat, cone_lon) # check distance trigger # if self.distance < rng: # out = "Waypoint %s:%d reached: %f, %f, %f, %d" % (self.filename, self.current_cone, lat, lon, heading, speed) # print(out) # play_sound("alert.wav") # self.current_cone = cone_next # trig = 1 # return (self.current_cone, self.distance, trig) ############################################################################## # thread ############################################################################## gpsd_host = "localhost" gpsd_report = () class DataSignal(QObject): sig = Signal(int) class DataThread(QThread): def __init__(self, parent=None): super(DataThread, self).__init__(parent) self.new_data = DataSignal() # self.new_data.sig.connect(parent.update_data) def run(self): global gpsd_report gpsd = gps(host=gpsd_host, mode=WATCH_ENABLE|WATCH_NEWSTYLE) while not self.isInterruptionRequested(): gpsd_report = gpsd.next() # wait for next packet self.new_data.sig.emit(1) def stop(self): self.requestInterruption() self.wait() ############################################################################## # main ############################################################################## class MainWindow(QMainWindow, Ui_MainWindow): def __init__(self, *args, obj=None, **kwargs): super(MainWindow, self).__init__(*args, **kwargs) self.setupUi(self) class ConesWindow(MainWindow): def __init__(self, *args, obj=None, **kwargs): super(ConesWindow, self).__init__(*args, **kwargs) self.setWindowTitle("NTCNA GPS Virtual Cones V2.2") self.vlist1 = VirtualCones() self.vlist2 = VirtualCones() self.vlist3 = VirtualCones() self.vlist4 = VirtualCones() self.vlist5 = VirtualCones() self.vlist6 = VirtualCones() self.vlists = [self.vlist1, self.vlist2, self.vlist3, self.vlist4, self.vlist5, self.vlist6] # button connections self.openFiles.clicked.connect(self.read_dialog) self.exitButton.clicked.connect(self.exit_button) self.testSound.clicked.connect(self.test_sound) self.saveConfig.clicked.connect(self.save_config) self.resetList1.clicked.connect(self.reset_list1) self.addPoint1.clicked.connect(self.add_point1) self.delPoint1.clicked.connect(self.del_point1) self.insPoint1.clicked.connect(self.ins_point1) self.saveList1.clicked.connect(self.save_list1) self.resetList2.clicked.connect(self.reset_list2) self.addPoint2.clicked.connect(self.add_point2) self.delPoint2.clicked.connect(self.del_point2) self.insPoint2.clicked.connect(self.ins_point2) self.saveList2.clicked.connect(self.save_list2) self.resetList3.clicked.connect(self.reset_list3) self.addPoint3.clicked.connect(self.add_point3) self.delPoint3.clicked.connect(self.del_point3) self.insPoint3.clicked.connect(self.ins_point3) self.saveList3.clicked.connect(self.save_list3) self.resetList4.clicked.connect(self.reset_list4) self.addPoint4.clicked.connect(self.add_point4) self.delPoint4.clicked.connect(self.del_point4) self.insPoint4.clicked.connect(self.ins_point4) self.saveList4.clicked.connect(self.save_list4) self.resetList5.clicked.connect(self.reset_list5) self.addPoint5.clicked.connect(self.add_point5) self.delPoint5.clicked.connect(self.del_point5) self.insPoint5.clicked.connect(self.ins_point5) self.saveList5.clicked.connect(self.save_list5) self.resetList6.clicked.connect(self.reset_list6) self.addPoint6.clicked.connect(self.add_point6) self.delPoint6.clicked.connect(self.del_point6) self.insPoint6.clicked.connect(self.ins_point6) self.saveList6.clicked.connect(self.save_list6) # makes life easier, avoid redundant code self.labels = [self.lblList1, self.lblList2, self.lblList3, self.lblList4, self.lblList5, self.lblList6] self.lists = [self.listCones1, self.listCones2, self.listCones3, self.listCones4, self.listCones5, self.listCones6] self.dists = [self.txtDist1, self.txtDist2, self.txtDist3, self.txtDist4, self.txtDist5, self.txtDist6] self.saves = [self.saveList1, self.saveList2, self.saveList3, self.saveList4, self.saveList5, self.saveList6] for i in range(0, 6): self.labels[i].setText("") self.saves[i].hide() # txtLatitude self.latitude = 0.0 self.longitude = 0.0 # GPS log self.gpx = gpxpy.gpx.GPX() # Create first track in our GPX: self.gpx_track = gpxpy.gpx.GPXTrack() self.gpx.tracks.append(self.gpx_track) # Create first segment in our GPX track: self.gpx_segment = gpxpy.gpx.GPXTrackSegment() self.gpx_track.segments.append(self.gpx_segment) self.num_points = 0 self.my_thread = DataThread() def test_sound(self): play_sound('alert.wav') def read_dialog(self): filedialog = QFileDialog(self) filedialog.setDirectory(QDir.currentPath()) filedialog.setDefaultSuffix("csv") filedialog.setNameFilter("CSV (*.csv);; all (*.*)") filedialog.setFileMode(QFileDialog.ExistingFiles) selected = filedialog.exec() if selected: self.load_cones( filedialog.selectedFiles()[0:6] ) def write_dialog(self): filedialog = QFileDialog(self) filedialog.setDirectory(QDir.currentPath()) filedialog.setDefaultSuffix("csv") filedialog.setNameFilter("CSV (*.csv);; all (*.*)") selected = filedialog.exec() if selected: filename = filedialog.selectedFiles()[0] return filename return "" # slurp in files for each list def load_cones(self, args): count = 0 for i in range(0, 6): self.labels[i].setText("") self.lists[i].clear() self.saves[i].hide() self.vlists[i].clr_cones() for arg in args: try: cones = VirtualCones() cones.read_list(arg) self.vlists[count] = cones self.labels[count].setText(os.path.basename(arg)) self.lists[count].clear() for i in range(0, cones.num_cones): self.lists[count].addItem("%11.7f, %11.7f" % (cones.cones[i][0], cones.cones[i][1])) self.lists[count].setCurrentRow(0) count += 1 self.show() except Exception as e: print(e) continue def start_thread(self): if not self.my_thread.isRunning(): self.my_thread.new_data.sig.connect(self.update_data) self.my_thread.start() self.my_thread.setTerminationEnabled(True) def stop_thread(self): self.my_thread.stop() self.my_thread.wait(3) self.save_log() @Slot(list) def update_data(self, x): global gpsd_report report = gpsd_report if report['class'] == 'TPV': status = getattr(report, 'status', 0) mode = getattr(report, 'mode', 0) time = getattr(report, 'time', 0) lat = getattr(report, 'lat', 0.0) lon = getattr(report, 'lon', 0.0) elev = getattr(report, 'altHAE', 0.0) # meters heading = getattr(report, 'track', 0.0) speed = getattr(report, 'speed', 0.0) # m/sec self.latitude = lat self.longitude = lon # self.txtStatus.setText(fix_status[status]) # self.txtMode.setText(fix_mode[mode]) self.txtTime.setText(time) self.txtLatitude.setText("%10.7f" %(lat)) self.txtLongitude.setText("%10.7f" % (lon)) # self.txtAltitude.setText("%7.2f" % (elev)) self.txtHeading.setText("%7.2f" % (heading)) self.txtSpeed.setText("%7.2f" % (speed)) rng = self.txtRange.text() if rng == "": rng = 3.0 else: rng = float(rng) for i in range(0, 6): if self.lists[i].count() == 0: continue (cone, dist, trig) = self.vlists[i].check_list(lat, lon, heading, speed, rng) if trig: self.lists[i].setCurrentRow(cone) if dist < 10000.0: self.dists[i].setText("%6.1f" % (dist)) else: self.dists[i].setText("OOR") if self.logEnabled.isChecked(): point = gpxpy.gpx.GPXTrackPoint(lat, lon, elevation=elev) point.time = datetime.now() self.gpx_segment.points.append(point) self.num_points += 1 self.show() def reset_list1(self): self.listCones1.current_cone = 0 self.lists[0].setCurrentRow(0) def reset_list2(self): self.listCones2.current_cone = 0 self.lists[1].setCurrentRow(0) def reset_list3(self): self.listCones3.current_cone = 0 self.lists[2].setCurrentRow(0) def reset_list4(self): self.listCones4.current_cone = 0 self.lists[3].setCurrentRow(0) def reset_list5(self): self.listCones5.current_cone = 0 self.lists[4].setCurrentRow(0) def reset_list6(self): self.listCones6.current_cone = 0 self.lists[5].setCurrentRow(0) def update_list(self, list): self.vlists[list].clr_cones() for i in range(0, self.lists[list].count()): item = self.lists[list].item(i) txt = item.text() lat, lon = txt.split(", ") self.vlists[list].add_cone(lat, lon, float(self.txtRange.text())) def add_point(self, list, add): self.lists[list].insertItem(self.lists[list].currentRow()+add, "%11.7f, %11.7f" % (self.latitude, self.longitude)) self.saves[list].show() self.update_list(list) def del_point(self, list): listItems = self.lists[list].selectedItems() if not listItems: return for item in listItems: self.lists[list].takeItem(self.lists[list].row(item)) self.saves[list].show() self.update_list(list) def save_list(self, list): filename = self.labels[list].text() if filename == "": filename = self.write_dialog() if filename == "": return filename = os.path.basename(filename) self.labels[list].setText(filename) self.vlists[list].save_list(filename) self.saves[list].hide() def add_point1(self): self.add_point(0, 1) def del_point1(self): self.del_point(0) def ins_point1(self): self.add_point(0, 0) def save_list1(self): self.save_list(0) def add_point2(self): self.add_point(1, 1) def del_point2(self): self.del_point(1) def ins_point2(self): self.add_point(1, 0) def save_list2(self): self.save_list(1) def add_point3(self): self.add_point(2, 1) def del_point3(self): self.del_point(2) def ins_point3(self): self.add_point(2, 0) def save_list3(self): self.save_list(2) def add_point4(self): self.add_point(3, 1) def del_point4(self): self.del_point(3) def ins_point4(self): self.add_point(3, 0) def save_list4(self): self.save_list(3) def add_point5(self): self.add_point(4, 1) def del_point5(self): self.del_point(4) def ins_point5(self): self.add_point(4, 0) def save_list5(self): self.save_list(4) def add_point6(self): self.add_point(5, 1) def del_point6(self): self.del_point(5) def ins_point6(self): self.add_point(5, 0) def save_list6(self): self.save_list(5) def save_config(self): config = {} if gpsd_host != "localhost": config['gpsd_host'] = gpsd_host config['lists'] = [] for i in range(0, 6): if self.labels[i].text() != "": self.save_list(i) config['lists'].append( self.labels[i].text() ) cfg.write_config(config) def save_log(self): if self.num_points: log = open(strftime("logs/way-%Y%m%d-%H%M.gpx"), 'w') log.write(self.gpx.to_xml()) log.close() def closeEvent(self, event): self.stop_thread() event.accept() sys.exit(0) def exit_button(self): self.stop_thread() sys.exit(0) if __name__ == '__main__': app = QApplication(sys.argv) window = ConesWindow() window.show() config = cfg.read_config() if 'gpsd_host' in config: gpsd_host = config['gpsd_host'] if len(sys.argv[1:]): window.load_cones(sys.argv[1:]) elif 'lists' in config: window.load_cones(config['lists']) window.start_thread() ret = app.exec() sys.exit(ret)