273 lines
7.1 KiB
Python
Executable File
273 lines
7.1 KiB
Python
Executable File
#! /usr/bin/python3
|
|
|
|
import os
|
|
import sys
|
|
import getopt
|
|
import csv
|
|
|
|
from math import *
|
|
from gps import *
|
|
from time import strftime, sleep
|
|
|
|
import os
|
|
from sys import platform
|
|
|
|
##############################################################################
|
|
# audio stuff
|
|
##############################################################################
|
|
|
|
def play_sound(file):
|
|
wave_obj = sa.WaveObject.from_wave_file(file)
|
|
play_obj = wave_obj.play()
|
|
|
|
##############################################################################
|
|
# buzzer stuff
|
|
##############################################################################
|
|
|
|
# https://www.instructables.com/Raspberry-Pi-Tutorial-How-to-Use-a-Buzzer/
|
|
|
|
try:
|
|
import RPi.GPIO as GPIO
|
|
|
|
buzzer=23
|
|
seconds=3
|
|
|
|
def buzzer_init():
|
|
GPIO.setwarnings(0)
|
|
GPIO.setmode(GPIO.BCM)
|
|
GPIO.setup(buzzer, GPIO.OUT)
|
|
|
|
def buzzer_on():
|
|
GPIO.output(buzzer, GPIO.HIGH)
|
|
|
|
def buzzer_off():
|
|
GPIO.output(buzzer, GPIO.LOW)
|
|
|
|
def buzzer_run():
|
|
global seconds
|
|
sec = seconds
|
|
while (sec):
|
|
buzzer_on()
|
|
sleep(0.5)
|
|
buzzer_off()
|
|
sleep(0.5)
|
|
sec = sec - 1
|
|
|
|
buzzer_init()
|
|
except:
|
|
print("Not a Raspberry Pi! No buzzer support.")
|
|
|
|
##############################################################################
|
|
# gps stuff
|
|
##############################################################################
|
|
|
|
# https://ozzmaker.com/using-python-with-a-gps-receiver-on-a-raspberry-pi/
|
|
|
|
gpsd = gps(mode=WATCH_ENABLE|WATCH_NEWSTYLE)
|
|
|
|
#
|
|
# GPS receiver is ublox from V2V-MD at 10x seconds
|
|
#
|
|
|
|
# http://www.movable-type.co.uk/scripts/latlong.html
|
|
|
|
earth_flatening = 1.0/298.257223563
|
|
earth_radius = 6378137.0
|
|
|
|
def roydistance(lat1, lon1, lat2, lon2):
|
|
"""
|
|
Calculate the great circle distance between two points
|
|
on the earth (specified in decimal degrees)
|
|
"""
|
|
# convert decimal degrees to radians
|
|
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
|
|
|
|
# Roy's method
|
|
f1 = (1.0 - earth_flatening)
|
|
|
|
top = (pow((lon2-lon1), 2) * pow(cos(lat1), 2)) + pow(lat2-lat1, 2)
|
|
bot = pow(sin(lat1), 2) + (pow(f1, 2) * pow(cos(lat1), 2))
|
|
|
|
return f1 * earth_radius * sqrt(top/bot)
|
|
|
|
##############################################################################
|
|
# log
|
|
##############################################################################
|
|
|
|
debug_on = 0
|
|
|
|
if not os.path.isdir("logs"):
|
|
os.mkdir("logs")
|
|
log = open(strftime("logs/way-%Y%m%d-%H%M.log"), 'a')
|
|
log.write("Time,Latitude,Longitude,Heading,Speed,Description\n")
|
|
|
|
##############################################################################
|
|
# virtual cones
|
|
##############################################################################
|
|
|
|
# node from csv (each line is lat, long, distance, next code# (optional)):
|
|
# 42.522040, -83.237594, 5, 1
|
|
|
|
|
|
##############################################################################
|
|
# class
|
|
##############################################################################
|
|
|
|
class VirtualCones():
|
|
def __init__(self):
|
|
self.filename = ""
|
|
self.cones = []
|
|
self.num_cones = 0
|
|
self.current_cone = 0
|
|
self.distance = 0.0
|
|
self.buzz_no = 0
|
|
self.max_buzz = 1
|
|
|
|
def clr_cones(self):
|
|
self.cones = []
|
|
self.num_cones = 0
|
|
self.current_cone = 0
|
|
|
|
def add_cone(self, lat, lon, rad):
|
|
self.cones.append( [float(lat), float(lon), float(rad)] )
|
|
self.num_cones += 1
|
|
|
|
def read_csv(self, csv_file):
|
|
file_csv = open(csv_file)
|
|
data_csv = csv.reader(file_csv)
|
|
return list(data_csv)
|
|
|
|
def read_list(self, file):
|
|
self.filename = file
|
|
self.clr_cones()
|
|
self.cones = self.read_csv(file)
|
|
if len(self.cones) == 0:
|
|
return
|
|
for i in range(0, len(self.cones)):
|
|
for j in range(0, 3):
|
|
self.cones[i][j] = float(self.cones[i][j])
|
|
self.num_cones = len(self.cones)
|
|
return self.cones
|
|
|
|
def save_list(self, filename):
|
|
with open(filename, "w") as file:
|
|
for i in range(0, self.num_cones):
|
|
file.write("%11.7f, %11.7f, %4.1f\n" % (self.cones[i][0], self.cones[i][1], self.cones[i][2]))
|
|
file.close()
|
|
|
|
def check_list(self, lat, lon, heading, speed):
|
|
trig = 0
|
|
if self.num_cones == 0:
|
|
return (0, 999.0, 0)
|
|
|
|
# end of list?
|
|
if self.current_cone >= self.num_cones:
|
|
self.current_cone = 0
|
|
self.buzz_no = 0
|
|
|
|
# check first node for reset
|
|
if self.current_cone:
|
|
# node from csv (each line is lat, long, distance)
|
|
row = self.cones[0]
|
|
cone_lat = row[0]
|
|
cone_lon = row[1]
|
|
cone_dist = row[2]
|
|
dist = roydistance(lat, lon, cone_lat, cone_lon)
|
|
if dist < cone_dist:
|
|
self.current_cone = 0
|
|
|
|
# node from csv (each line is lat, long, distance, next code#)
|
|
row = self.cones[self.current_cone]
|
|
cone_lat = row[0]
|
|
cone_lon = row[1]
|
|
cone_dist = row[2]
|
|
|
|
# next cone
|
|
cone_next = self.current_cone + 1
|
|
try:
|
|
cone_next = int(row[3])
|
|
except:
|
|
pass
|
|
|
|
# distance between vehicle and cone
|
|
self.distance = roydistance(lat, lon, cone_lat, cone_lon)
|
|
|
|
# check distance trigger
|
|
if self.distance < cone_dist:
|
|
# out = "Waypoint %s:%d reached: %f, %f, %f, %d" % (self.filename, self.current_cone, lat, lon, heading, speed)
|
|
# print(out)
|
|
if self.buzz_no < self.max_buzz:
|
|
play_sound("alert.wav")
|
|
self.buzz_no = self.buzz_no + 1
|
|
self.current_cone = cone_next
|
|
trig = 1
|
|
else:
|
|
self.buzz_no = 0
|
|
|
|
return (self.current_cone, self.distance, trig)
|
|
|
|
##############################################################################
|
|
# main
|
|
##############################################################################
|
|
|
|
opt_list = "d"
|
|
|
|
def main(argv):
|
|
global log, debug_on
|
|
virtual_list = [[],[],[],[],[],[],[],[],[]]
|
|
virtual_count = 0
|
|
|
|
try:
|
|
opts, args = getopt.getopt(argv, opt_list, ["",""])
|
|
except getopt.GetoptError:
|
|
print(sys.argv[0], '[-d] <csvfile>')
|
|
sys.exit(2)
|
|
|
|
for opt, arg in opts:
|
|
if opt in ('-h'):
|
|
print("virtual_cone_alert.py " + opt_list + " <inputfiles>")
|
|
sys.exit()
|
|
elif opt in ('-d'):
|
|
debug_on = 1
|
|
else:
|
|
print("Argument error!")
|
|
print("virtual_cone_alert.py " + opt_list + " <inputfiles>")
|
|
sys.exit()
|
|
|
|
# slurp in files for each list
|
|
for arg in args:
|
|
try:
|
|
cones = VirtualCones()
|
|
cones.read_list(arg)
|
|
virtual_list[virtual_count] = cones
|
|
virtual_count += 1
|
|
except:
|
|
print("File open/read failed: ", arg)
|
|
sys.exit(1)
|
|
|
|
try:
|
|
while True:
|
|
report = gpsd.next() #
|
|
if report['class'] == 'TPV':
|
|
|
|
time = getattr(report, 'time', 0.0)
|
|
lat = getattr(report, 'lat', 0.0)
|
|
lon = getattr(report, 'lon', 0.0)
|
|
heading = getattr(report, 'track', 0.0)
|
|
speed = getattr(report, 'speed', 0.0)
|
|
|
|
log.write("%s,%f,%f,%f,%f\n" % (time, lat, lon, heading, speed))
|
|
# print("Position: %f, %f, %d" % (lat, lon, speed))
|
|
|
|
# walk through each list
|
|
for i in range(0, virtual_count):
|
|
virtual_list[i].check_list(lat, lon, heading, speed)
|
|
|
|
# time.sleep(0.2)
|
|
|
|
except (KeyboardInterrupt, SystemExit): #when you press ctrl+c
|
|
log.close()
|
|
print("Done.\nExiting.")
|
|
|
|
main(sys.argv[1:])
|