#! /usr/bin/python3 import os import sys import getopt import csv from math import * from gps import * from time import strftime, sleep import os from sys import platform ############################################################################## # audio stuff ############################################################################## def play_sound(file): wave_obj = sa.WaveObject.from_wave_file(file) play_obj = wave_obj.play() ############################################################################## # buzzer stuff ############################################################################## # https://www.instructables.com/Raspberry-Pi-Tutorial-How-to-Use-a-Buzzer/ try: import RPi.GPIO as GPIO buzzer=23 seconds=3 def buzzer_init(): GPIO.setwarnings(0) GPIO.setmode(GPIO.BCM) GPIO.setup(buzzer, GPIO.OUT) def buzzer_on(): GPIO.output(buzzer, GPIO.HIGH) def buzzer_off(): GPIO.output(buzzer, GPIO.LOW) def buzzer_run(): global seconds sec = seconds while (sec): buzzer_on() sleep(0.5) buzzer_off() sleep(0.5) sec = sec - 1 buzzer_init() except: print("Not a Raspberry Pi! No buzzer support.") ############################################################################## # gps stuff ############################################################################## # https://ozzmaker.com/using-python-with-a-gps-receiver-on-a-raspberry-pi/ gpsd = gps(mode=WATCH_ENABLE|WATCH_NEWSTYLE) # # GPS receiver is ublox from V2V-MD at 10x seconds # # http://www.movable-type.co.uk/scripts/latlong.html earth_flatening = 1.0/298.257223563 earth_radius = 6378137.0 def roydistance(lat1, lon1, lat2, lon2): """ Calculate the great circle distance between two points on the earth (specified in decimal degrees) """ # convert decimal degrees to radians lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2]) # Roy's method f1 = (1.0 - earth_flatening) top = (pow((lon2-lon1), 2) * pow(cos(lat1), 2)) + pow(lat2-lat1, 2) bot = pow(sin(lat1), 2) + (pow(f1, 2) * pow(cos(lat1), 2)) return f1 * earth_radius * sqrt(top/bot) ############################################################################## # log ############################################################################## debug_on = 0 if not os.path.isdir("logs"): os.mkdir("logs") log = open(strftime("logs/way-%Y%m%d-%H%M.log"), 'a') log.write("Time,Latitude,Longitude,Heading,Speed,Description\n") ############################################################################## # virtual cones ############################################################################## # node from csv (each line is lat, long, distance, next code# (optional)): # 42.522040, -83.237594, 5, 1 ############################################################################## # class ############################################################################## class VirtualCones(): def __init__(self): self.filename = "" self.cones = [] self.num_cones = 0 self.current_cone = 0 self.distance = 0.0 self.buzz_no = 0 self.max_buzz = 1 def clr_cones(self): self.cones = [] self.num_cones = 0 self.current_cone = 0 def add_cone(self, lat, lon, rad): self.cones.append( [float(lat), float(lon), float(rad)] ) self.num_cones += 1 def read_csv(self, csv_file): file_csv = open(csv_file) data_csv = csv.reader(file_csv) return list(data_csv) def read_list(self, file): self.filename = file self.clr_cones() self.cones = self.read_csv(file) if len(self.cones) == 0: return for i in range(0, len(self.cones)): for j in range(0, 3): self.cones[i][j] = float(self.cones[i][j]) self.num_cones = len(self.cones) return self.cones def save_list(self, filename): with open(filename, "w") as file: for i in range(0, self.num_cones): file.write("%11.7f, %11.7f, %4.1f\n" % (self.cones[i][0], self.cones[i][1], self.cones[i][2])) file.close() def check_list(self, lat, lon, heading, speed): trig = 0 if self.num_cones == 0: return (0, 999.0, 0) # end of list? if self.current_cone >= self.num_cones: self.current_cone = 0 self.buzz_no = 0 # check first node for reset if self.current_cone: # node from csv (each line is lat, long, distance) row = self.cones[0] cone_lat = row[0] cone_lon = row[1] cone_dist = row[2] dist = roydistance(lat, lon, cone_lat, cone_lon) if dist < cone_dist: self.current_cone = 0 # node from csv (each line is lat, long, distance, next code#) row = self.cones[self.current_cone] cone_lat = row[0] cone_lon = row[1] cone_dist = row[2] # next cone cone_next = self.current_cone + 1 try: cone_next = int(row[3]) except: pass # distance between vehicle and cone self.distance = roydistance(lat, lon, cone_lat, cone_lon) # check distance trigger if self.distance < cone_dist: # out = "Waypoint %s:%d reached: %f, %f, %f, %d" % (self.filename, self.current_cone, lat, lon, heading, speed) # print(out) if self.buzz_no < self.max_buzz: play_sound("alert.wav") self.buzz_no = self.buzz_no + 1 self.current_cone = cone_next trig = 1 else: self.buzz_no = 0 return (self.current_cone, self.distance, trig) ############################################################################## # main ############################################################################## opt_list = "d" def main(argv): global log, debug_on virtual_list = [[],[],[],[],[],[],[],[],[]] virtual_count = 0 try: opts, args = getopt.getopt(argv, opt_list, ["",""]) except getopt.GetoptError: print(sys.argv[0], '[-d] ') sys.exit(2) for opt, arg in opts: if opt in ('-h'): print("virtual_cone_alert.py " + opt_list + " ") sys.exit() elif opt in ('-d'): debug_on = 1 else: print("Argument error!") print("virtual_cone_alert.py " + opt_list + " ") sys.exit() # slurp in files for each list for arg in args: try: cones = VirtualCones() cones.read_list(arg) virtual_list[virtual_count] = cones virtual_count += 1 except: print("File open/read failed: ", arg) sys.exit(1) try: while True: report = gpsd.next() # if report['class'] == 'TPV': time = getattr(report, 'time', 0.0) lat = getattr(report, 'lat', 0.0) lon = getattr(report, 'lon', 0.0) heading = getattr(report, 'track', 0.0) speed = getattr(report, 'speed', 0.0) log.write("%s,%f,%f,%f,%f\n" % (time, lat, lon, heading, speed)) # print("Position: %f, %f, %d" % (lat, lon, speed)) # walk through each list for i in range(0, virtual_count): virtual_list[i].check_list(lat, lon, heading, speed) # time.sleep(0.2) except (KeyboardInterrupt, SystemExit): #when you press ctrl+c log.close() print("Done.\nExiting.") main(sys.argv[1:])