A better status API

When using the Python API, the mechanisms to queue commands to the robot in order of ugliness are:

  1. Sleep before pushing the next command, and hope that the previous command is done
  2. Sleep for a sufficient amout of time, hoping that the robot starts moving, and then check status['is_moving_filtered'], and hope that you get a clean, filtered status.

The fundamental problem here is that high-level functions like base.translate_by implement some kind of PID control, so they do more than simply pushing commands to the motors – and yet, their trajectory progress is not exposed directly, and we have to rely on direct motor status.

I think it’s worth thinking of exposing a better status API, or a better queuing API, where pushing a second command to the same motors doesn’t invalidate the first.

I’m not too familiar with the firmware code, but it looks like maybe something like this flag can be exposed: stretch_firmware/TrajectoryManager.cpp at master · hello-robot/stretch_firmware · GitHub

What do you all think?

Related thread: Stretch_body Python SDK and Python Multithreading - #6 by aedsinger

Thank you for starting the conversation on this.

It would be helpful to understand what type of behavior you’re looking to generate by using queued rather than preempting incremental moves. Perhaps you can share some pseudo code?

The currently available incremental move interface is designed with servoing applications in mind. In this case, an external ‘observer’ commands a series of small motions until a target is achieved. Preemption is desirable as it allows for dynamic response to changes in the servo target.

Your goals may be achievable with the current control interface. If not, we can extend the interface for you.

Also, the progress of the joint trajectory is available as motor.status['is_mg_moving']. On the other thread I just posted details on working with it.

Finally, we are about to release a significant upgrade to the Stretch Body Python SDK that introduces splined waypoint trajectories. This interface may be of interest depending on your use case.

Thanks for the reply.

I think pre-emption is not fundamentally in conflict with queuing (one should be able to cancel or dequeue with a good queueing API).

I think, my primary issue with the current interface is that it is doing high-level PID control, it isn’t a low-level API that is simply sending current or signals to the motor – and yet it doesn’t behave like a high-level API.

As a high-level and long-running trajectory-execution API (with internal PID control, and soon to come splined waypoint control), it should be able to give a richer status update as to where it is within it’s trajectory execution.
What we are doing now is to ask it to execute a trajectory (like translate_by, rotate), and then getting the status of executing that trajectory in a very low-bandwidth / unreliable status API, with a lot of uncertainty. This seems very very round-about.

Ideally, we’d have something more in the order of:

status_handle = base.translate_by(...)

And then the status_handle gives us a status of the corresponding command:

status_handle.is_finished()
status_handle.check_exception()
status_handle.progress()

I feel like this is going to be more important as you introduce even more long-running, high-level and complex trajectory control with the new upgrade.

For our use-cases, we’re actually okay with imperfect movement, and the general approximate movements of the optical encoders are fine. We don’t need precise servo-like control.

So, we commonly want pseudo-code like:

def move_robot(...):
    future1 = bot.translate_by(...)
    future2 = bot.rotate_by(...)
    future3 = bot.translate_by(...)
   return [future1, future2, future3]

def detect_objects(...):
   ...

def look_at_object(...):
   ...
   camera.set_pitch(...)
   camera.set_yaw(...)

def 

def check_obstacle(...):
   ...

while True:
    # doing exploration
    futures = move_robot(random_location)
    safe_moving_thread(futures, check_obstacle) # stops the movements if obstacle is detected
   
   objects = detect_objects()
   if objects is not None:
        [future.cancel() for future in futures]
        futures = move_robot(objects[0].location)
        safe_moving_thread(futures, check_obstacle)

I am simplifying quite a bit, as most of the hello API is used within top of our navigation/planning/low-level services, but hopefully it gives a sense of what we’re trying to do.

We care about clean robot trajectories, where the robot is moving constantly, and doesn’t look unnatural in it’s movement, and we are fine with imperfect movement / error, as we have a SLAM implementation that deals with it and plans it’s next things to do just fine (i.e. the error doesn’t accumulate over time, even if we didn’t reach the exact point that we wanted the robot to go to)

Thank you, that’s helpful context. We believe that you’ll be able to accomplish your example using the new splined trajectory interface (with a bit of Python wrapping). Maybe we should sync on a call and then report back here when we have a solution?