Skip to content

Commit

Permalink
- OSNMA rearrange buffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
hirokawa committed Jan 3, 2025
1 parent 73e0514 commit 47aa291
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 110 deletions.
161 changes: 99 additions & 62 deletions src/cssrlib/osnma.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@
"""

import os
import copy
import numpy as np
import bitstruct.c as bs
from cryptography.hazmat.primitives import hashes, hmac, cmac
from cryptography.hazmat.primitives import hashes, hmac, cmac, serialization
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.exceptions import InvalidSignature
from cryptography.x509 import load_pem_x509_certificate

from binascii import unhexlify, hexlify
from enum import IntEnum
import xml.etree.ElementTree as et
from cssrlib.gnss import gpst2time, time2gst, copy_buff
from cssrlib.qznma import raw2der, load_pubkey
from cssrlib.gnss import gpst2time, time2gst, copy_buff, prn2sat, uGNSS


class uOSNMA(IntEnum):
Expand All @@ -39,6 +41,51 @@ class uOSNMA(IntEnum):
PKR_UPDATED = 32


class uCert(IntEnum):
""" class for type and format of Certification """
X509_CRT = 1
PEM = 2
DER = 3


def load_pubkey(pubk_path):
""" load public key information in crt/pem/der format """
ext = pubk_path.split('.')[-1]
if ext == 'crt':
pk_fmt = uCert.X509_CRT
mode = 'rt'
elif ext == 'pem':
pk_fmt = uCert.PEM
mode = 'rt'
elif ext == 'der':
pk_fmt = uCert.DER
mode = 'rb'
else:
return None

with open(pubk_path, mode) as f:
pubk = f.read()
if pk_fmt == uCert.X509_CRT:
pk = load_pem_x509_certificate(pubk.encode()).public_key()
elif pk_fmt == uCert.PEM:
pk = serialization.load_pem_public_key(pubk.encode())
elif pk_fmt == uCert.DER:
pk = serialization.load_der_public_key(pubk)

return pk


def raw2der(ds):
""" convert digital signature in raw-format to der-format """
lds = len(ds)
ln = lds//2

r = int.from_bytes(ds[:ln], byteorder='big')
s = int.from_bytes(ds[ln:], byteorder='big')
der = utils.encode_dss_signature(r, s)
return bytes(der)


class taginfo():
""" class to store tag """
gst_sf = bytearray(4)
Expand Down Expand Up @@ -141,10 +188,6 @@ class osnma():
mack_c = bytearray(60*GALMAX) # current MACK
tag_list = []

subfrm_n = None
subfrm_p = None
subfrm = None

# Merkle tree root (received from GSC OSNMA server)
root_mt = -1
# Public key list
Expand Down Expand Up @@ -221,9 +264,9 @@ def __init__(self, mt_file=None, logfile='log_osnma.txt'):

self.flg_dsm = {}

self.subfrm_n = bytearray(16*10*self.GALMAX)
self.subfrm_p = bytearray(16*10*self.GALMAX)
self.subfrm = bytearray(16*10*self.GALMAX)
self.subfrm_n = {}
self.subfrm_p = {}
self.subfrm = {}

self.prn_ref = -1
self.tag_list = []
Expand Down Expand Up @@ -575,20 +618,8 @@ def decode_mack(self, prn):
self.key_c = mack_c[i0:i0+lk//8]
return True

def decode_nav(self, df):
""" decode navigation message """
p1, t1, mt = bs.unpack_from('u1u1u6', df, 0)
p2, t2 = bs.unpack_from('u1u1', df, 120)
if p1 != 0 or p2 != 1 or t1 == 1 or t2 == 1:
return False
if mt > 0 and mt <= 10:
j = (mt-1)*16*8
copy_buff(df, self.subfrm, 2, j, 112)
copy_buff(df, self.subfrm, 122, j+112, 16)
return True

def load_inav(self, msg):
""" load I/NAV navigation message """
def load_gal_inav(self, msg):
""" load Galileo I/NAV navigation message """

# even page (120bit) + odd page (120bit)
# even page even/odd(1b)+page type(1b)+data1(112b)+tail(6)
Expand All @@ -609,63 +640,67 @@ def load_inav(self, msg):

return nav, nma_b

def load_nav(self, nav, prn, tow):
""" load navigation message into subframe buffer """
def save_gal_inav(self, nav, prn, tow):
""" store I/NAV navigation message into subframe buffer """
mt = bs.unpack_from('u6', nav, 0)[0]
j0 = (prn-1)*160
sat = prn2sat(uGNSS.GAL, prn)

if sat not in self.subfrm.keys():
self.subfrm[sat] = bytearray(160)
self.subfrm_n[sat] = bytearray(160)
self.subfrm_p[sat] = bytearray(160)

if tow % 30 == 1:
self.subfrm[j0:j0+160] = self.subfrm_p[j0:j0+160]
self.subfrm_p[j0:j0+160] = self.subfrm_n[j0:j0+160]
self.subfrm[sat] = copy.copy(self.subfrm_p[sat])
self.subfrm_p[sat] = copy.copy(self.subfrm_n[sat])

if mt > 0 and mt <= 10:
j = j0+(mt-1)*16
self.subfrm_n[j:j+16] = nav
j = (mt-1)*16
self.subfrm_n[sat][j:j+16] = nav

return True

def gen_navmsg(self, prn):
""" generate nav message for nma """
def gen_gal_inavmsg(self, prn):
""" generate Galileo I/NAV message for nma """
if prn < 1 or prn > self.GALMAX:
return None
j0 = 160*8*(prn-1)
sat = prn2sat(uGNSS.GAL, prn)
if sat not in self.subfrm.keys():
return None

buff = self.subfrm[sat]
for k in range(5):
mt = bs.unpack_from('u6', self.subfrm, j0+k*16*8)[0]
mt = bs.unpack_from('u6', buff, k*16*8)[0]
if mt != k+1:
return None

iodnav1 = bs.unpack_from('u10', self.subfrm, j0+6+0*16*8)[0]
iodnav1 = bs.unpack_from('u10', buff, 6+0*16*8)[0]
for k in range(1, 4):
iodnav_ = bs.unpack_from('u10', self.subfrm, j0+6+k*16*8)[0]
iodnav_ = bs.unpack_from('u10', buff, 6+k*16*8)[0]
if iodnav_ != iodnav1:
if self.monlevel > 0:
print("error: iodnav mismatch mt=%d %d %d" %
(k+1, iodnav1, iodnav_))
return None

if self.monlevel > 1:
svid = bs.unpack_from('u6', self.subfrm, j0+16+3*16*8)[0]
wn, tow = bs.unpack_from('u12u20', self.subfrm, j0+73+4*16*8)
print(f" svid={svid:2d} iodnav={iodnav1:2d} "
f"gst_wn={wn:4d} gst_tow={tow:6d}")

msg = bytearray(69)
# 549b MT1 120b, MT2 120b, MT3 122b, MT4 120b, MT5 67b
blen_t = [120, 120, 122, 120, 67]
j = 0
for mt in range(5):
copy_buff(self.subfrm, msg, j0+6+mt*16*8, j, blen_t[mt])
copy_buff(buff, msg, 6+mt*16*8, j, blen_t[mt])
j += blen_t[mt]

return msg

def gen_utcmsg(self):
""" generate utc message for nma """
j0 = 160*(self.prn_a-1)
i0 = 5*16+j0
mt6 = self.subfrm[i0:i0+16]
i0 = 9*16+j0
mt10 = self.subfrm[i0:i0+16]
def gen_gal_utcmsg(self):
""" generate Galileo utc message for nma """
sat = prn2sat(uGNSS.GAL, self.prn_a)
if sat not in self.subfrm.keys():
return None
buff = self.subfrm[sat]

i0 = 5*16
mt6 = buff[i0:i0+16]
i0 = 9*16
mt10 = buff[i0:i0+16]

t1 = bs.unpack_from('u6', mt6, 0)[0] # MT6
t2 = bs.unpack_from('u6', mt10, 0)[0] # MT10
Expand Down Expand Up @@ -881,26 +916,28 @@ def decode(self, nma_b, wn, tow, prn):
tag, prn_d, adkd, cop = self.decode_tags_info(k)
if adkd == 12 and self.flg_slowmac:
# delayed tag loading
navmsg = self.gen_navmsg(prn_d)
navmsg = self.gen_gal_inavmsg(prn_d)
tag_ = taginfo(self.gst_sf_p, prn_d,
self.prn_a, adkd, cop,
tag, ctr, navmsg)
self.tag_list.append(tag_)

if adkd == 12:
print(f"{ctr} prn_d={prn_d} adkd={adkd} slow-MAC "
"is skipped")
if self.monlevel > 0:
print(f"{ctr} prn_d={prn_d} adkd={adkd} slow-MAC "
"is skipped")
continue
elif adkd == 0:
navmsg = self.gen_navmsg(prn_d)
navmsg = self.gen_gal_inavmsg(prn_d)
elif adkd == 4:
navmsg = self.gen_utcmsg()
navmsg = self.gen_gal_utcmsg()
else:
navmsg = None

if navmsg is None:
print(f"{ctr} prn_d={prn_d} adkd={adkd} navmsg is "
"not available.")
if self.monlevel > 0:
print(f"{ctr} prn_d={prn_d} adkd={adkd} navmsg is "
"not available.")
continue

tag_ = taginfo(self.gst_sf_p, prn_d, self.prn_a,
Expand Down
52 changes: 4 additions & 48 deletions src/cssrlib/qznma.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
@author: Rui Hirokawa
"""

import copy
from binascii import unhexlify, hexlify
import numpy as np
import bitstruct.c as bs
from cssrlib.gnss import uGNSS, prn2sat, sat2prn, copy_buff
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.x509 import load_pem_x509_certificate
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from enum import IntEnum
import copy
from cssrlib.osnma import raw2der, load_pubkey

dtype_ = [('wn', 'int'), ('tow', 'float'), ('prn', 'int'),
('type', 'int'), ('len', 'int'), ('nav', 'S512')]
Expand All @@ -41,12 +41,6 @@ class uNavId(IntEnum):
GAL_INAV = 5


class uCert(IntEnum):
X509_CRT = 1
PEM = 2
DER = 3


class NavMsg():
""" class to store the navigation message """
sys = 0
Expand Down Expand Up @@ -92,44 +86,6 @@ def __init__(self, keyid, ds, salt, mt=0, nid=0, rtow=0, svid=0,
self.salt = salt


def load_pubkey(pubk_path):
""" load public key information in crt/pem/der format """
ext = pubk_path.split('.')[-1]
if ext == 'crt':
pk_fmt = uCert.X509_CRT
mode = 'rt'
elif ext == 'pem':
pk_fmt = uCert.PEM
mode = 'rt'
elif ext == 'der':
pk_fmt = uCert.DER
mode = 'rb'
else:
return None

with open(pubk_path, mode) as f:
pubk = f.read()
if pk_fmt == uCert.X509_CRT:
pk = load_pem_x509_certificate(pubk.encode()).public_key()
elif pk_fmt == uCert.PEM:
pk = serialization.load_pem_public_key(pubk.encode())
elif pk_fmt == uCert.DER:
pk = serialization.load_der_public_key(pubk)

return pk


def raw2der(ds):
""" convert digital signature in raw-format to der-format """
lds = len(ds)
ln = lds//2

r = int.from_bytes(ds[:ln], byteorder='big')
s = int.from_bytes(ds[ln:], byteorder='big')
der = utils.encode_dss_signature(r, s)
return bytes(der)


class qznma():
""" class for QZNMA processing """

Expand Down

0 comments on commit 47aa291

Please sign in to comment.