PointPerfect
PointPerfect is an advanced GNSS augmentation data service designed from the ground up to be accurate, reliable, and immediately available.
The service answers the fast-growing demand for high precision GNSS solutions: UAV, service robots, machinery automation, AD and ADAS, lane-accurate navigation, and telematics.
This example application connects to the PointPerfect service and runs a callback function every time it receives a message.
The callback function prints the word “callback”, the message topic, the message length, and the first 10 bytes of the message payload, followed by the full payload.
Note
Make sure to adapt the network connection to yours (fixed IP or DHCP) by setting the Parameters class variables.
Download the credentials in JSON format from the u-blox Thingstream portal and save the file in the SBC memory without renaming it (the application opens device-<pp_id>-ucenter-config.json).
Enter your PointPerfect credentials ID in the pp_id parameter and set the topic to subscribe to in the pp_topic parameter.
import time
import network
import uasyncio as asyncio
import sbc
import asmqtt
import ujson
class EGM:
    """Configuration holder for the PointPerfect example application.

    All settings live in the nested ``Parameters`` class; an instance of it
    is created on construction and exposed as the ``par`` attribute.
    """

    class Parameters:
        """User-editable settings: serial, Ethernet and PointPerfect values."""

        def __init__(self):
            # Serial baud rate, kept as a string (not read by the code in this file).
            self.uart_baud_rate = '115200'

            # Ethernet: with lan_dhcp True the static values below are not applied.
            self.lan_dhcp = True
            self.lan_ip = '10.0.0.1'
            self.lan_sn = '255.255.255.0'
            self.lan_gw = '10.0.0.2'
            self.lan_dns = '8.8.8.8'

            # PointPerfect: enable flag (not consulted by the code in this file),
            # broker host name, credentials id (also used as MQTT client id and
            # in the credentials file name) and the topic to subscribe to.
            self.pp_active = True
            self.pp_srv = 'pp.services.u-blox.com'
            self.pp_id = '02e7321d-f6fe-41a2-b493-92996c0a4e20'
            self.pp_topic = '/pp/ip/eu'

    def __init__(self):
        # Resolve the nested class through the instance so subclasses may override it.
        self.par = self.Parameters()
def connectNetwork():
    """Activate the Ethernet interface and block until it reports connected.

    Uses the module-level ``lan`` interface and ``egm`` configuration. When
    DHCP is disabled in the configuration, the static IP, subnet, gateway and
    DNS values are applied before waiting for the link.

    Returns:
        1 once the interface reports the connected status, 0 if any step
        raised an exception.
    """
    try:
        lan.active(True)
        if not egm.par.lan_dhcp:
            lan.ifconfig((egm.par.lan_ip, egm.par.lan_sn, egm.par.lan_gw, egm.par.lan_dns))
        # Status 3 is the value this example treats as "connected"
        # (presumably "link up with IP" - confirm against the network.LAN docs).
        while lan.status() != 3:
            print('Connecting to LAN')
            time.sleep(1)
        print('LAN connected, configuration (ip, subnet, gateway, dns): ', lan.ifconfig())
        return 1
    except Exception as exc:
        # Catch Exception rather than everything: a bare ``except:`` would also
        # swallow KeyboardInterrupt/SystemExit and turn Ctrl-C into a retry.
        print("Check Ethernet cable is properly plugged")
        print('LAN connection error: ', exc)
        return 0
# Module-level objects shared by connectNetwork() and the start-up code.
lan = network.LAN()
egm = EGM()

# Do not go any further until the first LAN connection attempt succeeds;
# failed attempts are retried after a one second pause.
while True:
    if connectNetwork() != 0:
        break
    time.sleep(1)
def is_connected(lan):
    """Tell whether the Ethernet PHY reports a negotiated, active link.

    Reads ``lan.phy_status()`` and inspects two flags of the returned word:
    bit 5 (auto-negotiation complete) and bit 2 (link up).

    Returns:
        1 when both flags are set, otherwise 0.
    """
    phy_word = lan.phy_status()
    # Bit 5: auto-negotiation finished. Without it the link is not usable.
    if not (phy_word >> 5) & 0x01:
        return 0
    # Bit 2: link up. Its value is the final answer.
    return (phy_word >> 2) & 0x01
async def task_network(lan):
    """Supervise the Ethernet link forever, reconnecting when it drops.

    While the link is up the task polls ``is_connected(lan)`` once per second.
    When the link is down it calls ``connectNetwork()`` until that succeeds and
    the link is reported up again. State changes are printed.

    Args:
        lan: Ethernet interface object passed to ``is_connected``.
    """
    while True:
        if is_connected(lan):
            print("is_connected(lan)")
            # Stay here for as long as the link remains up.
            while is_connected(lan):
                await asyncio.sleep(1)
        else:
            print("not is_connected(lan)")
            while not is_connected(lan):
                while connectNetwork() == 0:
                    await asyncio.sleep(1)
                # connectNetwork() may succeed before the PHY reports link-up.
                # Always yield so this loop cannot spin without awaiting and
                # starve the other tasks on the event loop.
                await asyncio.sleep(1)
        await asyncio.sleep(1)
async def task_point_perfect(client_id, server, port, key, cert, topic):
    """Connect to the PointPerfect MQTT broker over TLS and print received messages.

    Subscribes to the fixed ``/pp/ubx/0236/ip`` topic and to ``topic``, then
    waits for messages forever. Each message is handed to a callback that
    prints its topic, length, first 10 payload bytes and the full payload.

    Args:
        client_id: MQTT client identifier.
        server: Broker host name, also used as the TLS server host name.
        port: Broker TCP port.
        key: Client private key handed to the TLS layer.
        cert: Client certificate handed to the TLS layer.
        topic: Additional topic to subscribe to.
    """
    ssl_params = {
        "key": key,
        "cert": cert,
        "server_side": False,
        "server_hostname": server,
        "do_handshake": True,
    }
    mqtt = asmqtt.asMQTTClient(client_id, server, port=port, ssl=True, ssl_params=ssl_params)

    async def callback(topic, msg):
        print("callback", topic, len(msg), msg[0:10])
        print(msg)

    mqtt.set_callback(callback)
    await mqtt.connect()
    try:
        await mqtt.subscribe(b"/pp/ubx/0236/ip")
        await mqtt.subscribe(topic)
        while True:
            await mqtt.wait_msg()
            await asyncio.sleep(0.1)
    finally:
        # The disconnect used to sit after the infinite loop and could never
        # run. Doing it here closes the session when the loop ends through an
        # exception or task cancellation. It is best effort, so a failure here
        # does not hide the error that ended the loop.
        try:
            await mqtt.disconnect()
        except Exception as exc:
            print("Point Perfect disconnect failed: ", exc)
# Create the event loop and schedule the link supervision task.
loop = asyncio.new_event_loop()
loop.create_task(task_network(lan))

try:
    # The credentials file is looked up as device-<pp_id>-ucenter-config.json.
    # The context manager closes the file even if reading or parsing fails.
    with open('device-' + egm.par.pp_id + '-ucenter-config.json', "rb") as fl:
        pp_json_credentials = ujson.loads(fl.read())
    # Convert the client key and certificate with the sbc helpers before
    # passing them to the MQTT task.
    pp_key = sbc.cert_tools.pem2der(pp_json_credentials['MQTT']['Connectivity']['ClientCredentials']['Key'])
    pp_cert = sbc.cert_tools.crt2der(pp_json_credentials['MQTT']['Connectivity']['ClientCredentials']['Cert'])
    loop.create_task(task_point_perfect(egm.par.pp_id, egm.par.pp_srv, 8883, pp_key, pp_cert, egm.par.pp_topic))
    print("Point Perfect initialized")
except Exception as e:
    # Start-up is best effort: report the problem and keep the network task running.
    # The former "server response" print referenced an undefined name and was
    # always silently skipped, so it has been removed.
    print('Problem starting Point Perfect, exception: ', e)

# Run the scheduled tasks; this call does not return.
loop.run_forever()