Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(perception reproducer): some improvements in perception reproducer. #72

Merged
merged 11 commits into from
Jul 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ def on_timer(self):
if objects_msg:
objects_msg.header.stamp = timestamp
if self.args.detected_object:
if not self.ego_pose:
print("No ego pose found.")
if not self.ego_odom:
print("No ego odom found.")
return

ego_odom = self.find_ego_odom_by_timestamp(self.bag_timestamp)
if not ego_odom:
return
log_ego_pose = ego_odom.pose.pose

translate_objects_coordinate(self.ego_pose, log_ego_pose, objects_msg)
translate_objects_coordinate(self.ego_odom.pose.pose, log_ego_pose, objects_msg)
self.objects_pub.publish(objects_msg)

# traffic signals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, args, name):
super().__init__(name)
self.args = args

self.ego_pose = None
self.ego_odom = None
self.rosbag_objects_data = []
self.rosbag_ego_odom_data = []
self.rosbag_traffic_signals_data = []
Expand Down Expand Up @@ -110,7 +110,7 @@ def __init__(self, args, name):
time.sleep(1.0)

def on_odom(self, odom):
self.ego_pose = odom.pose.pose
self.ego_odom = odom

def load_rosbag(self, rosbag2_path: str):
reader = open_reader(str(rosbag2_path))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,15 @@
from utils import create_empty_pointcloud
from utils import translate_objects_coordinate

dist_eps = 1e-3 # (meters)


class PerceptionReproducer(PerceptionReplayerCommon):
def __init__(self, args):
self.rosbag_ego_odom_search_radius = (
args.search_radius
) # (m) the range of the ego odom to search,
self.ego_odom_search_radius = (
self.rosbag_ego_odom_search_radius
) # it may be set by an individual parameter.

self.reproduce_cool_down = (
args.reproduce_cool_down if args.search_radius != 0.0 else 0.0
) # (sec) the cool down time for republishing published data, please make sure that it's greater than the ego's stopping time.
self.rosbag_ego_odom_search_radius = args.search_radius
self.ego_odom_search_radius = self.rosbag_ego_odom_search_radius
self.reproduce_cool_down = args.reproduce_cool_down if args.search_radius != 0.0 else 0.0

super().__init__(args, "perception_reproducer")

self.reproduce_sequence_indices = deque() # contains ego_odom_idx
self.cool_down_indices = deque() # contains ego_odom_idx
self.ego_odom_id2last_published_timestamp = {} # for checking last published timestamp.

self.prev_ego_pos = None
self.prev_ego_odom_msg = None
self.perv_objects_msg = None
self.prev_traffic_signals_msg = None

self.stopwatch = StopWatch(self.args.verbose) # for debug

# refresh cool down for setting initial pose in psim.
Expand All @@ -63,17 +45,27 @@ def __init__(self, args):
# to make some data to accelerate computation
self.preprocess_data()

self.reproduce_sequence_indices = deque() # contains ego_odom_idx
self.cool_down_indices = deque() # contains ego_odom_idx
self.ego_odom_id2last_published_timestamp = {} # for checking last published timestamp.
self.last_sequenced_ego_pose = None

pose_timestamp, self.prev_ego_odom_msg = self.rosbag_ego_odom_data[0]
self.perv_objects_msg, self.prev_traffic_signals_msg = self.find_topics_by_timestamp(
pose_timestamp
)
self.memorized_original_objects_msg = (
self.memorized_noised_objects_msg
) = self.perv_objects_msg

# start main timer callback
time_diffs = []
prev_stamp = self.rosbag_ego_odom_data[0][0]
for stamp, _ in self.rosbag_ego_odom_data[1:]:
time_diff = (stamp - prev_stamp) / 1e9
time_diffs.append(time_diff)
prev_stamp = stamp

average_ego_odom_interval = sum(time_diffs) / len(time_diffs)
# slow down the publication speed.
average_ego_odom_interval *= args.publishing_speed_factor

average_ego_odom_interval = np.mean(
[
(self.rosbag_ego_odom_data[i][0] - self.rosbag_ego_odom_data[i - 1][0]) / 1e9
for i in range(1, len(self.rosbag_ego_odom_data))
]
)
self.timer = self.create_timer(average_ego_odom_interval, self.on_timer)

# kill perception process to avoid a conflict of the perception topics
Expand Down Expand Up @@ -105,30 +97,34 @@ def on_timer(self):
pointcloud_msg = create_empty_pointcloud(timestamp_msg)
self.pointcloud_pub.publish(pointcloud_msg)

if not self.ego_pose:
print("No ego pose found.")
if not self.ego_odom:
print("No ego odom found.")
return

# Update reproduce list if ego_pos is moved.
if (
self.ego_pose is None
or self.prev_ego_pos is None
or (
(self.ego_pose.position.x - self.prev_ego_pos.position.x) ** 2
+ (self.ego_pose.position.y - self.prev_ego_pos.position.y) ** 2
ego_pose = self.ego_odom.pose.pose
dist_moved = (
np.sqrt(
(ego_pose.position.x - self.last_sequenced_ego_pose.position.x) ** 2
+ (ego_pose.position.y - self.last_sequenced_ego_pose.position.y) ** 2
)
> dist_eps**2
):
if self.last_sequenced_ego_pose
else 999
)

# Update the reproduce sequence if the distance moved is greater than the search radius.
if dist_moved > self.ego_odom_search_radius:
self.last_sequenced_ego_pose = ego_pose

# find the nearest ego odom by simulation observation
self.stopwatch.tic("find_nearest_ego_odom_by_observation")
nearby_ego_odom_indies = self.find_nearby_ego_odom_indies(
[self.ego_pose], self.ego_odom_search_radius
[ego_pose], self.ego_odom_search_radius
)
nearby_ego_odom_indies = [
self.rosbag_ego_odom_data[idx][1].pose.pose for idx in nearby_ego_odom_indies
]
if not nearby_ego_odom_indies:
nearest_ego_odom_ind = self.find_nearest_ego_odom_index(self.ego_pose)
nearest_ego_odom_ind = self.find_nearest_ego_odom_index(ego_pose)
nearby_ego_odom_indies += [
self.rosbag_ego_odom_data[nearest_ego_odom_ind][1].pose.pose
]
Expand Down Expand Up @@ -157,78 +153,80 @@ def on_timer(self):
]
ego_odom_indices = sorted(ego_odom_indices)
self.reproduce_sequence_indices = deque(ego_odom_indices)

self.stopwatch.toc("update reproduce_sequence")

self.prev_ego_pos = self.ego_pose
if self.args.verbose:
print("reproduce_sequence_indices: ", list(self.reproduce_sequence_indices)[:20])

# Get messages
repeat_flag = len(self.reproduce_sequence_indices) == 0

# get data to publish
repeat_trigger = len(self.reproduce_sequence_indices) == 0
if not repeat_trigger: # pop data from reproduce_sequence if sequence is not empty.
# Add an additional constraint to avoid publishing too fast when there is a speed gap between the ego and the rosbag's ego when ego is departing/stopping while rosbag's ego is moving.
if not repeat_flag:
ego_speed = np.sqrt(
self.ego_odom.twist.twist.linear.x**2 + self.ego_odom.twist.twist.linear.y**2
)
ego_odom_idx = self.reproduce_sequence_indices[0]
_, ego_odom_msg = self.rosbag_ego_odom_data[ego_odom_idx]
ego_rosbag_speed = np.sqrt(
ego_odom_msg.twist.twist.linear.x**2 + ego_odom_msg.twist.twist.linear.y**2
)

ego_rosbag_dist = np.sqrt(
(ego_pose.position.x - ego_odom_msg.pose.pose.position.x) ** 2
+ (ego_pose.position.y - ego_odom_msg.pose.pose.position.y) ** 2
)
repeat_flag = ego_rosbag_speed > ego_speed * 5 and ego_rosbag_dist > 1.0
# set the speed threshold to many (5) times then ego_speed because this constraint is mainly for departing/stopping (ego speed is close to 0).

if not repeat_flag:
self.stopwatch.tic("find_topics_by_timestamp")
ego_odom_idx = self.reproduce_sequence_indices.popleft()
# extract messages by the nearest ego odom timestamp
pose_timestamp, ego_odom_msg = self.rosbag_ego_odom_data[ego_odom_idx]
# extract message by the nearest ego odom timestamp
self.stopwatch.tic("find_topics_by_timestamp")
objects_msg, traffic_signals_msg = self.find_topics_by_timestamp(pose_timestamp)
self.stopwatch.toc("find_topics_by_timestamp")
# if self.args.verbose:
# print("reproduce_sequence_indices: ", list(self.reproduce_sequence_indices)[:20])

else: # get perv data to publish if reproduce_sequence is empty.
# update cool down info.
self.ego_odom_id2last_published_timestamp[ego_odom_idx] = timestamp
self.cool_down_indices.append(ego_odom_idx)
else:
ego_odom_msg = self.prev_ego_odom_msg
objects_msg = self.perv_objects_msg
traffic_signals_msg = self.prev_traffic_signals_msg

# transform and publish current data.
# Transform and publish messages.
self.stopwatch.tic("transform and publish")

# ego odom
self.recorded_ego_pub.publish(ego_odom_msg)

if ego_odom_msg:
self.prev_ego_odom_msg = ego_odom_msg
self.recorded_ego_pub.publish(ego_odom_msg)
# objects
objects_msg = objects_msg if objects_msg else self.perv_objects_msg
if objects_msg:
self.perv_objects_msg = objects_msg
objects_msg.header.stamp = timestamp_msg

# add noise to repeat published objects
if repeat_flag and self.args.noise:
objects_msg = self.add_perception_noise(objects_msg)

if self.args.detected_object:
# copy the messages
self.stopwatch.tic("message deepcopy")
objects_msg_copied = pickle.loads(
pickle.dumps(objects_msg)
) # this is x5 faster than deepcopy
self.stopwatch.toc("message deepcopy")

log_ego_pose = ego_odom_msg.pose.pose
translate_objects_coordinate(self.ego_pose, log_ego_pose, objects_msg_copied)
else:
objects_msg_copied = objects_msg
self.objects_pub.publish(objects_msg_copied)
objects_msg = self.copy_message(objects_msg)
translate_objects_coordinate(ego_pose, ego_odom_msg.pose.pose, objects_msg)

self.objects_pub.publish(objects_msg)

# traffic signals
traffic_signals_msg = (
traffic_signals_msg if traffic_signals_msg else self.prev_traffic_signals_msg
)
if traffic_signals_msg:
traffic_signals_msg.stamp = timestamp_msg
self.traffic_signals_pub.publish(traffic_signals_msg)
self.prev_traffic_signals_msg = traffic_signals_msg
elif self.prev_traffic_signals_msg:
self.prev_traffic_signals_msg.stamp = timestamp_msg
self.traffic_signals_pub.publish(traffic_signals_msg)

self.traffic_signals_pub.publish(self.prev_traffic_signals_msg)
self.stopwatch.toc("transform and publish")

if not repeat_trigger:
# save data for repeat publication.
self.prev_ego_odom_msg = ego_odom_msg
self.perv_objects_msg = (
objects_msg if objects_msg is not None else self.perv_objects_msg
)
self.prev_traffic_signals_msg = (
traffic_signals_msg
if traffic_signals_msg is not None
else self.prev_traffic_signals_msg
)

# update cool down info.
self.ego_odom_id2last_published_timestamp[ego_odom_idx] = timestamp
self.cool_down_indices.append(ego_odom_idx)

self.stopwatch.toc("total on_timer")

def find_nearest_ego_odom_index(self, ego_pose):
Expand All @@ -247,10 +245,47 @@ def find_nearby_ego_odom_indies(self, ego_poses: list, search_radius: float):

return nearby_indices

def copy_message(self, msg):
self.stopwatch.tic("message deepcopy")
objects_msg_copied = pickle.loads(pickle.dumps(msg)) # this is x5 faster than deepcopy
self.stopwatch.toc("message deepcopy")
return objects_msg_copied

def add_perception_noise(
self, objects_msg, update_rate=0.03, x_noise_std=0.1, y_noise_std=0.05
):
if self.memorized_original_objects_msg != objects_msg:
self.memorized_noised_objects_msg = self.memorized_original_objects_msg = objects_msg

if np.random.rand() < update_rate:
self.stopwatch.tic("add noise")
self.memorized_noised_objects_msg = self.copy_message(
self.memorized_original_objects_msg
)
for obj in self.memorized_noised_objects_msg.objects:
noise_x = np.random.normal(0, x_noise_std)
noise_y = np.random.normal(0, y_noise_std)
if self.args.detected_object or self.args.tracked_object:
obj.kinematics.pose_with_covariance.pose.position.x += noise_x
obj.kinematics.pose_with_covariance.pose.position.y += noise_y
else:
obj.kinematics.initial_pose_with_covariance.pose.position.x += noise_x
obj.kinematics.initial_pose_with_covariance.pose.position.y += noise_y
self.stopwatch.toc("add noise")

return self.memorized_noised_objects_msg


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--bag", help="rosbag", default=None)
parser.add_argument(
"-n",
"--noise",
help="apply perception noise to the objects when publishing repeated messages",
action="store_true",
default=True,
)
parser.add_argument(
"-d", "--detected-object", help="publish detected object", action="store_true"
)
Expand All @@ -273,16 +308,10 @@ def find_nearby_ego_odom_indies(self, ego_poses: list, search_radius: float):
parser.add_argument(
"-c",
"--reproduce-cool-down",
help="The cool down time for republishing published messages (default is 80.0 seconds)",
help="The cool down time for republishing published messages (default is 80.0 seconds), please make sure that it's greater than the ego's stopping time.",
type=float,
default=80.0,
)
parser.add_argument(
"--publishing-speed-factor",
type=float,
default=1.2,
help="A factor to slow down the publication speed.",
)

args = parser.parse_args()

Expand Down
Loading