.. |br| raw:: html
.. _ex_pp:
Point Perfect
=============
PointPerfect is an advanced GNSS augmentation data service designed from the ground up to be accurate, reliable, and immediately available. |br|
The service answers the fast-growing demand for high precision GNSS solutions: UAV, service robots, machinery automation, AD and ADAS, lane-accurate navigation, and telematics.
This example application connects to the PointPerfect service and runs a callback function everytime it receives a message. |br|
The callback function prints the "callback" word, message topic, message length and 10 first digits of the message payload.
.. note:: Make sure to adapt the network connection to yours (fixed IP or DHCP) by setting the Parameters class variables. |br| You need to download from u-blox Thingstream portal the credentials in JSON format and save them on the SBC memory without changing the name. |br| Please enter your PointPerfect credentials id on the pp_id parameter and set the topic on the topic variable.
::
import time
import network
import uasyncio as asyncio
import sbc
import asmqtt
import ujson
class EGM:
def __init__(self):
self.par = self.Parameters()
class Parameters:
def __init__(self):
self.uart_baud_rate = '115200'
self.lan_dhcp = True
self.lan_ip = '10.0.0.1'
self.lan_sn = '255.255.255.0'
self.lan_gw = '10.0.0.2'
self.lan_dns = '8.8.8.8'
self.pp_active = True
self.pp_srv = 'pp.services.u-blox.com'
self.pp_id = '02e7321d-f6fe-41a2-b493-92996c0a4e20'
self.pp_topic = '/pp/ip/eu'
def connectNetwork():
try:
lan.active(True)
if(not egm.par.lan_dhcp):
lan.ifconfig((egm.par.lan_ip, egm.par.lan_sn, egm.par.lan_gw, egm.par.lan_dns))
while(lan.status() != 3):
print('Connecting to LAN')
time.sleep(1)
print('LAN connected, configuration (ip, subnet, gateway, dns): ', lan.ifconfig())
return(1)
except:
print("Check Ethernet cable is properly plugged")
return(0)
lan = network.LAN()
egm = EGM()
while(connectNetwork() == 0):
time.sleep(1)
def is_connected(lan):
status = lan.phy_status()
auto_negotiation_complete = (status>>5)&0x01
link_up = (status>>2)&0x01
return auto_negotiation_complete and link_up
async def task_network(lan):
while(True):
if(not is_connected(lan)):
print("not is_connected(lan)")
while(not is_connected(lan)):
while(connectNetwork() == 0):
await asyncio.sleep(1)
else:
print("is_connected(lan)")
while(is_connected(lan)):
await asyncio.sleep(1)
await asyncio.sleep(1)
async def task_point_perfect(client_id, server, port, key, cert, topic):
ssl_params={"key" : key, "cert": cert, "server_side":False, "server_hostname":server, "do_handshake": True}
mqtt = asmqtt.asMQTTClient(client_id, server, port=port, ssl=True, ssl_params=ssl_params)
async def callback(topic, msg):
print("callback", topic, len(msg), msg[0:10])
print(msg)
mqtt.set_callback(callback)
await mqtt.connect()
await mqtt.subscribe(b"/pp/ubx/0236/ip")
await mqtt.subscribe(topic)
while(True):
await mqtt.wait_msg()
await asyncio.sleep(0.1)
await mqtt.disconnect()
loop = asyncio.new_event_loop()
loop.create_task(task_network(lan))
try:
fl = open('device-'+egm.par.pp_id+'-ucenter-config.json', "rb")
pp_json_credentials = ujson.loads(fl.read())
fl.close()
pp_key = sbc.cert_tools.pem2der(pp_json_credentials['MQTT']['Connectivity']['ClientCredentials']['Key'])
pp_cert = sbc.cert_tools.crt2der(pp_json_credentials['MQTT']['Connectivity']['ClientCredentials']['Cert'])
loop.create_task(task_point_perfect(egm.par.pp_id, egm.par.pp_srv, 8883, pp_key, pp_cert, egm.par.pp_topic))
print("Point Perfect initialized")
except Exception as e:
print('Problem starting Point Perfect, exception: ', e)
try: print('Point Perfect server response: ' + str(response.content))
except: pass
loop.run_forever()