Point Perfect

PointPerfect is an advanced GNSS augmentation data service designed from the ground up to be accurate, reliable, and immediately available.
The service answers the fast-growing demand for high precision GNSS solutions: UAV, service robots, machinery automation, AD and ADAS, lane-accurate navigation, and telematics.

This example application connects to the PointPerfect service and runs a callback function every time it receives a message.
The callback function prints the word “callback”, the message topic, the message length and the first 10 bytes of the message payload, followed by the full payload.

Note

Make sure to adapt the network connection to your setup (fixed IP or DHCP) by setting the variables of the Parameters class.
You need to download the credentials in JSON format from the u-blox Thingstream portal and save them in the SBC memory without renaming the file (the script opens 'device-<pp_id>-ucenter-config.json').
Enter your PointPerfect credentials ID in the pp_id parameter and set the topic in the pp_topic parameter.

import time
import network
import uasyncio as asyncio
import sbc
import asmqtt
import ujson

class EGM:
        """Holder for the example's configuration, exposed as ``self.par``."""

        class Parameters:
                """User-editable settings; every instance starts from these defaults."""

                def __init__(self):
                        # Serial settings (not referenced elsewhere in this script).
                        self.uart_baud_rate = '115200'

                        # Ethernet settings. When lan_dhcp is False, connectNetwork()
                        # applies the static ip / subnet / gateway / dns values below.
                        self.lan_dhcp = True
                        self.lan_ip = '10.0.0.1'
                        self.lan_sn = '255.255.255.0'
                        self.lan_gw = '10.0.0.2'
                        self.lan_dns = '8.8.8.8'

                        # PointPerfect settings: pp_srv is the server host, pp_id is
                        # used both as the client id and inside the credentials file
                        # name, pp_topic is the topic the client subscribes to.
                        # pp_active is not referenced elsewhere in this script.
                        self.pp_active = True
                        self.pp_srv = 'pp.services.u-blox.com'
                        self.pp_id = '02e7321d-f6fe-41a2-b493-92996c0a4e20'
                        self.pp_topic = '/pp/ip/eu'

        def __init__(self):
                # Each EGM owns its own, independent Parameters instance.
                self.par = self.Parameters()

def connectNetwork():
        """Bring the Ethernet interface up and block until it reports a connection.

        Works on the module-level ``lan`` interface and ``egm`` settings. When
        ``egm.par.lan_dhcp`` is False the static ip / subnet / gateway / dns
        values from ``egm.par`` are applied first. The function then polls
        ``lan.status()`` once per second (blocking ``time.sleep``) until it
        equals 3.

        Returns:
                1 once the interface is connected, 0 if any step raised an error.
        """
        try:
                lan.active(True)
                if not egm.par.lan_dhcp:
                        lan.ifconfig((egm.par.lan_ip, egm.par.lan_sn, egm.par.lan_gw, egm.par.lan_dns))
                # The code treats status value 3 as "connected".
                while lan.status() != 3:
                        print('Connecting to LAN')
                        time.sleep(1)
                print('LAN connected, configuration (ip, subnet, gateway, dns): ', lan.ifconfig())
                return 1
        except Exception as exc:
                # Catch Exception rather than everything, so KeyboardInterrupt and
                # SystemExit still propagate and the caller's retry loop can be stopped.
                print("Check Ethernet cable is properly plugged")
                print("LAN connection error:", repr(exc))
                return 0

# Module-level objects shared by connectNetwork() and the asyncio tasks.
lan = network.LAN()
egm = EGM()

# Block start-up until the first connection attempt succeeds, retrying
# once per second after each failure (connectNetwork() returns 0 on failure).
while not connectNetwork():
        time.sleep(1)

def is_connected(lan):
        """Tell whether the PHY reports a usable Ethernet link.

        Reads the word returned by ``lan.phy_status()`` and checks bit 5
        (auto-negotiation complete) and bit 2 (link up).

        Returns:
                1 when both bits are set, otherwise 0.
        """
        phy_word = lan.phy_status()
        # Shift each flag down to bit 0 and AND them: the result is 1 only
        # when auto-negotiation has finished and the link is up.
        return (phy_word >> 5) & (phy_word >> 2) & 0x01

async def task_network(lan):
        """Supervise the Ethernet link forever, reconnecting when it drops.

        Args:
                lan: interface object passed to ``is_connected()``.

        While the link is down, ``connectNetwork()`` is retried once per second.
        While the link is up, the task only polls the link state once per second.
        This coroutine never returns.
        """
        while True:
                if not is_connected(lan):
                        print("not is_connected(lan)")
                        while not is_connected(lan):
                                connectNetwork()
                                # Always yield between attempts. connectNetwork() can
                                # return success while the PHY still reports the link
                                # as down; without this await that case would spin
                                # here and starve every other task on the event loop.
                                await asyncio.sleep(1)
                else:
                        print("is_connected(lan)")
                        while is_connected(lan):
                                await asyncio.sleep(1)
                await asyncio.sleep(1)

async def task_point_perfect(client_id, server, port, key, cert, topic):
        """Connect to the MQTT server over TLS and print every received message.

        Args:
                client_id: MQTT client id.
                server: server host name, also used as the TLS server hostname.
                port: server port.
                key: client private key passed to the TLS layer.
                cert: client certificate passed to the TLS layer.
                topic: topic to subscribe to, in addition to the fixed
                        ``/pp/ubx/0236/ip`` topic.

        The coroutine loops forever waiting for messages. The MQTT session is
        disconnected when the loop is left through an exception or cancellation.
        """
        ssl_params = {"key": key, "cert": cert, "server_side": False, "server_hostname": server, "do_handshake": True}
        mqtt = asmqtt.asMQTTClient(client_id, server, port=port, ssl=True, ssl_params=ssl_params)

        async def callback(topic, msg):
                # Short summary first (topic, length, first 10 bytes), then the full payload.
                print("callback", topic, len(msg), msg[0:10])
                print(msg)

        mqtt.set_callback(callback)

        await mqtt.connect()
        try:
                await mqtt.subscribe(b"/pp/ubx/0236/ip")
                await mqtt.subscribe(topic)

                while True:
                        await mqtt.wait_msg()
                        await asyncio.sleep(0.1)
        finally:
                # The receive loop never ends normally, so a disconnect placed
                # after it would be unreachable. Running it here closes the
                # session when the loop is aborted by an error or cancellation.
                try:
                        await mqtt.disconnect()
                except Exception as exc:
                        # Best-effort cleanup: do not mask the error that ended the loop.
                        print("MQTT disconnect failed:", repr(exc))

# Create the event loop and schedule the link supervisor first, so the
# network is monitored even if PointPerfect start-up fails below.
loop = asyncio.new_event_loop()
loop.create_task(task_network(lan))

try:
        # The credentials file name embeds the PointPerfect id. 'with' closes
        # the file even when reading or JSON parsing raises.
        with open('device-'+egm.par.pp_id+'-ucenter-config.json', "rb") as fl:
                pp_json_credentials = ujson.loads(fl.read())
        client_credentials = pp_json_credentials['MQTT']['Connectivity']['ClientCredentials']
        # Convert the key and certificate from the JSON file with the sbc helpers.
        pp_key = sbc.cert_tools.pem2der(client_credentials['Key'])
        pp_cert = sbc.cert_tools.crt2der(client_credentials['Cert'])

        loop.create_task(task_point_perfect(egm.par.pp_id, egm.par.pp_srv, 8883, pp_key, pp_cert, egm.par.pp_topic))
        print("Point Perfect initialized")
except Exception as e:
        # Report the problem and keep going: the loop below still runs the
        # network supervisor task.
        print('Problem starting Point Perfect, exception: ', e)

loop.run_forever()