Files
linux_tools/bin/xgps
2021-01-22 10:13:40 -05:00

979 lines
36 KiB
Python
Executable File

#! /usr/bin/python
# -*- coding: UTF-8
'''
xgps -- test client for gpsd
usage: xgps [-D level] [-hV?] [-l degmfmt] [-u units] [-r rotation]
[server[:port[:device]]]
'''
# This file is Copyright (c) 2010 by the GPSD project
# BSD terms apply: see the file COPYING in the distribution root for details.
#
# This code runs compatibly under Python 2 and 3.x for x >= 2.
# Preserve this property!
from __future__ import absolute_import, print_function, division
import math
import socket
import sys
import time
import gps
import gps.clienthelpers
import cairo
# Gtk3 imports. Gtk3 requires the require_version(), which then causes
# pylint to complain about the subsequent "non-top" imports.
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import GObject # pylint: disable=wrong-import-position
from gi.repository import Gtk # pylint: disable=wrong-import-position
from gi.repository import Gdk # pylint: disable=wrong-import-position
gui_about = '''\
This is xgps, a test client for the gpsd daemon.
By Eric S. Raymond for the GPSD project, December 2009
'''
# Use our own MAXCHANNELS value, due to the tradeoff between max sats and
# the window size. Ideally, this should be dynamic.
MAXCHANNELS = 20
# how to sort the Satellite List
# some of ("PRN","el","az","ss","used") with optional '-' to reverse sort
# by default, used at the top, then sort PRN
SKY_VIEW_SORT_FIELDS = ('-used', 'PRN')
class unit_adjustments(object):
"Encapsulate adjustments for unit systems."
def __init__(self, units=None):
self.altfactor = 1.0
self.altunits = "m"
self.speedfactor = gps.MPS_TO_KPH
self.speedunits = "kph"
if units is None:
units = gps.clienthelpers.gpsd_units()
if units in (gps.clienthelpers.unspecified, gps.clienthelpers.imperial,
"imperial", "i"):
pass
elif units in (gps.clienthelpers.nautical, "nautical", "n"):
self.altfactor = gps.METERS_TO_FEET
self.altunits = "ft"
self.speedfactor = gps.MPS_TO_KNOTS
self.speedunits = "knots"
elif units in (gps.clienthelpers.metric, "metric", "m"):
self.altfactor = 1.0
self.altunits = "m"
self.speedfactor = gps.MPS_TO_KPH
self.speedunits = "kph"
else:
raise ValueError # Should never happen
def fit_to_grid(x, y, line_width):
"Adjust coordinates to produce sharp lines."
if line_width % 1.0 != 0:
# Can't have sharp lines for non-integral line widths.
return float(x), float(y) # Be consistent about returning floats
if line_width % 2 == 0:
# Round to a pixel corner.
return round(x), round(y)
else:
# Round to a pixel center.
return int(x) + 0.5, int(y) + 0.5
def fit_circle_to_grid(x, y, radius, line_width):
"""Adjust circle coordinates and radius to produce sharp horizontal
and vertical tangents."""
r = radius
x1, y1 = fit_to_grid(x - r, y - r, line_width)
x2, y2 = fit_to_grid(x + r, y + r, line_width)
x, y = (x1 + x2) / 2, (y1 + y2) / 2
r = (x2 - x1 + y2 - y1) / 4
return x, y, r
class SkyView(Gtk.DrawingArea):
"Satellite skyview, encapsulates pygtk's draw-on-expose behavior."
# See <http://faq.pygtk.org/index.py?req=show&file=faq18.008.htp>
HORIZON_PAD = 40 # How much whitespace to leave around horizon
SAT_RADIUS = 5 # Diameter of satellite circle
def __init__(self, rotate=0.0):
Gtk.DrawingArea.__init__(self)
# GObject.GObject.__init__(self)
self.set_size_request(400, 400)
self.cr = None # New cairo context for each expose event
self.step_of_grid = 30 # default step of polar grid
self.connect('size-allocate', self.on_size_allocate)
self.connect('draw', self.on_draw)
self.satellites = []
self.center_x = self.center_y = self.radius = None
self.rotate = rotate
def on_size_allocate(self, _unused, allocation):
width = allocation.width
height = allocation.height
x = width // 2
y = height // 2
r = (min(width, height) - SkyView.HORIZON_PAD) // 2
x, y, r = fit_circle_to_grid(x, y, r, 1)
self.center_x = x
self.center_y = y
self.radius = r
def set_color(self, spec):
"Set foreground color for drawing."
gdkcolor = Gdk.color_parse(spec)
r = gdkcolor.red / 65535.0
g = gdkcolor.green / 65535.0
b = gdkcolor.blue / 65535.0
self.cr.set_source_rgb(r, g, b)
def draw_circle(self, x, y, radius, filled=False):
"Draw a circle centered on the specified midpoint."
lw = self.cr.get_line_width()
r = int(2 * radius + 0.5) // 2
x, y, r = fit_circle_to_grid(x, y, radius, lw)
self.cr.arc(x, y, r, 0, math.pi * 2.0)
self.cr.close_path()
if filled:
self.cr.fill()
else:
self.cr.stroke()
def draw_line(self, x1, y1, x2, y2):
"Draw a line between specified points."
lw = self.cr.get_line_width()
x1, y1 = fit_to_grid(x1, y1, lw)
x2, y2 = fit_to_grid(x2, y2, lw)
self.cr.move_to(x1, y1)
self.cr.line_to(x2, y2)
self.cr.stroke()
def draw_square(self, x, y, radius, filled=False):
"Draw a square centered on the specified midpoint."
lw = self.cr.get_line_width()
x1, y1 = fit_to_grid(x - radius, y - radius, lw)
x2, y2 = fit_to_grid(x + radius, y + radius, lw)
self.cr.rectangle(x1, y1, x2 - x1, y2 - y1)
if filled:
self.cr.fill()
else:
self.cr.stroke()
def draw_string(self, x, y, text, centered=True):
"Draw a text on the skyview."
self.cr.select_font_face("Sans", cairo.FONT_SLANT_NORMAL,
cairo.FONT_WEIGHT_BOLD)
self.cr.set_font_size(10)
if centered:
extents = self.cr.text_extents(text)
# width / 2 + x_bearing
x -= extents[2] / 2 + extents[0]
# height / 2 + y_bearing
y -= extents[3] / 2 + extents[1]
self.cr.move_to(x, y)
self.cr.show_text(text)
self.cr.new_path()
def pol2cart(self, az, el):
"Polar to Cartesian coordinates within the horizon circle."
az = (az - self.rotate) % 360.0
az *= (math.pi / 180) # Degrees to radians
# Exact spherical projection would be like this:
# el = sin((90.0 - el) * DEG_2_RAD);
el = ((90.0 - el) / 90.0)
xout = self.center_x + math.sin(az) * el * self.radius
yout = self.center_y - math.cos(az) * el * self.radius
return (xout, yout)
def on_draw(self, widget, _unused):
self.cr = widget.get_window().cairo_create()
self.cr.set_line_width(1)
self.cr.set_source_rgb(0, 0, 0)
self.cr.paint()
self.cr.set_source_rgb(1, 1, 1)
# The zenith marker
self.draw_circle(self.center_x, self.center_y, 6, filled=False)
# The horizon circle
if self.step_of_grid == 45:
# The circle corresponding to 45 degrees elevation.
# There are two ways we could plot this. Projecting the sphere
# on the display plane, the circle would have a diameter of
# sin(45) ~ 0.7. But the naive linear mapping, just splitting
# the horizon diameter in half, seems to work better visually.
self.draw_circle(self.center_x, self.center_y, self.radius / 2,
filled=False)
elif self.step_of_grid == 30:
self.draw_circle(self.center_x, self.center_y, self.radius * 2 / 3,
filled=False)
self.draw_circle(self.center_x, self.center_y, self.radius / 3,
filled=False)
self.draw_circle(self.center_x, self.center_y, self.radius,
filled=False)
(x1, y1) = self.pol2cart(0, 0)
(x2, y2) = self.pol2cart(180, 0)
self.draw_line(x1, y1, x2, y2)
(x1, y1) = self.pol2cart(90, 0)
(x2, y2) = self.pol2cart(270, 0)
self.draw_line(x1, y1, x2, y2)
# The compass-point letters
(x, y) = self.pol2cart(0, -5)
self.draw_string(x, y, "N")
(x, y) = self.pol2cart(90, -5)
self.draw_string(x, y, "E")
(x, y) = self.pol2cart(180, -5)
self.draw_string(x, y, "S")
(x, y) = self.pol2cart(270, -5)
self.draw_string(x, y, "W")
# The satellites
self.cr.set_line_width(2)
for sat in self.satellites:
if sat.az == 0 and sat.el == 0:
continue # Skip satellites with unknown position
(x, y) = self.pol2cart(sat.az, sat.el)
if sat.ss < 10:
self.set_color("Gray")
elif sat.ss < 30:
self.set_color("Red")
elif sat.ss < 35:
self.set_color("Yellow")
elif sat.ss < 40:
self.set_color("Green3")
else:
self.set_color("Green1")
if gps.is_sbas(sat.PRN):
self.draw_square(x, y, SkyView.SAT_RADIUS, sat.used)
else:
self.draw_circle(x, y, SkyView.SAT_RADIUS, sat.used)
self.cr.set_source_rgb(1, 1, 1)
self.draw_string(x + SkyView.SAT_RADIUS,
y + (SkyView.SAT_RADIUS * 2), str(sat.PRN),
centered=False)
self.cr = None
def redraw(self, satellites):
"Redraw the skyview."
self.satellites = satellites
self.queue_draw()
class NoiseView(object):
"Encapsulate view object for watching noise statistics."
COLUMNS = 2
ROWS = 4
noisefields = (
# First column
("Time", "time"),
("Latitude", "lat"),
("Longitude", "lon"),
("Altitude", "alt"),
# Second column
("RMS", "rms"),
("Major", "major"),
("Minor", "minor"),
("Orient", "orient"),
)
def __init__(self):
self.widget = Gtk.Table(NoiseView.COLUMNS, NoiseView.ROWS, False)
self.noisewidgets = []
for i in range(len(NoiseView.noisefields)):
colbase = (i // NoiseView.ROWS) * 2
label = Gtk.Label(label=NoiseView.noisefields[i][0] + ": ")
# Wacky way to force right alignment
label.set_alignment(xalign=1, yalign=0.5)
self.widget.attach(
label, colbase, colbase + 1,
i % NoiseView.ROWS, i % NoiseView.ROWS + 1)
entry = Gtk.Entry()
# The right size for the ISO8601 timestamp
entry.set_width_chars(20)
entry.set_text("n/a")
self.widget.attach(
entry, colbase + 1, colbase + 2,
i % NoiseView.ROWS, i % NoiseView.ROWS + 1)
self.noisewidgets.append((NoiseView.noisefields[i][1], entry))
def update(self, noise):
"Update the GPGST data fields."
for (attrname, widget) in self.noisewidgets:
if hasattr(noise, attrname):
widget.set_text(str(getattr(noise, attrname)))
else:
widget.set_text("n/a")
class MaidenheadView(object):
"Encapsulate view object for watching Maidenhead grid location."
def __init__(self):
self.widget = Gtk.Entry()
self.widget.set_editable(False)
def update(self, tpv):
if ((tpv.mode >= gps.MODE_2D
and hasattr(tpv, "lat")
and hasattr(tpv, "lon"))):
self.widget.set_text(gps.clienthelpers.maidenhead(tpv.lat,
tpv.lon))
else:
return self.widget.set_text("n/a")
class AISView(object):
"Encapsulate store and view objects for watching AIS data."
AIS_ENTRIES = 10
DWELLTIME = 360
def __init__(self, deg_type):
"Initialize the store and view."
self.deg_type = deg_type
self.name_to_mmsi = {}
self.named = {}
self.store = Gtk.ListStore(str, str, str, str, str, str)
self.widget = Gtk.ScrolledWindow()
self.widget.set_policy(Gtk.PolicyType.AUTOMATIC,
Gtk.PolicyType.AUTOMATIC)
self.view = Gtk.TreeView(model=self.store)
self.widget.set_size_request(-1, 300)
self.widget.add_with_viewport(self.view)
for (i, label) in enumerate(('#', 'Name:', 'Callsign:',
'Destination:', "Lat/Lon:",
"Information")):
column = Gtk.TreeViewColumn(label)
renderer = Gtk.CellRendererText()
column.pack_start(renderer, expand=True)
column.add_attribute(renderer, 'text', i)
self.view.append_column(column)
def enter(self, ais, name):
"Add a named object (ship or station) to the store."
if ais.mmsi in self.named:
return False
else:
ais.entry_time = time.time()
self.named[ais.mmsi] = ais
self.name_to_mmsi[name] = ais.mmsi
# Garbage-collect old entries
try:
for i in range(len(self.store)):
here = self.store.get_iter(i)
name = self.store.get_value(here, 1)
mmsi = self.name_to_mmsi[name]
if ((self.named[mmsi].entry_time
< time.time() - AISView.DWELLTIME)):
del self.named[mmsi]
if name in self.name_to_mmsi:
del self.name_to_mmsi[name]
self.store.remove(here)
except (ValueError, KeyError): # Invalid TreeIters throw these
pass
return True
def latlon(self, lat, lon):
"Latitude/longitude display in nice format."
if lat < 0:
latsuff = "S"
elif lat > 0:
latsuff = "N"
else:
latsuff = ""
lat = abs(lat)
lat = gps.clienthelpers.deg_to_str(self.deg_type, lat)
if lon < 0:
lonsuff = "W"
elif lon > 0:
lonsuff = "E"
else:
lonsuff = ""
lon = abs(lon)
lon = gps.clienthelpers.deg_to_str(gps.clienthelpers.deg_ddmmss, lon)
return lat + latsuff + "/" + lon + lonsuff
def update(self, ais):
"Update the AIS data fields."
if ais.type in (1, 2, 3, 18):
if ais.mmsi in self.named:
for i in range(len(self.store)):
here = self.store.get_iter(i)
name = self.store.get_value(here, 1)
if name in self.name_to_mmsi:
mmsi = self.name_to_mmsi[name]
if mmsi == ais.mmsi:
latlon = self.latlon(ais.lat, ais.lon)
self.store.set_value(here, 4, latlon)
elif ais.type == 4:
if self.enter(ais, ais.mmsi):
where = self.latlon(ais.lat, ais.lon)
self.store.prepend(
(ais.type, ais.mmsi, "(shore)", ais.timestamp, where,
ais.epfd_text))
elif ais.type == 5:
if self.enter(ais, ais.shipname):
self.store.prepend(
(ais.type, ais.shipname, ais.callsign, ais.destination,
"", ais.shiptype))
elif ais.type == 12:
sender = ais.mmsi
if sender in self.named:
sender = self.named[sender].shipname
recipient = ais.dest_mmsi
if ((recipient in self.named
and hasattr(self.named[recipient], "shipname"))):
recipient = self.named[recipient].shipname
self.store.prepend(
(ais.type, sender, "", recipient, "", ais.text))
elif ais.type == 14:
sender = ais.mmsi
if sender in self.named:
sender = self.named[sender].shipname
self.store.prepend(
(ais.type, sender, "", "(broadcast)", "", ais.text))
elif ais.type in (19, 24):
if self.enter(ais, ais.shipname):
self.store.prepend(
(ais.type, ais.shipname, "(class B)", "", "",
ais.shiptype_text))
elif ais.type == 21:
if self.enter(ais, ais.name):
where = self.latlon(ais.lat, ais.lon)
self.store.prepend(
(ais.type, ais.name, "(%s navaid)" % ais.epfd_text,
"", where, ais.aid_type_text))
class Base(object):
COLUMNS = 3
ROWS = 7
gpsfields = (
# First column
("Time", lambda s, r: s.update_time(r)),
("Latitude", lambda s, r: s.update_latitude(r)),
("Longitude", lambda s, r: s.update_longitude(r)),
("Altitude", lambda s, r: s.update_altitude(r)),
("Speed", lambda s, r: s.update_speed(r)),
("Climb", lambda s, r: s.update_climb(r)),
("Track", lambda s, r: s.update_track(r)),
# Second column
("Status", lambda s, r: s.update_status(r)),
("EPX", lambda s, r: s.update_err(r, "epx")),
("EPY", lambda s, r: s.update_err(r, "epy")),
("EPV", lambda s, r: s.update_err(r, "epv")),
("EPS", lambda s, r: s.update_err_speed(r, "eps")),
("EPC", lambda s, r: s.update_err_speed(r, "epc")),
("EPD", lambda s, r: s.update_err_degrees(r, "epd")),
)
def __init__(self, deg_type, rotate=0.0, target=""):
self.deg_type = deg_type
self.rotate = rotate
self.conversions = unit_adjustments()
self.saved_mode = -1
self.ais_latch = False
self.noise_latch = False
self.window = Gtk.Window(Gtk.WindowType.TOPLEVEL)
if not self.window.get_display():
raise Exception("Can't open display")
if len(target):
target = " " + target
self.window.set_title("xgps" + target)
self.window.connect("delete-event", self.delete_event)
self.window.set_resizable(False)
vbox = Gtk.VBox(False, 0)
self.window.add(vbox)
self.window.connect("destroy", lambda _unused: Gtk.main_quit())
self.uimanager = Gtk.UIManager()
self.accelgroup = self.uimanager.get_accel_group()
self.window.add_accel_group(self.accelgroup)
self.actiongroup = Gtk.ActionGroup('xgps')
self.actiongroup.add_actions(
[('Quit', Gtk.STOCK_QUIT, '_Quit', None,
'Quit the Program', lambda _unused: Gtk.main_quit()),
('File', None, '_File'),
('View', None, '_View'),
('Units', None, '_Units'),
('Step of grid', None, '_Step of grid')])
self.actiongroup.add_toggle_actions(
[('Skyview', None, '_Skyview', '<Control>s',
'Enable Skyview', self.view_toggle),
('Responses', None, '_Responses', '<Control>r',
'Enable Response Reports', self.view_toggle),
('GPS', None, '_GPS Data', '<Control>g',
'Enable GPS Data', self.view_toggle),
('Noise', None, '_Noise Statistics', '<Control>n',
'Enable Noise Statistics', self.view_toggle),
('Maidenhead', None, '_Maidenhead', '<Control>m',
'Enable Maidenhead locator', self.view_toggle),
('AIS', None, '_AIS Data', '<Control>a',
'Enable AIS Data', self.view_toggle),
])
self.actiongroup.add_radio_actions(
[('Imperial', None, '_Imperial', '<Control>i',
'Imperial units', 0),
('Nautical', None, '_Nautical', '<Control>n',
'Nautical units', 1),
('Metric', None, '_Metric', '<Control>m',
'Metric Units', 2),
], 2, lambda a, _unused: self.set_units(
['i', 'n', 'm'][a.get_current_value()]))
self.actiongroup.add_radio_actions(
[('30deg', None, '30°', None, '30°', 30),
('45deg', None, '45°', None, '45°', 45),
('Off', None, 'Off', None, 'Off', 0),
], 30, lambda a, _unused: self.set_step_of_grid(
a.get_current_value()))
self.uimanager.insert_action_group(self.actiongroup, 0)
self.uimanager.add_ui_from_string('''
<ui>
<menubar name="MenuBar">
<menu action="File">
<menuitem action="Quit"/>
</menu>
<menu action="View">
<menuitem action="Skyview"/>
<menuitem action="Responses"/>
<menuitem action="GPS"/>
<menuitem action="Noise"/>
<menuitem action="Maidenhead"/>
<menuitem action="AIS"/>
</menu>
<menu action="Units">
<menuitem action="Imperial"/>
<menuitem action="Nautical"/>
<menuitem action="Metric"/>
</menu>
<menu action="Step of grid">
<menuitem action="30deg"/>
<menuitem action="45deg"/>
<menuitem action="Off"/>
</menu>
</menubar>
</ui>
''')
self.uimanager.get_widget('/MenuBar/View/Skyview').set_active(True)
self.uimanager.get_widget('/MenuBar/View/Responses').set_active(True)
self.uimanager.get_widget('/MenuBar/View/GPS').set_active(True)
self.uimanager.get_widget('/MenuBar/View/Noise').set_active(True)
self.uimanager.get_widget('/MenuBar/View/Maidenhead').set_active(True)
self.uimanager.get_widget('/MenuBar/View/AIS').set_active(True)
menubar = self.uimanager.get_widget('/MenuBar')
vbox.pack_start(menubar, expand=False, fill=True, padding=0)
self.satbox = Gtk.HBox(False, 0)
vbox.add(self.satbox)
skyframe = Gtk.Frame(label="Satellite List")
self.satbox.add(skyframe)
self.satlist = Gtk.ListStore(str, str, str, str, str)
view = Gtk.TreeView(model=self.satlist)
for (i, label) in enumerate(('PRN:', 'Elev:', 'Azim:', 'SNR:',
'Used:')):
column = Gtk.TreeViewColumn(label)
renderer = Gtk.CellRendererText()
column.pack_start(renderer, expand=True)
column.add_attribute(renderer, 'text', i)
view.append_column(column)
self.row_iters = []
for i in range(MAXCHANNELS):
self.satlist.append(["", "", "", "", ""])
self.row_iters.append(self.satlist.get_iter(i))
skyframe.add(view)
viewframe = Gtk.Frame(label="Skyview")
self.satbox.add(viewframe)
self.skyview = SkyView(self.rotate)
viewframe.add(self.skyview)
self.rawdisplay = Gtk.Entry()
self.rawdisplay.set_editable(False)
vbox.add(self.rawdisplay)
self.dataframe = Gtk.Frame(label="GPS data")
datatable = Gtk.Table(Base.COLUMNS, Base.ROWS, False)
self.dataframe.add(datatable)
gpswidgets = []
for i in range(len(Base.gpsfields)):
colbase = (i // Base.ROWS) * 2
label = Gtk.Label(label=Base.gpsfields[i][0] + ": ")
# Wacky way to force right alignment
label.set_alignment(xalign=1, yalign=0.5)
datatable.attach(label, colbase, colbase + 1,
i % Base.ROWS, i % Base.ROWS + 1)
entry = Gtk.Entry()
# The right size for the ISO8601 timestamp
entry.set_width_chars(20)
entry.set_text("n/a")
datatable.attach(entry, colbase + 1, colbase + 2,
i % Base.ROWS, i % Base.ROWS + 1)
gpswidgets.append(entry)
vbox.add(self.dataframe)
self.noisebox = Gtk.HBox(False, 0)
vbox.add(self.noisebox)
noiseframe = Gtk.Frame(label="Noise Statistics")
self.noisebox.add(noiseframe)
self.noiseview = NoiseView()
noiseframe.add(self.noiseview.widget)
self.gsbox = Gtk.HBox(False, 0)
vbox.add(self.gsbox)
gsframe = Gtk.Frame(label="Maidenhead Grid Square")
self.gsbox.add(gsframe)
self.gsview = MaidenheadView()
gsframe.add(self.gsview.widget)
self.aisbox = Gtk.HBox(False, 0)
vbox.add(self.aisbox)
aisframe = Gtk.Frame(label="AIS Data")
self.aisbox.add(aisframe)
self.aisview = AISView(self.deg_type)
aisframe.add(self.aisview.widget)
self.window.show_all()
# Hide the Noise Statistics window until user selects it.
self.uimanager.get_widget('/MenuBar/View/Noise').set_active(False)
self.noisebox.hide()
# Hide the Maidenhead window until user selects it.
self.uimanager.get_widget('/MenuBar/View/Maidenhead').set_active(False)
self.gsbox.hide()
# Hide the AIS window until user selects it.
self.uimanager.get_widget('/MenuBar/View/AIS').set_active(False)
self.aisbox.hide()
self.view_name_to_widget = {
"Skyview": self.satbox,
"Responses": self.rawdisplay,
"GPS": self.dataframe,
"Noise": self.noisebox,
"Maidenhead": self.gsbox,
"AIS": self.aisbox}
# Discard field labels and associate data hooks with their widgets
Base.gpsfields = [(label_hook_widget[0][1], label_hook_widget[1])
for label_hook_widget
in zip(Base.gpsfields, gpswidgets)]
def view_toggle(self, action):
# print("View toggle:", action.get_active(), action.get_name())
if hasattr(self, 'view_name_to_widget'):
if action.get_active():
self.view_name_to_widget[action.get_name()].show()
else:
self.view_name_to_widget[action.get_name()].hide()
# The effect we're after is to make the top-level window
# resize itself to fit when we show or hide widgets.
# This is undocumented magic to do that.
self.window.resize(1, 1)
def set_satlist_field(self, row, column, value):
"Set a specified field in the satellite list."
try:
self.satlist.set_value(self.row_iters[row], column, str(value))
except IndexError:
sys.stderr.write("xgps: channel = %d, MAXCHANNELS = %d\n"
% (row, MAXCHANNELS))
def delete_event(self, _widget, _event, _data=None):
Gtk.main_quit()
return False
# State updates
def update_time(self, data):
if hasattr(data, "time"):
# str() just in case we get an old-style float.
return str(data.time)
else:
return "n/a"
def update_latitude(self, data):
if data.mode >= gps.MODE_2D and hasattr(data, "lat"):
lat = gps.clienthelpers.deg_to_str(self.deg_type, abs(data.lat))
if data.lat < 0:
ns = 'S'
else:
ns = 'N'
return "%s %s" % (lat, ns)
else:
return "n/a"
def update_longitude(self, data):
if data.mode >= gps.MODE_2D and hasattr(data, "lon"):
lon = gps.clienthelpers.deg_to_str(self.deg_type, abs(data.lon))
if data.lon < 0:
ew = 'W'
else:
ew = 'E'
return "%s %s" % (lon, ew)
else:
return "n/a"
def update_altitude(self, data):
if data.mode >= gps.MODE_3D and hasattr(data, "alt"):
return "%.3f %s" % (
data.alt * self.conversions.altfactor,
self.conversions.altunits)
else:
return "n/a"
def update_speed(self, data):
if hasattr(data, "speed"):
return "%.3f %s" % (
data.speed * self.conversions.speedfactor,
self.conversions.speedunits)
else:
return "n/a"
def update_climb(self, data):
if hasattr(data, "climb"):
return "%.3f %s" % (
data.climb * self.conversions.speedfactor,
self.conversions.speedunits)
else:
return "n/a"
def update_track(self, data):
if hasattr(data, "track"):
return gps.clienthelpers.deg_to_str(self.deg_type, abs(data.track))
else:
return "n/a"
def update_err(self, data, errtype):
if hasattr(data, errtype):
return "%.3f %s" % (
getattr(data, errtype) * self.conversions.altfactor,
self.conversions.altunits)
else:
return "n/a"
def update_err_speed(self, data, errtype):
if hasattr(data, errtype):
return "%.3f %s" % (
getattr(data, errtype) * self.conversions.speedfactor,
self.conversions.speedunits)
else:
return "n/a"
def update_err_degrees(self, data, errtype):
if hasattr(data, errtype):
return "%.3f °" % (getattr(data, errtype))
else:
return "n/a"
def update_status(self, data):
if data.mode == gps.MODE_2D:
status = "2D FIX"
elif data.mode == gps.MODE_3D:
status = "3D FIX"
else:
status = "NO FIX"
if hasattr(data, 'status') and data.status == gps.STATUS_DGPS_FIX:
status += " DIFF"
if data.mode != self.saved_mode:
self.last_transition = time.time()
self.saved_mode = data.mode
return status + " (%d secs)" % (time.time() - self.last_transition)
def update_gpsdata(self, tpv):
"Update the GPS data fields."
# the first 14 fields are updated using TPV data
for (hook, widget) in Base.gpsfields[:14]:
if hook: # Remove this guard when we have all hooks
widget.set_text(hook(self, tpv))
self.gsview.update(tpv)
def update_skyview(self, data):
"Update the satellite list and skyview."
if hasattr(data, 'satellites'):
satellites = data.satellites
for fld in reversed(SKY_VIEW_SORT_FIELDS):
rev = (fld[0] == '-')
if rev:
fld = fld[1:]
satellites = sorted(
satellites[:],
key=lambda x: x[fld], reverse=rev)
for (i, satellite) in enumerate(satellites):
self.set_satlist_field(i, 0, satellite.PRN)
self.set_satlist_field(i, 1, satellite.el)
self.set_satlist_field(i, 2, satellite.az)
self.set_satlist_field(i, 3, satellite.ss)
yesno = 'N'
if satellite.used:
yesno = 'Y'
self.set_satlist_field(i, 4, yesno)
for i in range(len(satellites), MAXCHANNELS):
for j in range(0, 5):
self.set_satlist_field(i, j, "")
self.skyview.redraw(satellites)
# Preferences
def set_units(self, system):
"Change the display units."
self.conversions = unit_adjustments(system)
def set_step_of_grid(self, system):
"Change the step of grid."
self.skyview.step_of_grid = system
# I/O monitoring and gtk housekeeping
def watch(self, daemon, device):
"Set up monitoring of a daemon instance."
self.daemon = daemon
self.device = device
GObject.io_add_watch(daemon.sock, GObject.IO_IN, self.handle_response)
GObject.io_add_watch(daemon.sock, GObject.IO_ERR, self.handle_hangup)
GObject.io_add_watch(daemon.sock, GObject.IO_HUP, self.handle_hangup)
def handle_response(self, source, condition):
"Handle ordinary I/O ready condition from the daemon."
if self.daemon.read() == -1:
self.handle_hangup(source, condition)
if self.daemon.valid & gps.PACKET_SET:
if ((self.device
and "device" in self.daemon.data
and self.device != self.daemon.data["device"])):
return True
self.rawdisplay.set_text(self.daemon.response.strip())
if self.daemon.data["class"] == "SKY":
self.update_skyview(self.daemon.data)
elif self.daemon.data["class"] == "TPV":
self.update_gpsdata(self.daemon.data)
elif self.daemon.data["class"] == "GST":
self.noiseview.update(self.daemon.data)
if not self.noise_latch:
self.noise_latch = True
self.uimanager.get_widget(
'/MenuBar/View/Noise').set_active(True)
self.noisebox.show()
elif self.daemon.data["class"] == "AIS":
self.aisview.update(self.daemon.data)
if not self.ais_latch:
self.ais_latch = True
self.uimanager.get_widget(
'/MenuBar/View/AIS').set_active(True)
self.aisbox.show()
return True
def handle_hangup(self, _source, _condition):
"Handle hangup condition from the daemon."
w = Gtk.MessageDialog(parent=self.window,
type=Gtk.MessageType.ERROR,
flags=Gtk.DialogFlags.DESTROY_WITH_PARENT,
buttons=Gtk.ButtonsType.CANCEL)
w.connect("destroy", lambda _unused: Gtk.main_quit())
w.set_markup("gpsd has stopped sending data.")
w.run()
Gtk.main_quit()
return True
def main(self):
Gtk.main()
if __name__ == "__main__":
try:
import getopt
(options, arguments) = getopt.getopt(sys.argv[1:], "D:hl:u:r:V?",
['verbose'])
debug = 0
degreefmt = 'd'
unit_system = None
rotate = 0.0
for (opt, val) in options:
if opt in '-D':
debug = int(val)
elif opt == '-l':
degreeformat = val
elif opt == '-u':
unit_system = val
elif opt == '-r':
try:
rotate = float(val)
except ValueError:
rotate = 0.0
elif opt in ('-?', '-h', '--help'):
print(__doc__)
sys.exit(0)
elif opt == 'V':
sys.stderr.write("xgps 1.0\n")
sys.exit(0)
degreefmt = {'d': gps.clienthelpers.deg_dd,
'm': gps.clienthelpers.deg_ddmm,
's': gps.clienthelpers.deg_ddmmss}[degreefmt]
(host, port, device) = ("localhost", gps.GPSD_PORT, None)
if len(arguments):
args = arguments[0].split(":")
if len(args) >= 1 and args[0]:
host = args[0]
if len(args) >= 2 and args[1]:
port = args[1]
if len(args) >= 3:
device = args[2]
target = ":".join(arguments[0:])
else:
target = ""
base = Base(deg_type=degreefmt, rotate=rotate, target=target)
base.set_units(unit_system)
try:
daemon = gps.gps(host=host,
port=port,
mode=gps.WATCH_ENABLE | gps.WATCH_JSON
| gps.WATCH_SCALED,
verbose=debug)
base.watch(daemon, device)
base.main()
except socket.error:
w = Gtk.MessageDialog(parent=base.window,
type=Gtk.MessageType.ERROR,
flags=Gtk.DialogFlags.DESTROY_WITH_PARENT,
buttons=Gtk.ButtonsType.CANCEL)
w.set_markup("gpsd is not running.")
w.run()
w.destroy()
except KeyboardInterrupt:
pass