Skip to content

Commit

Permalink
Merge pull request #45 from AndreHauschild/devel
Browse files Browse the repository at this point in the history
  • Loading branch information
hirokawa authored Mar 20, 2024
2 parents 78b901c + 071361f commit 88b2128
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 41 deletions.
13 changes: 10 additions & 3 deletions src/cssrlib/cssr_has.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,20 @@ def decode_head(self, msg, i, st=-1):
return head, i

def decode_cssr(self, msg, i=0):
if self.msgtype != 1: # only MT=1 defined
# Galileo HAS-SIS only defines MT=1, MT=4073 is CSSR message used in the
# Galileo HAS test dataset
#
if self.msgtype != 1 and self.msgtype != 4073:
print(f"invalid MT={self.msgtype}")
return False
# time of hour
# flags: mask,orbit,clock,clock subset,cbias,pbias,mask_id,iodset_id
self.toh, flags, res, mask_id, self.iodssr = \
bs.unpack_from('u12u6u4u5u5', msg, i)
try:
self.toh, flags, res, mask_id, self.iodssr = \
bs.unpack_from('u12u6u4u5u5', msg, i)
except:
print(f"invalid content={self.msgtype}")
return False
i += 32

if self.monlevel > 0 and self.fh is not None:
Expand Down
20 changes: 9 additions & 11 deletions src/cssrlib/cssrlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,6 @@ def decode_cssr_cbias(self, msg, i, inet=0):
self.lc[inet].cbias = {}
for k in range(nsat):
sat = self.sat_n[k]
sys, _ = sat2prn(sat)
self.lc[inet].cbias[sat] = {}
for j in range(len(self.sig_n[sat])):
rsig = self.sig_n[sat][j].toTyp(uTYP.C)
Expand All @@ -713,7 +712,6 @@ def decode_cssr_pbias(self, msg, i, inet=0):
self.lc[inet].di = {}
for k in range(nsat):
sat = self.sat_n[k]
sys, _ = sat2prn(sat)
self.lc[inet].pbias[sat] = {}
self.lc[inet].di[sat] = {}
for j in range(len(self.sig_n[sat])):
Expand Down Expand Up @@ -914,8 +912,8 @@ def decode_cssr_sinfo(self, msg, i):
def decode_cssr_comb(self, msg, i, inet=0):
"""decode MT4073,11 Orbit,Clock Combined Correction message """
head, i = self.decode_head(msg, i)
# if self.iodssr != head['iodssr']:
# return -1
if self.iodssr != head['iodssr']:
return -1
dfm = bs.unpack_from_dict('b1b1b1', ['orb', 'clk', 'net'], msg, i)
i += 3
self.flg_net = dfm['net']
Expand Down Expand Up @@ -1132,7 +1130,7 @@ def out_log(self):

if self.subtype == sCSSR.CLOCK:
self.fh.write(" {:s}\t{:s}\n".format("SatID", "dclk [m]"))
for k, sat_ in enumerate(self.lc[0].dclk.keys()):
for sat_ in self.lc[0].dclk.keys():
if np.isnan(self.lc[0].dclk[sat_]):
continue
self.fh.write(" {:s}\t{:8.4f}\n"
Expand All @@ -1143,7 +1141,7 @@ def out_log(self):
self.fh.write(" {:s}\t{:s}\t{:s}\t{:s}\t{:s}\n"
.format("SatID", "IODE", "Radial[m]",
"Along[m]", "Cross[m]"))
for k, sat_ in enumerate(self.lc[0].dorb.keys()):
for sat_ in self.lc[0].dorb.keys():
if np.isnan(self.lc[0].dorb[sat_][0]):
continue
self.fh.write(" {:s}\t{:3d}\t{:6.3f}\t{:6.3f}\t{:6.3f}\n"
Expand All @@ -1157,7 +1155,7 @@ def out_log(self):
self.fh.write(" {:s}\t{:s}\t{:s}\t{:s}\t{:s}\t{:s}\n"
.format("SatID", "IODE", "Radial[m]",
"Along[m]", "Cross[m]", "dclk[m]"))
for k, sat_ in enumerate(self.lc[0].dorb.keys()):
for sat_ in self.lc[0].dorb.keys():
if np.isnan(self.lc[0].dorb[sat_][0]) or \
np.isnan(self.lc[0].dclk[sat_]):
continue
Expand All @@ -1173,9 +1171,9 @@ def out_log(self):
elif self.subtype == sCSSR.CBIAS:
self.fh.write(" {:s}\t{:s}\t{:s}\t{:s}\n"
.format("SatID", "SigID", "Code Bias[m]", "..."))
for k, sat_ in enumerate(self.lc[0].cbias.keys()):
for sat_ in self.lc[0].cbias.keys():
self.fh.write(" {:s}\t".format(sat2id(sat_)))
for sig_ in range(self.lc[0].cbias[sat_].keys()):
for sig_ in self.lc[0].cbias[sat_].keys():
self.fh.write("{:s}\t{:5.2f}\t"
.format(sig_.str(),
self.lc[0].cbias[sat_][sig_]))
Expand All @@ -1184,9 +1182,9 @@ def out_log(self):
elif self.subtype == sCSSR.PBIAS:
self.fh.write(" {:s}\t{:s}\t{:s}\t{:s}\n"
.format("SatID", "SigID", "Phase Bias[m]", "..."))
for k, sat_ in enumerate(self.lc[0].pbias.keys()):
for sat_ in self.lc[0].pbias.keys():
self.fh.write(" {:s}\t".format(sat2id(sat_)))
for sig_ in range(self.lc[0].pbias[sat_].keys()):
for sig_ in self.lc[0].pbias[sat_].keys():
self.fh.write("{:s}\t{:5.2f}\t"
.format(sig_.str(),
self.lc[0].pbias[sat_][sig_]))
Expand Down
217 changes: 212 additions & 5 deletions src/cssrlib/ephemeris.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,20 @@ def geph2pos(time: gtime_t, geph: Geph, flg_v=False, TSTEP=1.0):
x = glorbit(tt, x, geph.acc)
t -= tt

rs = x[0:3]
vs = x[3:6]
rs = x[0:3]
vs = x[3:6]

if flg_v:
return rs, vs, dts

return rs, dts
else:
return rs, dts


def geph2clk(time: gtime_t, geph: Geph):
""" calculate GLONASS satellite clock offset based on ephemeris """
ts = timediff(time, geph.toe)
t = ts
for i in range(2):
for _ in range(2):
t = ts - (-geph.taun+geph.gamn*t)
return -geph.taun + geph.gamn*t

Expand Down Expand Up @@ -223,6 +223,213 @@ def eph2clk(time, eph):
return dts


def satpos(sat, t, nav, cs=None, orb=None):
"""
Calculate pos/vel/clk for single satellite
The satellite position, velocity and clock offset are computed at epoch.
The satellite clock is already corrected for the relativistic effects. The
satellite health indicator is extracted from the broadcast navigation
message.
Parameters
----------
sat :
satellite ID
t : time_t()
epoch
nav : Nav()
contains coarse satellite orbit and clock offset information
cs : cssr_has()
contains precise SSR corrections for satellite orbit and clock offset
obs : peph()
contains precise satellite orbit and clock offset information
Returns
-------
rs : np.array() of float
satellite position in ECEF [m]
vs : np.array() of float
satellite velocity in ECEF [m/s]
dts : np.array() of float
satellite clock offset [s]
svh : np.array() of int
satellite health code [-]
"""

n = 1
rs = np.zeros((n, 3))
vs = np.zeros((n, 3))
dts = np.zeros(n)
svh = np.zeros(n, dtype=int)
iode = -1

i = 0
sys, _ = sat2prn(sat)

if nav.ephopt == 4:

rs_, dts_, _ = orb.peph2pos(t, sat, nav)
if rs_ is None or dts_ is None or np.isnan(dts_[0]):
return rs, vs, dts, svh
dt = dts_[0]

if len(nav.eph) > 0:
eph = findeph(nav.eph, t, sat)
if eph is None:
svh[i] = 1
return rs, vs, dts, svh
svh[i] = eph.svh
else:
svh[i] = 0

else:

if cs is not None:

if cs.iodssr >= 0 and cs.iodssr_c[sCType.ORBIT] == cs.iodssr:
if sat not in cs.sat_n:
return rs, vs, dts, svh
elif cs.iodssr_p >= 0 and \
cs.iodssr_c[sCType.ORBIT] == cs.iodssr_p:
if sat not in cs.sat_n_p:
return rs, vs, dts, svh
else:
return rs, vs, dts, svh

if sat not in cs.lc[0].iode.keys():
return rs, vs, dts, svh

iode = cs.lc[0].iode[sat]
dorb = cs.lc[0].dorb[sat] # radial,along-track,cross-track

if cs.cssrmode in (sc.PVS_PPP, sc.SBAS_L1, sc.SBAS_L5):
dorb += cs.lc[0].dvel[sat] * \
(timediff(t, cs.lc[0].t0[sat][sCType.ORBIT]))

if cs.cssrmode == sc.BDS_PPP: # consistency check for IOD corr

if cs.lc[0].iodc[sat] == cs.lc[0].iodc_c[sat]:
dclk = cs.lc[0].dclk[sat]
else:
if cs.lc[0].iodc[sat] == cs.lc[0].iodc_c_p[sat]:
dclk = cs.lc[0].dclk_p[sat]
else:
return rs, vs, dts, svh

else:

if cs.cssrmode == sc.GAL_HAS_SIS: # HAS only
if cs.mask_id != cs.mask_id_clk: # mask has changed
if sat not in cs.sat_n_p:
return rs, vs, dts, svh
else:
if cs.iodssr_c[sCType.CLOCK] == cs.iodssr:
if sat not in cs.sat_n:
return rs, vs, dts, svh
else:
if cs.iodssr_c[sCType.CLOCK] == cs.iodssr_p:
if sat not in cs.sat_n_p:
return rs, vs, dts, svh
else:
return rs, vs, dts, svh

dclk = cs.lc[0].dclk[sat]

if cs.lc[0].cstat & (1 << sCType.HCLOCK) and \
sat in cs.lc[0].hclk.keys() and \
not np.isnan(cs.lc[0].hclk[sat]):
dclk += cs.lc[0].hclk[sat]

if cs.cssrmode in (sc.PVS_PPP, sc.SBAS_L1, sc.SBAS_L5):
dclk += cs.lc[0].ddft[sat] * \
(timediff(t, cs.lc[0].t0[sat][sCType.CLOCK]))

if np.isnan(dclk) or np.isnan(dorb@dorb):
return rs, vs, dts, svh

mode = cs.nav_mode[sys]

else:

mode = 0

if sys == uGNSS.GLO:
geph = findeph(nav.geph, t, sat, iode, mode=mode)
if geph is None:
svh[i] = 1
return rs, vs, dts, svh

svh[i] = geph.svh
dt = geph2clk(t, geph)

if sat not in nav.glo_ch:
nav.glo_ch[sat] = geph.frq

else:
eph = findeph(nav.eph, t, sat, iode, mode=mode)
if eph is None:
svh[i] = 1
return rs, vs, dts, svh

svh[i] = eph.svh
dt = eph2clk(t, eph)

t = timeadd(t, -dt)

if nav.ephopt == 4: # precise ephemeris

rs_, dts_, _ = orb.peph2pos(t, sat, nav)
rs[i, :] = rs_[0: 3]
vs[i, :] = rs_[3: 6]
dts[i] = dts_[0]

else:

if sys == uGNSS.GLO:
rs[i, :], vs[i, :], dts[i] = geph2pos(t, geph, True)
else:
rs[i, :], vs[i, :], dts[i] = eph2pos(t, eph, True)

# Apply SSR correction
#
if cs is not None:

if cs.cssrmode == sc.BDS_PPP:
er = vnorm(rs[i, :])
rc = np.cross(rs[i, :], vs[i, :])
ec = vnorm(rc)
ea = np.cross(ec, er)
A = np.array([er, ea, ec])
else:
ea = vnorm(vs[i, :])
rc = np.cross(rs[i, :], vs[i, :])
ec = vnorm(rc)
er = np.cross(ea, ec)
A = np.array([er, ea, ec])

if cs.cssrmode in (sc.PVS_PPP, sc.SBAS_L1, sc.SBAS_L5):
dorb_e = dorb
else:
dorb_e = dorb@A

rs[i, :] -= dorb_e
dts[i] += dclk/rCST.CLIGHT

if cs.cssrmode in (sc.PVS_PPP, sc.SBAS_L1, sc.SBAS_L5,
sc.DGPS):
dts[i] -= eph.tgd

elif nav.smode == 1 and nav.nf == 1: # standalone positioning
dts[i] -= eph.tgd

if cs is not None:
if sat in cs.lc[0].t0 and sCType.ORBIT in cs.lc[0].t0[sat]:
nav.time_p = cs.lc[0].t0[sat][sCType.ORBIT]

return rs, vs, dts, svh


def satposs(obs, nav, cs=None, orb=None):
"""
Calculate pos/vel/clk for observed satellites
Expand Down
Loading

0 comments on commit 88b2128

Please sign in to comment.