blockreader.rs fix handling GZIP XZ
Fix handling of GZIP and XZ in `read_block_FILE_GZ` and
`read_block_FILE_XZ`. GZIP files will be decoded in 1024-byte
chunks; this fails less often.

Fix storage of blocks; the bug affected both functions, which were storing
blocks under the wrong blockoffset key (how did these ever work?!). See
the second sketch below.
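A minimal sketch of the chunked-read approach, assuming `flate2`'s `GzDecoder` over any `Read` source; `read_gz_block` and its parameters are illustrative stand-ins, not the repository's API:

```rust
use std::io::Read;

use flate2::read::GzDecoder;

/// Decode up to `blocksz` bytes from `decoder` using many small reads;
/// returns fewer bytes if the stream ends (or gets stuck) first.
fn read_gz_block<R: Read>(
    decoder: &mut GzDecoder<R>,
    blocksz: usize,
) -> std::io::Result<Vec<u8>> {
    // per the commit, small 1024-byte reads succeed more often than
    // one block-sized read
    const READSZ: usize = 1024;
    let mut block: Vec<u8> = Vec::with_capacity(blocksz);
    let mut buf = [0u8; READSZ];
    while block.len() < blocksz {
        // never request more than the bytes still missing from the block
        let want = READSZ.min(blocksz - block.len());
        let n = decoder.read(&mut buf[..want])?;
        if n == 0 {
            // the commit treats a zero-byte read as unrecoverable
            break;
        }
        block.extend_from_slice(&buf[..n]);
    }
    Ok(block)
}
```

Capping each request at `READSZ` mirrors the commit's ad-hoc finding that many small reads fail less often than a single block-sized read.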
jtmoon79 committed Jul 28, 2022
1 parent b2d6de5 commit 8d5e686
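The block-storage fix concerns which key each decompressed block is cached under while reading forward to the requested offset. A hedged sketch of the corrected loop shape, with `BlockOffset`, `Block`, and `read_next` as hypothetical stand-ins for the repository's types:

```rust
use std::collections::BTreeMap;

type BlockOffset = u64;
type Block = Vec<u8>;

/// Read forward one block at a time until `blockoffset`, caching each
/// block under the offset it was read at (`bo_at`).
fn read_up_to(
    blocks: &mut BTreeMap<BlockOffset, Block>,
    blockoffset: BlockOffset,
    mut read_next: impl FnMut() -> Block,
) -> Option<Block> {
    let mut bo_at: BlockOffset = 0;
    let mut found: Option<Block> = None;
    while bo_at <= blockoffset {
        let block = read_next();
        // the bug being fixed: inserting at `blockoffset` here stored
        // every block under one key, losing the intermediate blocks
        blocks.insert(bo_at, block.clone());
        if bo_at == blockoffset {
            found = Some(block);
        }
        bo_at += 1;
    }
    found
}
```

Storing under `bo_at` keeps every intermediate block retrievable; the old code reinserted at the single requested `blockoffset` key on every pass.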
Showing 1 changed file with 90 additions and 50 deletions.
140 changes: 90 additions & 50 deletions src/Readers/blockreader.rs
@@ -243,9 +243,10 @@ pub struct BlockReader {
///
/// Users should always call `filesz()`.
pub(crate) filesz: FileSz,
/// File size in blocks, set in `open`
/// File size in blocks, set in `open`.
pub(crate) blockn: u64,
/// BlockSz used for read operations.
/// standard `Block` size in bytes; all `Block`s are this size except the
/// last `Block` which may be this size or smaller (and not zero).
pub(crate) blocksz: BlockSz,
/// Count of bytes stored by the `BlockReader`.
/// May not match `self.blocks.iter().map(|x| sum += x.len()); sum` as
@@ -902,7 +903,7 @@ impl BlockReader {

let mut bufreader: BufReader_Xz = BufReader_Xz::new(file_xz);

// XXX: THIS IS A TERRIBLE HACK
// XXX: THIS IS A TERRIBLE HACK!
// read the entire file into blocks in one go!
// putting this here until the implementation of reading the header/blocks
// of the underlying .xz file
@@ -1453,7 +1454,7 @@ impl BlockReader {
// XXX: this will panic if the key+value in `self.blocks` was dropped
// which could happen during streaming stage
let blockp: BlockP = self.blocks.get_mut(&bo_at).unwrap().clone();
self.store_block_in_LRU_cache(blockoffset, &blockp);
self.store_block_in_LRU_cache(bo_at, &blockp);
return ResultS3_ReadBlock::Found(blockp);
}
bo_at += 1;
@@ -1464,52 +1465,92 @@ impl BlockReader {
self.read_blocks_miss += 1;
}

let blocksz_u: usize = self.blocksz as usize;
// XXX: for some reason large block sizes are more likely to fail `.read`
// so do many smaller reads of a size that succeeds more often
let blocksz_u: usize = self.blocksz_at_blockoffset(&bo_at) as usize;
// bytes to read in all `.read()` except the last
// in ad-hoc experiments, this size was found to succeed pretty often
const READSZ: usize = 1024;
//const READSZ: usize = 0xFFFF;
// bytes to read in last `.read()`
let readsz_last: usize = blocksz_u % READSZ;
// number of `.read()` of size `READSZ` plus one
let mut reads: usize = blocksz_u / READSZ + 1;
let bytes_to_read: usize = (reads - 1) * READSZ + readsz_last;
debug_assert_eq!(bytes_to_read, blocksz_u, "bad calculation");
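// worked example (illustrative values): blocksz_u = 65536 gives
// readsz_last = 65536 % 1024 = 0, reads = 65536 / 1024 + 1 = 65, and
// bytes_to_read = (65 - 1) * 1024 + 0 = 65536, matching blocksz_u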
// final block of storage
let mut block = Block::with_capacity(blocksz_u);
// intermediate buffer of size `READSZ` for smaller reads
let mut block_buf = Block::with_capacity(READSZ);
// XXX: `with_capacity, clear, resize` is a verbose way to create a new vector with a run-time determined `capacity`
// and `len`. `len == capacity` is necessary for calls to `decoder.read`.
// Using `decoder.read_exact` and `decoder.read_to_end` was more difficult.
// See https://github.com/rust-lang/flate2-rs/issues/308
block.clear();
block.resize(blocksz_u, 0);
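// equivalently (illustrative, assuming `Block` is `Vec<u8>`):
// `let mut block: Block = vec![0u8; blocksz_u];` also yields `len == blocksz_u`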
debug_eprintln!("{}read_block_FILE_GZ({}): blocks_read count {:?}", so(), blockoffset, self.blocks_read.len());
debug_eprintln!("{}read_block_FILE_GZ({}): GzDecoder.read(@{:p} (len {}, capacity {}))", so(), blockoffset, &block, block.len(), block.capacity());
match (self.gz.as_mut().unwrap().decoder).read(block.as_mut()) {
Ok(size_) if size_ == 0 => {
debug_eprintln!("{}read_block_FILE_GZ({}): GzDecoder.read() returned Ok({:?})", so(), blockoffset, size_);
let byte_at: FileOffset = self.file_offset_at_block_offset_self(blockoffset);
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::InvalidData,
format!("GzDecoder.read() read zero bytes for vec<u8> buffer of length {}, capacity {}; stuck at inflated byte {}, size {}, size uncompressed {} (calculated from gzip header); {:?}", block.len(), block.capacity(), byte_at, self.filesz, self.filesz_actual, self.path)
)
);
},
Ok(size_) if size_ == blocksz_u => {
debug_eprintln!("{}read_block_FILE_GZ({}): GzDecoder.read() returned Ok({:?})", so(), blockoffset, size_);
block.resize(size_, 0);
debug_eprintln!("{}read_block_FILE_GZ({}): blocks_read count {:?}; for blockoffset {}: must do {} reads of {} bytes, and one read of {} bytes (total {} bytes to read) (uncompressed filesz {})", so(), blockoffset, self.blocks_read.len(), bo_at, reads - 1, READSZ, readsz_last, bytes_to_read, self.filesz());
let mut read_total: usize = 0;
while reads > 0 {
reads -= 1;
let mut readsz: usize = READSZ;
if reads == 0 {
readsz = readsz_last;
}
Ok(size_) => {
debug_eprintln!("{}read_block_FILE_GZ({}): GzDecoder.read() returned Ok({:?}), blocksz {}", so(), blockoffset, size_, blocksz_u);
block.resize(size_, 0);
},
//Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
// // XXX: docs say the value of `bytes` is not guaranteed, but in practice
// // the `bytes` are set to remaining data. Also, further calls to any `read`
// // fail to return anything.
// debug_eprintln!("{}read_block_FILE_GZ({}): read() returned Err({})", sx(), blockoffset, err);
//},
Err(err) => {
debug_eprintln!("ERROR: GzDecoder.read(&block (capacity {})) error {} for {:?}", self.blocksz, err, self.path);
debug_eprintln!("{}read_block_FILE_GZ({}): return Err({})", sx(), blockoffset, err);
return ResultS3_ReadBlock::Err(err);
if readsz_last == 0 {
break;
}
// TODO: [2022/07] cost-savings, use pre-allocated buffer
block_buf.clear();
block_buf.resize(readsz, 0);
debug_eprintln!("{}read_block_FILE_GZ({}): GzDecoder.read(…); read {}, readsz {}, block len {}, block capacity {}, blockoffset {}", so(), blockoffset, reads, readsz, block_buf.len(), block_buf.capacity(), bo_at);
match (self.gz.as_mut().unwrap().decoder).read(block_buf.as_mut()) {
Ok(size_) if size_ == 0 => {
debug_eprintln!("{}read_block_FILE_GZ({}): GzDecoder.read() returned Ok({:?}); read_total {}", so(), blockoffset, size_, read_total);

let byte_at: FileOffset = self.file_offset_at_block_offset_self(bo_at) + (read_total as FileOffset);
// in ad-hoc testing, it was found the decoder never recovers from a zero-byte read
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::InvalidData,
format!("GzDecoder.read() read zero bytes for vec<u8> buffer of length {}, capacity {}; stuck at inflated byte {}, size {}, size uncompressed {} (calculated from gzip header); {:?}", block_buf.len(), block_buf.capacity(), byte_at, self.filesz, self.filesz_actual, self.path)
)
);

}
// read was too large
Ok(size_) if size_ > readsz => {
debug_eprintln!("{}read_block_FILE_GZ({}): GzDecoder.read() returned Ok({:?}); size too big", so(), blockoffset, size_);
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::InvalidData,
format!("GzDecoder.read() read too many bytes {} for vec<u8> buffer of length {}, capacity {}; file size {}, file size uncompressed {} (calculated from gzip header); {:?}", size_, block_buf.len(), block_buf.capacity(), self.filesz, self.filesz_actual, self.path)
)
);
}
// first or subsequent read is <= the expected size
Ok(size_) => {
debug_eprintln!("{}read_block_FILE_GZ({}): GzDecoder.read() returned Ok({:?}), readsz {}, blocksz {}", so(), blockoffset, size_, readsz, blocksz_u);
// TODO: cost-savings: use faster `copy_slice`
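// e.g. (illustrative): block[read_total..read_total + size_].copy_from_slice(&block_buf[..size_]);
// followed by `read_total += size_;`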
for byte_ in block_buf.iter().take(size_) {
block[read_total] = *byte_;
read_total += 1;
}
//debug_assert_eq!(copiedn, size_, "copied {} but read returned size {} (readsz {}) B", copiedn, size_, readsz);
}
Err(err) => {
debug_eprintln!("ERROR: GzDecoder.read(&block (capacity {})) error {} for {:?}", self.blocksz, err, self.path);
debug_eprintln!("{}read_block_FILE_GZ({}): return Err({})", sx(), blockoffset, err);
return ResultS3_ReadBlock::Err(err);
}
}
debug_assert_le!(block.len(), blocksz_u, "block.len() {} was expected to be <= blocksz {}", block.len(), blocksz_u);
}

// check returned Block is expected number of bytes
// sanity check: check returned Block is expected number of bytes
let blocklen_sz: BlockSz = block.len() as BlockSz;
debug_eprintln!("{}read_block_FILE_GZ({}): block.len() {}, blocksz {}, blockoffset at {}", so(), blockoffset, blocklen_sz, self.blocksz, bo_at);
if block.is_empty() {
let byte_at = self.file_offset_at_block_offset_self(blockoffset);
let byte_at = self.file_offset_at_block_offset_self(bo_at);
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::UnexpectedEof,
@@ -1522,7 +1563,7 @@ impl BlockReader {
} else if bo_at == blockoffset_last {
// last block, is blocksz correct?
if blocklen_sz > self.blocksz {
let byte_at = self.file_offset_at_block_offset_self(blockoffset);
let byte_at = self.file_offset_at_block_offset_self(bo_at);
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::InvalidData,
@@ -1535,12 +1576,12 @@ impl BlockReader {
}
} else if blocklen_sz != self.blocksz {
// not last block, is blocksz correct?
let byte_at = self.file_offset_at_block_offset_self(blockoffset) + blocklen_sz;
let byte_at = self.file_offset_at_block_offset_self(bo_at) + blocklen_sz;
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::InvalidData,
format!(
"GzDecoder.read() read only {} bytes for block {} expected to read {} bytes (block size), inflate stopped at byte {}. block last {}, filesz {}, filesz uncompressed {} (according to gzip header); {:?}",
"GzDecoder.read() read {} bytes for block {} expected to read {} bytes (block size), inflate stopped at byte {}. block last {}, filesz {}, filesz uncompressed {} (according to gzip header); {:?}",
blocklen_sz, bo_at, self.blocksz, byte_at, blockoffset_last, self.filesz, self.filesz_actual, self.path,
)
)
@@ -1549,14 +1590,14 @@ impl BlockReader {

// store decompressed block
let blockp = BlockP::new(block);
self.store_block_in_storage(blockoffset, &blockp);
self.store_block_in_LRU_cache(blockoffset, &blockp);
self.store_block_in_storage(bo_at, &blockp);
self.store_block_in_LRU_cache(bo_at, &blockp);
if bo_at == blockoffset {
debug_eprintln!("{}read_block_FILE_GZ({}): return Found", sx(), blockoffset);
return ResultS3_ReadBlock::Found(blockp);
}
bo_at += 1;
}
} // while bo_at <= blockoffset
debug_eprintln!("{}read_block_FILE_GZ({}): return Done", sx(), blockoffset);

ResultS3_ReadBlock::Done
@@ -1590,7 +1631,7 @@ impl BlockReader {
// XXX: this will panic if the key+value in `self.blocks` was dropped
// which could happen during streaming stage
let blockp: BlockP = self.blocks.get_mut(&bo_at).unwrap().clone();
self.store_block_in_LRU_cache(blockoffset, &blockp);
self.store_block_in_LRU_cache(bo_at, &blockp);
return ResultS3_ReadBlock::Found(blockp);
}
bo_at += 1;
@@ -1601,8 +1642,7 @@ impl BlockReader {
self.read_blocks_miss += 1;
}

// TODO: use `self.blocksz_at_blockoffset`
let blocksz_u: usize = self.blocksz as usize;
let blocksz_u: usize = self.blocksz_at_blockoffset(&bo_at) as usize;
let mut block = Block::with_capacity(blocksz_u);
let mut bufreader: &mut BufReader_Xz = &mut self.xz.as_mut().unwrap().bufreader;
debug_eprintln!("{}read_block_FILE_XZ: xz_decompress({:?}, block (len {}, capacity {}))", so(), bufreader, block.len(), block.capacity());
@@ -1627,7 +1667,7 @@ impl BlockReader {
// check returned Block is expected number of bytes
let blocklen_sz: BlockSz = block.len() as BlockSz;
if block.is_empty() {
let byte_at = self.file_offset_at_block_offset_self(blockoffset);
let byte_at = self.file_offset_at_block_offset_self(bo_at);
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::UnexpectedEof,
@@ -1640,7 +1680,7 @@ impl BlockReader {
} else if bo_at == blockoffset_last {
// last block, is blocksz correct?
if blocklen_sz > self.blocksz {
let byte_at = self.file_offset_at_block_offset_self(blockoffset);
let byte_at = self.file_offset_at_block_offset_self(bo_at);
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::InvalidData,
@@ -1653,7 +1693,7 @@ impl BlockReader {
}
} else if blocklen_sz != self.blocksz {
// not last block, is blocksz correct?
let byte_at = self.file_offset_at_block_offset_self(blockoffset) + blocklen_sz;
let byte_at = self.file_offset_at_block_offset_self(bo_at) + blocklen_sz;
return ResultS3_ReadBlock::Err(
Error::new(
ErrorKind::InvalidData,
@@ -1667,7 +1707,7 @@ impl BlockReader {

// store decompressed block
let blockp = BlockP::new(block);
self.store_block_in_storage(blockoffset, &blockp);
self.store_block_in_storage(bo_at, &blockp);
if bo_at == blockoffset {
debug_eprintln!("{}read_block_FILE_XZ({}): return Found", sx(), blockoffset);
return ResultS3_ReadBlock::Found(blockp);
