Files
Pynomicon/gps_vcones/vcones.py
2021-06-18 10:55:55 -04:00

273 lines
7.1 KiB
Python
Executable File

#! /usr/bin/python3
import os
import sys
import getopt
import csv
from math import *
from gps import *
from time import strftime, sleep
import os
from sys import platform
##############################################################################
# audio stuff
##############################################################################
def play_sound(file):
wave_obj = sa.WaveObject.from_wave_file(file)
play_obj = wave_obj.play()
##############################################################################
# buzzer stuff
##############################################################################
# https://www.instructables.com/Raspberry-Pi-Tutorial-How-to-Use-a-Buzzer/
try:
import RPi.GPIO as GPIO
buzzer=23
seconds=3
def buzzer_init():
GPIO.setwarnings(0)
GPIO.setmode(GPIO.BCM)
GPIO.setup(buzzer, GPIO.OUT)
def buzzer_on():
GPIO.output(buzzer, GPIO.HIGH)
def buzzer_off():
GPIO.output(buzzer, GPIO.LOW)
def buzzer_run():
global seconds
sec = seconds
while (sec):
buzzer_on()
sleep(0.5)
buzzer_off()
sleep(0.5)
sec = sec - 1
buzzer_init()
except:
print("Not a Raspberry Pi! No buzzer support.")
##############################################################################
# gps stuff
##############################################################################
# https://ozzmaker.com/using-python-with-a-gps-receiver-on-a-raspberry-pi/
gpsd = gps(mode=WATCH_ENABLE|WATCH_NEWSTYLE)
#
# GPS receiver is ublox from V2V-MD at 10x seconds
#
# http://www.movable-type.co.uk/scripts/latlong.html
earth_flatening = 1.0/298.257223563
earth_radius = 6378137.0
def roydistance(lat1, lon1, lat2, lon2):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
# Roy's method
f1 = (1.0 - earth_flatening)
top = (pow((lon2-lon1), 2) * pow(cos(lat1), 2)) + pow(lat2-lat1, 2)
bot = pow(sin(lat1), 2) + (pow(f1, 2) * pow(cos(lat1), 2))
return f1 * earth_radius * sqrt(top/bot)
##############################################################################
# log
##############################################################################
debug_on = 0
if not os.path.isdir("logs"):
os.mkdir("logs")
log = open(strftime("logs/way-%Y%m%d-%H%M.log"), 'a')
log.write("Time,Latitude,Longitude,Heading,Speed,Description\n")
##############################################################################
# virtual cones
##############################################################################
# node from csv (each line is lat, long, distance, next code# (optional)):
# 42.522040, -83.237594, 5, 1
##############################################################################
# class
##############################################################################
class VirtualCones():
def __init__(self):
self.filename = ""
self.cones = []
self.num_cones = 0
self.current_cone = 0
self.distance = 0.0
self.buzz_no = 0
self.max_buzz = 1
def clr_cones(self):
self.cones = []
self.num_cones = 0
self.current_cone = 0
def add_cone(self, lat, lon, rad):
self.cones.append( [float(lat), float(lon), float(rad)] )
self.num_cones += 1
def read_csv(self, csv_file):
file_csv = open(csv_file)
data_csv = csv.reader(file_csv)
return list(data_csv)
def read_list(self, file):
self.filename = file
self.clr_cones()
self.cones = self.read_csv(file)
if len(self.cones) == 0:
return
for i in range(0, len(self.cones)):
for j in range(0, 3):
self.cones[i][j] = float(self.cones[i][j])
self.num_cones = len(self.cones)
return self.cones
def save_list(self, filename):
with open(filename, "w") as file:
for i in range(0, self.num_cones):
file.write("%11.7f, %11.7f, %4.1f\n" % (self.cones[i][0], self.cones[i][1], self.cones[i][2]))
file.close()
def check_list(self, lat, lon, heading, speed):
trig = 0
if self.num_cones == 0:
return (0, 999.0, 0)
# end of list?
if self.current_cone >= self.num_cones:
self.current_cone = 0
self.buzz_no = 0
# check first node for reset
if self.current_cone:
# node from csv (each line is lat, long, distance)
row = self.cones[0]
cone_lat = row[0]
cone_lon = row[1]
cone_dist = row[2]
dist = roydistance(lat, lon, cone_lat, cone_lon)
if dist < cone_dist:
self.current_cone = 0
# node from csv (each line is lat, long, distance, next code#)
row = self.cones[self.current_cone]
cone_lat = row[0]
cone_lon = row[1]
cone_dist = row[2]
# next cone
cone_next = self.current_cone + 1
try:
cone_next = int(row[3])
except:
pass
# distance between vehicle and cone
self.distance = roydistance(lat, lon, cone_lat, cone_lon)
# check distance trigger
if self.distance < cone_dist:
# out = "Waypoint %s:%d reached: %f, %f, %f, %d" % (self.filename, self.current_cone, lat, lon, heading, speed)
# print(out)
if self.buzz_no < self.max_buzz:
play_sound("alert.wav")
self.buzz_no = self.buzz_no + 1
self.current_cone = cone_next
trig = 1
else:
self.buzz_no = 0
return (self.current_cone, self.distance, trig)
##############################################################################
# main
##############################################################################
opt_list = "d"
def main(argv):
global log, debug_on
virtual_list = [[],[],[],[],[],[],[],[],[]]
virtual_count = 0
try:
opts, args = getopt.getopt(argv, opt_list, ["",""])
except getopt.GetoptError:
print(sys.argv[0], '[-d] <csvfile>')
sys.exit(2)
for opt, arg in opts:
if opt in ('-h'):
print("virtual_cone_alert.py " + opt_list + " <inputfiles>")
sys.exit()
elif opt in ('-d'):
debug_on = 1
else:
print("Argument error!")
print("virtual_cone_alert.py " + opt_list + " <inputfiles>")
sys.exit()
# slurp in files for each list
for arg in args:
try:
cones = VirtualCones()
cones.read_list(arg)
virtual_list[virtual_count] = cones
virtual_count += 1
except:
print("File open/read failed: ", arg)
sys.exit(1)
try:
while True:
report = gpsd.next() #
if report['class'] == 'TPV':
time = getattr(report, 'time', 0.0)
lat = getattr(report, 'lat', 0.0)
lon = getattr(report, 'lon', 0.0)
heading = getattr(report, 'track', 0.0)
speed = getattr(report, 'speed', 0.0)
log.write("%s,%f,%f,%f,%f\n" % (time, lat, lon, heading, speed))
# print("Position: %f, %f, %d" % (lat, lon, speed))
# walk through each list
for i in range(0, virtual_count):
virtual_list[i].check_list(lat, lon, heading, speed)
# time.sleep(0.2)
except (KeyboardInterrupt, SystemExit): #when you press ctrl+c
log.close()
print("Done.\nExiting.")
main(sys.argv[1:])