.. |br| raw:: html
.. _ex_pp: Point Perfect ============= PointPerfect is an advanced GNSS augmentation data service designed from the ground up to be accurate, reliable, and immediately available. |br| The service answers the fast-growing demand for high precision GNSS solutions: UAV, service robots, machinery automation, AD and ADAS, lane-accurate navigation, and telematics. This example application connects to the PointPerfect service and runs a callback function everytime it receives a message. |br| The callback function prints the "callback" word, message topic, message length and 10 first digits of the message payload. .. note:: Make sure to adapt the network connection to yours (fixed IP or DHCP) by setting the Parameters class variables. |br| You need to download from u-blox Thingstream portal the credentials in JSON format and save them on the SBC memory without changing the name. |br| Please enter your PointPerfect credentials id on the pp_id parameter and set the topic on the topic variable. :: import time import network import uasyncio as asyncio import sbc import asmqtt import ujson class EGM: def __init__(self): self.par = self.Parameters() class Parameters: def __init__(self): self.uart_baud_rate = '115200' self.lan_dhcp = True self.lan_ip = '10.0.0.1' self.lan_sn = '255.255.255.0' self.lan_gw = '10.0.0.2' self.lan_dns = '8.8.8.8' self.pp_active = True self.pp_srv = 'pp.services.u-blox.com' self.pp_id = '02e7321d-f6fe-41a2-b493-92996c0a4e20' self.pp_topic = '/pp/ip/eu' def connectNetwork(): try: lan.active(True) if(not egm.par.lan_dhcp): lan.ifconfig((egm.par.lan_ip, egm.par.lan_sn, egm.par.lan_gw, egm.par.lan_dns)) while(lan.status() != 3): print('Connecting to LAN') time.sleep(1) print('LAN connected, configuration (ip, subnet, gateway, dns): ', lan.ifconfig()) return(1) except: print("Check Ethernet cable is properly plugged") return(0) lan = network.LAN() egm = EGM() while(connectNetwork() == 0): time.sleep(1) def is_connected(lan): status = lan.phy_status() auto_negotiation_complete = (status>>5)&0x01 link_up = (status>>2)&0x01 return auto_negotiation_complete and link_up async def task_network(lan): while(True): if(not is_connected(lan)): print("not is_connected(lan)") while(not is_connected(lan)): while(connectNetwork() == 0): await asyncio.sleep(1) else: print("is_connected(lan)") while(is_connected(lan)): await asyncio.sleep(1) await asyncio.sleep(1) async def task_point_perfect(client_id, server, port, key, cert, topic): ssl_params={"key" : key, "cert": cert, "server_side":False, "server_hostname":server, "do_handshake": True} mqtt = asmqtt.asMQTTClient(client_id, server, port=port, ssl=True, ssl_params=ssl_params) async def callback(topic, msg): print("callback", topic, len(msg), msg[0:10]) print(msg) mqtt.set_callback(callback) await mqtt.connect() await mqtt.subscribe(b"/pp/ubx/0236/ip") await mqtt.subscribe(topic) while(True): await mqtt.wait_msg() await asyncio.sleep(0.1) await mqtt.disconnect() loop = asyncio.new_event_loop() loop.create_task(task_network(lan)) try: fl = open('device-'+egm.par.pp_id+'-ucenter-config.json', "rb") pp_json_credentials = ujson.loads(fl.read()) fl.close() pp_key = sbc.cert_tools.pem2der(pp_json_credentials['MQTT']['Connectivity']['ClientCredentials']['Key']) pp_cert = sbc.cert_tools.crt2der(pp_json_credentials['MQTT']['Connectivity']['ClientCredentials']['Cert']) loop.create_task(task_point_perfect(egm.par.pp_id, egm.par.pp_srv, 8883, pp_key, pp_cert, egm.par.pp_topic)) print("Point Perfect initialized") except Exception as e: print('Problem starting Point Perfect, exception: ', e) try: print('Point Perfect server response: ' + str(response.content)) except: pass loop.run_forever()