Skip to content

Commit

Permalink
revamp Packet type
Browse files Browse the repository at this point in the history
As described in #47 and #58: revamp this type to keep the full raw
packet, and provide accessors rather than raw fields. It's also
smaller now, which is nice.
  • Loading branch information
scottlamb committed Apr 28, 2022
1 parent 7ea09dd commit 2e34bf9
Show file tree
Hide file tree
Showing 17 changed files with 1,133 additions and 720 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
with each packet.
* BREAKING: `PacketItem` and `CodecItem` are now `#[non_exhaustive]` for
future expansion.
* BREAKING: `retina::client::rtp::Packet` is now
`retina::rtp::ReceivedPacket`, and field access has been removed in favor
of accessors.

## `v0.3.9` (2022-04-12)

Expand Down
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ once_cell = "1.7.2"
pin-project = "1.0.7"
pretty-hex = "0.2.1"
rand = "0.8.3"
rtp-rs = "0.6.0"
rtsp-types = "0.0.3"
sdp-types = "0.1.4"
smallvec = { version = "1.6.1", features = ["union"] }
Expand Down
18 changes: 5 additions & 13 deletions benches/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,20 @@ fn make_test_data(max_payload_size: u16) -> Bytes {
];
let mut dummy_frame = vec![0; 1048576];
dummy_frame[4] = h264_reader::nal::UnitType::SliceLayerWithoutPartitioningIdr.id();
let mut p = retina::codec::h264::Packetizer::new(max_payload_size, 0, 24104).unwrap();
let mut p =
retina::codec::h264::Packetizer::new(max_payload_size, 0, 24104, 96, 0x4cacc3d1).unwrap();
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
let mut pkt_buf = vec![0; 65536];
for _ in 0..30 {
for &f in &frame_sizes {
dummy_frame[0..4].copy_from_slice(&f.to_be_bytes()[..]);
let frame = Bytes::copy_from_slice(&dummy_frame[..(usize::try_from(f).unwrap() + 4)]);
p.push(timestamp, frame).unwrap();
while let Some(pkt) = p.pull().unwrap() {
let pkt_len = rtp_rs::RtpPacketBuilder::new()
.payload_type(96)
.marked(pkt.mark)
.sequence(rtp_rs::Seq::from(pkt.sequence_number))
.ssrc(0x4cacc3d1)
.timestamp(pkt.timestamp.timestamp() as u32)
.payload(&pkt.payload)
.build_into(&mut pkt_buf)
.unwrap();
let pkt = pkt.raw();
data.push(b'$'); // interleaved data
data.push(0); // channel 0
data.extend_from_slice(&u16::try_from(pkt_len).unwrap().to_be_bytes()[..]);
data.extend_from_slice(&pkt_buf[..pkt_len]);
data.extend_from_slice(&u16::try_from(pkt.len()).unwrap().to_be_bytes()[..]);
data.extend_from_slice(&pkt);
}
timestamp = timestamp.try_add(3000).unwrap();
}
Expand Down
7 changes: 0 additions & 7 deletions fuzz/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 14 additions & 8 deletions fuzz/fuzz_targets/depacketize_h264.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,50 @@
// SPDX-License-Identifier: MIT OR Apache-2.0

#![no_main]
use bytes::{Buf, Bytes};
use libfuzzer_sys::fuzz_target;
use std::num::NonZeroU32;

fuzz_target!(|data: &[u8]| {
let mut data = Bytes::copy_from_slice(data);
let mut data = data;
let mut depacketizer = retina::codec::Depacketizer::new(
"video", "h264", 90_000, None, Some("packetization-mode=1;profile-level-id=64001E;sprop-parameter-sets=Z2QAHqwsaoLA9puCgIKgAAADACAAAAMD0IAA,aO4xshsA")).unwrap();
let mut timestamp = retina::Timestamp::new(0, NonZeroU32::new(90_000).unwrap(), 0).unwrap();
let mut sequence_number: u16 = 0;
let conn_ctx = retina::ConnectionContext::dummy();
let stream_ctx = retina::StreamContextRef::dummy();
let pkt_ctx = retina::PacketContext::dummy();
while data.has_remaining() {
let hdr = data.get_u8();
loop {
let (hdr, rest) = match data.split_first() {
Some(r) => r,
None => return,
};
let ts_change = (hdr & 0b001) != 0;
let mark = (hdr & 0b010) != 0;
let loss = (hdr & 0b100) != 0;
let len = usize::from(hdr >> 3);
if len > data.remaining() {
if rest.len() < len {
return;
}
let (payload, rest) = rest.split_at(len);
data = rest;
if loss {
sequence_number = sequence_number.wrapping_add(1);
}
if ts_change {
timestamp = timestamp.try_add(1).unwrap();
}
let pkt = retina::client::rtp::Packet {
let pkt = retina::rtp::ReceivedPacketBuilder {
ctx: pkt_ctx,
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number,
loss: u16::from(loss),
payload_type: 96,
mark,
payload: data.split_off(len),
};
}
.build(payload.iter().copied())
.unwrap();
//println!("pkt: {:#?}", pkt);
if depacketizer.push(pkt).is_err() {
return;
Expand Down
4 changes: 2 additions & 2 deletions fuzz/fuzz_targets/roundtrip_h264.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fuzz_target!(|data: &[u8]| {
let conn_ctx = retina::ConnectionContext::dummy();
let stream_ctx = retina::StreamContextRef::dummy();
let max_payload_size = u16::from_be_bytes([data[0], data[1]]);
let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0) {
let mut p = match retina::codec::h264::Packetizer::new(max_payload_size, 0, 0, 0, 0) {
Ok(p) => p,
Err(_) => return,
};
Expand All @@ -39,7 +39,7 @@ fuzz_target!(|data: &[u8]| {
let frame = loop {
match p.pull() {
Ok(Some(pkt)) => {
let mark = pkt.mark;
let mark = pkt.mark();
if d.push(pkt).is_err() {
return;
}
Expand Down
25 changes: 12 additions & 13 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ async fn punch_firewall_hole(
#[derive(Debug)]
#[non_exhaustive]
pub enum PacketItem {
RtpPacket(rtp::Packet),
RtpPacket(crate::rtp::ReceivedPacket),
SenderReport(rtp::SenderReport),
}

Expand Down Expand Up @@ -2315,7 +2315,7 @@ impl futures::Stream for Demuxed {
loop {
let (stream_id, pkt) = match self.state {
DemuxedState::Waiting => match ready!(Pin::new(&mut self.session).poll_next(cx)) {
Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id, Some(p)),
Some(Ok(PacketItem::RtpPacket(p))) => (p.stream_id(), Some(p)),
Some(Ok(PacketItem::SenderReport(p))) => {
return Poll::Ready(Some(Ok(CodecItem::SenderReport(p))))
}
Expand All @@ -2342,10 +2342,10 @@ impl futures::Stream for Demuxed {
.inner
.ctx();
if let Some(p) = pkt {
let pkt_ctx = p.ctx;
let stream_id = p.stream_id;
let ssrc = p.ssrc;
let sequence_number = p.sequence_number;
let pkt_ctx = *p.ctx();
let stream_id = p.stream_id();
let ssrc = p.ssrc();
let sequence_number = p.sequence_number();
depacketizer.push(p).map_err(|description| {
wrap!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
Expand Down Expand Up @@ -2498,9 +2498,9 @@ mod tests {
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
assert_eq!(p.ssrc(), 0xdcc4a0d8);
assert_eq!(p.sequence_number(), 0x41d4);
assert_eq!(&p.payload()[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
Expand Down Expand Up @@ -2608,9 +2608,9 @@ mod tests {
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
assert_eq!(p.ssrc(), 0xdcc4a0d8);
assert_eq!(p.sequence_number(), 0x41d4);
assert_eq!(&p.payload()[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
Expand Down Expand Up @@ -2771,7 +2771,6 @@ mod tests {
("SessionOptions", std::mem::size_of::<SessionOptions>()),
("Demuxed", std::mem::size_of::<Demuxed>()),
("Stream", std::mem::size_of::<Stream>()),
("rtp::Packet", std::mem::size_of::<rtp::Packet>()),
(
"rtp::SenderReport",
std::mem::size_of::<rtp::SenderReport>(),
Expand Down
Loading

0 comments on commit 2e34bf9

Please sign in to comment.