summaryrefslogtreecommitdiff
path: root/src/session/segment_buffer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/session/segment_buffer.rs')
-rw-r--r--src/session/segment_buffer.rs64
1 files changed, 30 insertions, 34 deletions
diff --git a/src/session/segment_buffer.rs b/src/session/segment_buffer.rs
index 6e6ceae..7b75343 100644
--- a/src/session/segment_buffer.rs
+++ b/src/session/segment_buffer.rs
@@ -4,23 +4,23 @@ use std::num::Wrapping;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Description {
Ok,
- TooManyPacket,
- WindowFull,
+ TooManySegments,
+ TooLongBuffer,
}
#[derive(Debug, Clone)]
struct Segment {
- rel_seq: Wrapping<u32>,
+ offset: Wrapping<u32>,
payload: Vec<u8>,
}
impl Segment {
fn offset_part(&self, offset: Wrapping<u32>) -> &[u8] {
- let this_right = self.rel_seq + Wrapping(self.payload.len() as u32);
+ let this_right = self.offset + Wrapping(self.payload.len() as u32);
if this_right <= offset {
return &[];
} else {
- let overlap_size = (offset - self.rel_seq).0 as usize;
+ let overlap_size = (offset - self.offset).0 as usize;
return &self.payload[overlap_size..];
}
}
@@ -30,45 +30,41 @@ impl Segment {
pub struct Stream {
// Next Seq number, isn + (sum of all sent segments lengths)
next_offset: Wrapping<u32>,
- // The current list of segments that this peer is about to sent (ordered by rel_seq)
+ // The current list of segments that this peer is about to sent
segments: BTreeMap<u32, Segment>,
- max_packets: usize,
- window_size: usize,
-
- used_window_size: usize,
+ max_n_segments: usize,
+ max_buf_len: usize,
+ used_buf_len: usize,
- // todo: 完全抽象,不要timestamp,window size 和 max packets 改名。next rel seq改名。
- // todo: 起始偏移量一定是0. isn 不要了
- // todo: 回绕也不用考虑了,wrapper 删掉
// todo: 支持有截止的输出(输出一部分)
// todo: 看看磁盘是怎么实现的
}
impl Stream {
- pub fn new(window_size: usize, max_packets: usize) -> Self { // todo: 删掉isn,重命名,不要timeout
+ pub fn new(max_buf_len: usize, max_n_segments: usize) -> Self { // todo: 删掉isn,重命名,不要timeout
Stream {
next_offset: Wrapping(0),
segments: BTreeMap::new(),
- max_packets,
- window_size,
- used_window_size: 0,
+ max_n_segments,
+ max_buf_len,
+ used_buf_len: 0,
}
}
pub fn update(&mut self, offset: u32, payload: &[u8]) -> Description {
- if self.segments.len() >= self.max_packets {
- return Description::TooManyPacket;
+ if self.segments.len() >= self.max_n_segments {
+ return Description::TooManySegments;
}
- if self.used_window_size + payload.len()> self.window_size {
- return Description::WindowFull;
+ if self.used_buf_len + payload.len()> self.max_buf_len {
+ return Description::TooLongBuffer;
}
if payload.is_empty() {
return Description::Ok;
}
let segment = Segment {
- rel_seq: Wrapping(offset),
+ offset: Wrapping(offset),
payload: payload.to_vec(),
};
self.insert_sorted(segment);
@@ -85,7 +81,7 @@ impl Stream {
let mut ret = Vec::new();
while !self.segments.is_empty() {
- if self.segments.first_entry().unwrap().get().rel_seq > self.next_offset { // there is a gap
+ if self.segments.first_entry().unwrap().get().offset > self.next_offset { // there is a gap
println!("Gap detected");
break;
}
@@ -95,7 +91,7 @@ impl Stream {
ret.extend_from_slice(extension);
self.next_offset += Wrapping(extension.len() as u32);
- self.used_window_size -= segment.payload.len();
+ self.used_buf_len -= segment.payload.len();
}
println!("ret len: {}", ret.len());
@@ -112,14 +108,14 @@ impl Stream {
while !self.segments.is_empty() {
let segment = self.segments.pop_first().unwrap().1;
- if segment.rel_seq + Wrapping(segment.payload.len() as u32) <= self.next_offset {
+ if segment.offset + Wrapping(segment.payload.len() as u32) <= self.next_offset {
continue;
}
let rel = segment.payload.len() as u32;
- if segment.rel_seq > self.next_offset || ret.is_empty() {
+ if segment.offset > self.next_offset || ret.is_empty() {
ret.push(segment.payload);
- self.next_offset = segment.rel_seq + Wrapping(rel);
+ self.next_offset = segment.offset + Wrapping(rel);
} else {
if let Some(last) = ret.last_mut() {
let extension = segment.offset_part(self.next_offset);
@@ -129,15 +125,15 @@ impl Stream {
}
}
- self.used_window_size = 0;
+ self.used_buf_len = 0;
self.next_offset = Wrapping(0);
ret
}
fn insert_sorted(&mut self, s: Segment) {
- self.used_window_size += s.payload.len();
- self.segments.insert(s.rel_seq.0, s);
+ self.used_buf_len += s.payload.len();
+ self.segments.insert(s.offset.0, s);
}
}
@@ -294,13 +290,13 @@ mod tests {
assert_eq!(s.update(0, &[1, 2, 3, 4]), Description::Ok);
assert_eq!(s.update(4, &[5, 6, 7, 8]), Description::Ok);
- assert_eq!(s.update(8, &[2, 3, 4, 5]), Description::WindowFull);
+ assert_eq!(s.update(8, &[2, 3, 4, 5]), Description::TooLongBuffer);
assert_eq!(s.pullup().unwrap(), &[1, 2, 3, 4, 5, 6, 7, 8]); // pop will clear window size
let v:Vec<_> = (1..=10).collect();
assert_eq!(s.update(8, v.as_slice()), Description::Ok);
- assert_eq!(s.update(18, &[1]), Description::WindowFull);
+ assert_eq!(s.update(18, &[1]), Description::TooLongBuffer);
}
#[test]
@@ -309,12 +305,12 @@ mod tests {
assert_eq!(s.update(0, &[1, 2, 3, 4]), Description::Ok);
assert_eq!(s.update(5, &[6, 7, 8]), Description::Ok); // hole
- assert_eq!(s.update(8, &[2, 3, 4, 5]), Description::TooManyPacket);
+ assert_eq!(s.update(8, &[2, 3, 4, 5]), Description::TooManySegments);
assert_eq!(s.pullup().unwrap(), &[1, 2, 3, 4]); // pop 1
assert_eq!(s.update(8, &[11]), Description::Ok);
- assert_eq!(s.update(9, &[12]), Description::TooManyPacket);
+ assert_eq!(s.update(9, &[12]), Description::TooManySegments);
assert_eq!(s.clear(), vec![vec![6, 7, 8, 11]]);
}
}