Skip to content

Commit

Permalink
Rsdk-7332: slow position reporting (viamrobotics#3856)
Browse files Browse the repository at this point in the history
  • Loading branch information
susmitaSanyal authored May 2, 2024
1 parent 0491b8e commit 4794a92
Showing 1 changed file with 29 additions and 59 deletions.
88 changes: 29 additions & 59 deletions components/movementsensor/gpsrtkserial/gpsrtkserial.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,26 @@ type rtkSerial struct {
cancelFunc func()

activeBackgroundWorkers sync.WaitGroup
mu sync.Mutex

ntripClient *gpsutils.NtripInfo
isConnectedToNtrip bool
isClosed bool

err movementsensor.LastError
lastposition movementsensor.LastPosition
lastcompassheading movementsensor.LastCompassHeading
InputProtocol string
isClosed bool

mu sync.Mutex

cachedData *gpsutils.CachedData
correctionWriter io.ReadWriteCloser
writePath string
wbaud int
isVirtualBase bool
readerWriter *bufio.ReadWriter
writer io.Writer
reader io.Reader
// everything below this comment is protected by mu
isConnectedToNtrip bool
ntripClient *gpsutils.NtripInfo
cachedData *gpsutils.CachedData
correctionWriter io.ReadWriteCloser
writePath string
wbaud int
isVirtualBase bool
readerWriter *bufio.ReadWriter
writer io.Writer
reader io.Reader
}

// Reconfigure reconfigures attributes.
Expand Down Expand Up @@ -229,19 +230,15 @@ func (g *rtkSerial) start() error {

// getStream attempts to connect to ntrip stream. We give up after maxAttempts unsuccessful tries.
func (g *rtkSerial) getStream(mountPoint string, maxAttempts int) error {
g.mu.Lock()
defer g.mu.Unlock()
success := false
attempts := 0

var rc io.ReadCloser
var err error

g.logger.Debug("Getting NTRIP stream")

for attempts := 0; attempts < maxAttempts; attempts++ {
if g.isClosed {
return g.err.Get()
}

for !success && attempts < maxAttempts {
select {
case <-g.cancelCtx.Done():
return errors.New("Canceled")
Expand All @@ -252,24 +249,22 @@ func (g *rtkSerial) getStream(mountPoint string, maxAttempts int) error {
return g.ntripClient.Client.GetStream(mountPoint)
}()
if err == nil {
g.logger.Debug("Connected to stream")

g.ntripClient.Stream = rc
return g.err.Get()
success = true
}
attempts++
}
// If we get here, we had errors on every connection attempt.

// Errors about the old ICY protocol are not "real" errors, but all others are.
if !strings.Contains(err.Error(), "ICY") {
if err != nil {
g.logger.Errorf("Can't connect to NTRIP stream: %s", err)
return err
}
g.logger.Debug("Connected to stream")

g.mu.Lock()
defer g.mu.Unlock()

// The error was related to the old ICY protocol. Try storing the ReadCloser anyway.
g.logger.Warnf("Detected old HTTP protocol: %s", err)
g.ntripClient.Stream = rc
return err
return g.err.Get()
}

// openPort opens the serial port for writing.
Expand All @@ -282,9 +277,6 @@ func (g *rtkSerial) openPort() error {
MinimumReadSize: 1,
}

g.mu.Lock()
defer g.mu.Unlock()

if err := g.cancelCtx.Err(); err != nil {
return err
}
Expand Down Expand Up @@ -460,25 +452,26 @@ func (g *rtkSerial) receiveAndWriteSerial() {
g.mu.Lock()
g.isConnectedToNtrip = true
g.mu.Unlock()

continue
}
}
}
}

// Most of the movementsensor functions here don't have mutex locks since g.cachedData is protected by
// it's own mutex and not having mutex around g.err is alright.

// Position returns the current geographic location of the MOVEMENTSENSOR.
func (g *rtkSerial) Position(ctx context.Context, extra map[string]interface{}) (*geo.Point, float64, error) {
g.mu.Lock()
lastError := g.err.Get()
if lastError != nil {
lastPosition := g.lastposition.GetLastPosition()
g.mu.Unlock()
if lastPosition != nil {
return lastPosition, 0, nil
}
return geo.NewPoint(math.NaN(), math.NaN()), math.NaN(), lastError
}
g.mu.Unlock()

position, alt, err := g.cachedData.Position(ctx, extra)
if err != nil {
Expand All @@ -500,8 +493,6 @@ func (g *rtkSerial) Position(ctx context.Context, extra map[string]interface{})

// LinearVelocity passthrough.
func (g *rtkSerial) LinearVelocity(ctx context.Context, extra map[string]interface{}) (r3.Vector, error) {
g.mu.Lock()
defer g.mu.Unlock()
lastError := g.err.Get()
if lastError != nil {
return r3.Vector{}, lastError
Expand All @@ -521,9 +512,6 @@ func (g *rtkSerial) LinearAcceleration(ctx context.Context, extra map[string]int

// AngularVelocity passthrough.
func (g *rtkSerial) AngularVelocity(ctx context.Context, extra map[string]interface{}) (spatialmath.AngularVelocity, error) {
g.mu.Lock()
defer g.mu.Unlock()

lastError := g.err.Get()
if lastError != nil {
return spatialmath.AngularVelocity{}, lastError
Expand All @@ -534,9 +522,6 @@ func (g *rtkSerial) AngularVelocity(ctx context.Context, extra map[string]interf

// CompassHeading passthrough.
func (g *rtkSerial) CompassHeading(ctx context.Context, extra map[string]interface{}) (float64, error) {
g.mu.Lock()
defer g.mu.Unlock()

lastError := g.err.Get()
if lastError != nil {
return 0, lastError
Expand All @@ -546,9 +531,6 @@ func (g *rtkSerial) CompassHeading(ctx context.Context, extra map[string]interfa

// Orientation passthrough.
func (g *rtkSerial) Orientation(ctx context.Context, extra map[string]interface{}) (spatialmath.Orientation, error) {
g.mu.Lock()
defer g.mu.Unlock()

lastError := g.err.Get()
if lastError != nil {
return spatialmath.NewZeroOrientation(), lastError
Expand All @@ -558,9 +540,6 @@ func (g *rtkSerial) Orientation(ctx context.Context, extra map[string]interface{

// readFix passthrough.
func (g *rtkSerial) readFix(ctx context.Context) (int, error) {
g.mu.Lock()
defer g.mu.Unlock()

lastError := g.err.Get()
if lastError != nil {
return 0, lastError
Expand All @@ -570,9 +549,6 @@ func (g *rtkSerial) readFix(ctx context.Context) (int, error) {

// readSatsInView returns the number of satellites in view.
func (g *rtkSerial) readSatsInView(ctx context.Context) (int, error) {
g.mu.Lock()
defer g.mu.Unlock()

lastError := g.err.Get()
if lastError != nil {
return 0, lastError
Expand All @@ -583,9 +559,6 @@ func (g *rtkSerial) readSatsInView(ctx context.Context) (int, error) {

// Properties passthrough.
func (g *rtkSerial) Properties(ctx context.Context, extra map[string]interface{}) (*movementsensor.Properties, error) {
g.mu.Lock()
defer g.mu.Unlock()

lastError := g.err.Get()
if lastError != nil {
return &movementsensor.Properties{}, lastError
Expand All @@ -597,9 +570,6 @@ func (g *rtkSerial) Properties(ctx context.Context, extra map[string]interface{}
// Accuracy passthrough.
func (g *rtkSerial) Accuracy(ctx context.Context, extra map[string]interface{}) (*movementsensor.Accuracy, error,
) {
g.mu.Lock()
defer g.mu.Unlock()

lastError := g.err.Get()
if lastError != nil {
return nil, lastError
Expand Down

0 comments on commit 4794a92

Please sign in to comment.