Add support for multithreading in duplicate finder #100

Merged: 1 commit, Nov 8, 2020
206 changes: 124 additions & 82 deletions czkawka_core/src/duplicate.rs
@@ -13,6 +13,8 @@ use crate::common_extensions::Extensions;
use crate::common_items::ExcludedItems;
use crate::common_messages::Messages;
use crate::common_traits::*;
use rayon::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};

const HASH_MB_LIMIT_BYTES: u64 = 1024 * 1024; // 1MB

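For reference, the per-file hashing that the new parallel code wraps keeps the same shape as before: a 2 KiB pre-hash read per file in step 1, then full hashing in 32 KiB chunks, stopped early at HASH_MB_LIMIT_BYTES when the HashMB checking method is selected. A minimal sketch of that hashing loop, with simplified error handling (the standalone function and its name are illustrative, not part of this diff):

```rust
use std::fs::File;
use std::io::Read;
use std::path::Path;

/// Hash at most `limit` bytes of a file with blake3 (illustrative helper).
fn partial_blake3_hash(path: &Path, limit: u64) -> std::io::Result<String> {
    let mut file = File::open(path)?;
    let mut hasher = blake3::Hasher::new();
    let mut buffer = [0u8; 1024 * 32]; // same chunk size as the full-hash step below
    let mut read_bytes: u64 = 0;

    loop {
        let n = file.read(&mut buffer)?;
        if n == 0 {
            break; // end of file
        }
        read_bytes += n as u64;
        hasher.update(&buffer[..n]);
        if read_bytes >= limit {
            break; // e.g. HASH_MB_LIMIT_BYTES for CheckingMethod::HashMB
        }
    }
    Ok(hasher.finalize().to_hex().to_string())
}
```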
@@ -469,8 +471,6 @@ impl DuplicateFinder {
// Create a new BTreeMap without single-entry sizes (files that have no duplicates)
let mut new_map: BTreeMap<u64, Vec<FileEntry>> = Default::default();

self.information.number_of_duplicated_files_by_size = 0;

for (size, vector) in &self.files_with_identical_size {
if vector.len() > 1 {
self.information.number_of_duplicated_files_by_size += vector.len() - 1;
@@ -492,112 +492,154 @@ impl DuplicateFinder {
}

let start_time: SystemTime = SystemTime::now();
let mut file_handler: File;
let mut hashmap_with_hash: HashMap<String, Vec<FileEntry>>;
let check_was_breaked = AtomicBool::new(false);
let mut pre_checked_map: BTreeMap<u64, Vec<FileEntry>> = Default::default();

// Step 1 - hash only a small part of each file
for (size, vector) in &self.files_with_identical_size {
hashmap_with_hash = Default::default();

for file_entry in vector {
if stop_receiver.is_some() && stop_receiver.unwrap().try_recv().is_ok() {
return false;
}
file_handler = match File::open(&file_entry.path) {
Ok(t) => t,
Err(_) => {
self.text_messages.warnings.push(format!("Unable to check hash of file {}", file_entry.path.display()));
continue;
#[allow(clippy::type_complexity)]
let pre_hash_results: Vec<(u64, HashMap<String, Vec<FileEntry>>, Vec<String>, u64)> = self
.files_with_identical_size
.par_iter()
.map(|(size, vec_file_entry)| {
let mut hashmap_with_hash: HashMap<String, Vec<FileEntry>> = Default::default();
let mut errors: Vec<String> = Vec::new();
let mut file_handler: File;
let mut bytes_read: u64 = 0;
'fe: for file_entry in vec_file_entry {
if stop_receiver.is_some() && stop_receiver.unwrap().try_recv().is_ok() {
check_was_breaked.store(true, Ordering::Relaxed);
return None;
}
};
file_handler = match File::open(&file_entry.path) {
Ok(t) => t,
Err(_) => {
errors.push(format!("Unable to check hash of file {}", file_entry.path.display()));
continue 'fe;
}
};

let mut hasher: blake3::Hasher = blake3::Hasher::new();
let mut buffer = [0u8; 1024 * 2];
let n = match file_handler.read(&mut buffer) {
Ok(t) => t,
Err(_) => {
self.text_messages.warnings.push(format!("Error happened when checking hash of file {}", file_entry.path.display()));
continue;
}
};
let mut hasher: blake3::Hasher = blake3::Hasher::new();
let mut buffer = [0u8; 1024 * 2];
let n = match file_handler.read(&mut buffer) {
Ok(t) => t,
Err(_) => {
errors.push(format!("Error happened when checking hash of file {}", file_entry.path.display()));
continue 'fe;
}
};

self.information.bytes_read_when_hashing += n as u64;
hasher.update(&buffer[..n]);
bytes_read += n as u64;
hasher.update(&buffer[..n]);

let hash_string: String = hasher.finalize().to_hex().to_string();
hashmap_with_hash.entry(hash_string.to_string()).or_insert_with(Vec::new);
hashmap_with_hash.get_mut(hash_string.as_str()).unwrap().push(file_entry.to_owned());
}
for (_string, mut vector) in hashmap_with_hash {
if vector.len() > 1 {
pre_checked_map.entry(*size).or_insert_with(Vec::new);
pre_checked_map.get_mut(size).unwrap().append(&mut vector);
let hash_string: String = hasher.finalize().to_hex().to_string();
hashmap_with_hash.entry(hash_string.to_string()).or_insert_with(Vec::new);
hashmap_with_hash.get_mut(hash_string.as_str()).unwrap().push(file_entry.to_owned());
}
}
}
for (size, vector) in pre_checked_map.iter() {
self.information.number_of_duplicated_files_after_pre_hash += vector.len() - 1;
self.information.number_of_groups_after_pre_hash += 1;
self.information.lost_space_after_pre_hash += (vector.len() as u64 - 1) * size;
}
Some((*size, hashmap_with_hash, errors, bytes_read))
})
.while_some()
.collect();

// Step 2 - check the full file hash
for (size, vector) in &pre_checked_map {
hashmap_with_hash = Default::default();
// Check if user aborted the search (only from GUI)
if check_was_breaked.load(Ordering::Relaxed) {
return false;
}

for file_entry in vector {
if stop_receiver.is_some() && stop_receiver.unwrap().try_recv().is_ok() {
return false;
// Check results
for (size, hash_map, mut errors, bytes_read) in pre_hash_results {
self.information.bytes_read_when_hashing += bytes_read;
self.text_messages.warnings.append(&mut errors);
for (_hash, mut vec_file_entry) in hash_map {
if vec_file_entry.len() > 1 {
self.information.number_of_duplicated_files_after_pre_hash += vec_file_entry.len() - 1;
self.information.number_of_groups_after_pre_hash += 1;
self.information.lost_space_after_pre_hash += (vec_file_entry.len() as u64 - 1) * size;
pre_checked_map.entry(size).or_insert_with(Vec::new);
pre_checked_map.get_mut(&size).unwrap().append(&mut vec_file_entry);
}
file_handler = match File::open(&file_entry.path) {
Ok(t) => t,
Err(_) => {
self.text_messages.warnings.push(format!("Unable to check hash of file {}", file_entry.path.display()));
continue;
}
};
}
}

let mut error_reading_file: bool = false;
Common::print_time(start_time, SystemTime::now(), "check_files_hash - prehash".to_string());
let start_time: SystemTime = SystemTime::now();

let mut hasher: blake3::Hasher = blake3::Hasher::new();
let mut buffer = [0u8; 32 * 1024];
let mut read_bytes: u64 = 0;
loop {
let n = match file_handler.read(&mut buffer) {
/////////////////////////

#[allow(clippy::type_complexity)]
let full_hash_results: Vec<(u64, HashMap<String, Vec<FileEntry>>, Vec<String>, u64)> = pre_checked_map
.par_iter()
.map(|(size, vec_file_entry)| {
let mut hashmap_with_hash: HashMap<String, Vec<FileEntry>> = Default::default();
let mut errors: Vec<String> = Vec::new();
let mut file_handler: File;
let mut bytes_read: u64 = 0;
'fe: for file_entry in vec_file_entry {
if stop_receiver.is_some() && stop_receiver.unwrap().try_recv().is_ok() {
check_was_breaked.store(true, Ordering::Relaxed);
return None;
}
file_handler = match File::open(&file_entry.path) {
Ok(t) => t,
Err(_) => {
self.text_messages.warnings.push(format!("Error happened when checking hash of file {}", file_entry.path.display()));
error_reading_file = true;
break;
errors.push(format!("Unable to check hash of file {}", file_entry.path.display()));
continue 'fe;
}
};
if n == 0 {
break;
}

read_bytes += n as u64;
self.information.bytes_read_when_hashing += n as u64;
hasher.update(&buffer[..n]);
let mut hasher: blake3::Hasher = blake3::Hasher::new();
let mut buffer = [0u8; 1024 * 32];
let mut current_file_read_bytes: u64 = 0;

if self.check_method == CheckingMethod::HashMB && read_bytes >= HASH_MB_LIMIT_BYTES {
break;
loop {
let n = match file_handler.read(&mut buffer) {
Ok(t) => t,
Err(_) => {
errors.push(format!("Error happened when checking hash of file {}", file_entry.path.display()));
continue 'fe;
}
};
if n == 0 {
break;
}

current_file_read_bytes += n as u64;
bytes_read += n as u64;
hasher.update(&buffer[..n]);

if self.check_method == CheckingMethod::HashMB && current_file_read_bytes >= HASH_MB_LIMIT_BYTES {
break;
}
}
}
if !error_reading_file {

let hash_string: String = hasher.finalize().to_hex().to_string();
hashmap_with_hash.entry(hash_string.to_string()).or_insert_with(Vec::new);
hashmap_with_hash.get_mut(hash_string.as_str()).unwrap().push(file_entry.to_owned());
}
}
for (_string, vector) in hashmap_with_hash {
if vector.len() > 1 {
self.files_with_identical_hashes.entry(*size).or_insert_with(Vec::new);
self.files_with_identical_hashes.get_mut(size).unwrap().push(vector);
Some((*size, hashmap_with_hash, errors, bytes_read))
})
.while_some()
.collect();

// Check if user aborted the search (only from GUI)
if check_was_breaked.load(Ordering::Relaxed) {
return false;
}

for (size, hash_map, mut errors, bytes_read) in full_hash_results {
self.information.bytes_read_when_hashing += bytes_read;
self.text_messages.warnings.append(&mut errors);
for (_hash, vec_file_entry) in hash_map {
if vec_file_entry.len() > 1 {
self.information.number_of_duplicated_files_after_pre_hash += vec_file_entry.len() - 1;
self.information.number_of_groups_after_pre_hash += 1;
self.information.lost_space_after_pre_hash += (vec_file_entry.len() as u64 - 1) * size;
self.files_with_identical_hashes.entry(size).or_insert_with(Vec::new);
self.files_with_identical_hashes.get_mut(&size).unwrap().push(vec_file_entry);
}
}
}

/////////////////////////

for (size, vector_vectors) in &self.files_with_identical_hashes {
for vector in vector_vectors {
self.information.number_of_duplicated_files_by_hash += vector.len() - 1;
@@ -606,7 +648,7 @@ impl DuplicateFinder {
}
}

Common::print_time(start_time, SystemTime::now(), "check_files_hash".to_string());
Common::print_time(start_time, SystemTime::now(), "check_files_hash - full hash".to_string());
true
}

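The pattern this PR introduces above, and reuses in the two files below, is: map each size group through rayon's `par_iter()`, let a worker return `None` when the stop channel fires while also recording the abort in a shared `AtomicBool`, cut the pipeline short with `.while_some()`, and only then fold the per-group results (hash maps, warnings, byte counters) back into `self` on the calling thread. A condensed, self-contained sketch of that shape, using placeholder group and result types rather than the real `FileEntry` structures:

```rust
use rayon::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};

/// Returns None if the user requested a stop, otherwise per-group warnings.
fn process_groups(groups: &[Vec<String>], stop_requested: &AtomicBool) -> Option<Vec<(usize, Vec<String>)>> {
    let was_stopped = AtomicBool::new(false);

    let results: Vec<(usize, Vec<String>)> = groups
        .par_iter()
        .enumerate()
        .map(|(index, group)| {
            // Cooperative cancellation: the PR checks stop_receiver.try_recv() here.
            if stop_requested.load(Ordering::Relaxed) {
                was_stopped.store(true, Ordering::Relaxed);
                return None; // while_some() stops consuming once a None appears
            }
            // Per-group work collects its own errors instead of touching self.
            let mut warnings = Vec::new();
            for item in group {
                if item.is_empty() {
                    warnings.push("Unable to process empty entry".to_string());
                }
            }
            Some((index, warnings))
        })
        .while_some()
        .collect();

    // Shared state (warnings, counters, maps) is only mutated here,
    // single-threaded, after the parallel section - mirroring the PR.
    if was_stopped.load(Ordering::Relaxed) {
        return None;
    }
    Some(results)
}
```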
8 changes: 4 additions & 4 deletions czkawka_core/src/same_music.rs
@@ -295,7 +295,7 @@ impl SameMusic {

let tag = match Tag::new().read_from_path(&file_entry.path) {
Ok(t) => t,
Err(_) => return Option::from((file_entry, false)), // Data not in utf-8, etc.
Err(_) => return Some(None), // Data not in utf-8, etc.
};

file_entry.title = match tag.title() {
@@ -319,11 +319,11 @@
None => 0,
};

Option::from((file_entry, true))
Some(Some(file_entry))
})
.while_some()
.filter(|file_entry| file_entry.1)
.map(|file_entry| file_entry.0)
.filter(|file_entry| file_entry.is_some())
.map(|file_entry| file_entry.unwrap())
.collect::<Vec<_>>();

// Adding files to Vector
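The same_music.rs change above swaps the `(FileEntry, bool)` result for an `Option<Option<FileEntry>>`: the outer `None` signals "user aborted, stop the whole pipeline" and is consumed by `.while_some()`, while the inner `None` means "skip this file" (its tag could not be read). The trailing `.filter(...).map(...unwrap())` pair should also be expressible as a single `.flatten()`; a small sketch of the idiom with a toy item type, offered as an assumption rather than a drop-in patch:

```rust
use rayon::prelude::*;

// Outer Option: Some = keep going, None = abort requested (handled by while_some()).
// Inner Option: Some(value) = usable result, None = this item is silently skipped.
fn collect_valid(items: &[i32]) -> Vec<i32> {
    items
        .par_iter()
        .map(|&x| {
            if x < 0 {
                return Some(None); // unreadable item: skip it but keep processing others
            }
            Some(Some(x * 2))
        })
        .while_some() // yields Option<i32>, stops at the first outer None
        .flatten() // drops the inner Nones; same effect as filter(is_some) + map(unwrap)
        .collect()
}
```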
18 changes: 10 additions & 8 deletions czkawka_core/src/similar_images.rs
@@ -276,7 +276,7 @@ impl SimilarImages {
fn sort_images(&mut self, stop_receiver: Option<&Receiver<()>>) -> bool {
let hash_map_modification = SystemTime::now();

let vec_file_entry = self
let vec_file_entry: Vec<(FileEntry, [u8; 8])> = self
.images_to_check
.par_iter()
.map(|file_entry| {
@@ -288,7 +288,7 @@

let image = match image::open(file_entry.path.clone()) {
Ok(t) => t,
Err(_) => return Option::from((file_entry, [0u8; 8], false)), // Something is wrong with image
Err(_) => return Some(None), // Something is wrong with image
};
let dimensions = image.dimensions();

@@ -299,20 +299,22 @@
let mut buf = [0u8; 8];
buf.copy_from_slice(&hash.as_bytes());

Option::from((file_entry, buf, true))
Some(Some((file_entry, buf)))
})
.while_some()
.filter(|file_entry| file_entry.2)
.map(|file_entry| (file_entry.0, file_entry.1))
.collect::<Vec<(_, _)>>();
.filter(|file_entry| file_entry.is_some())
.map(|file_entry| file_entry.unwrap())
.collect::<Vec<(FileEntry, [u8; 8])>>();

for (file_entry, buf) in vec_file_entry {
self.bktree.add(buf);
self.image_hashes.entry(buf).or_insert_with(Vec::<FileEntry>::new);
self.image_hashes.get_mut(&buf).unwrap().push(file_entry.clone());
}

//let hash_map_modification = SystemTime::now();
Common::print_time(hash_map_modification, SystemTime::now(), "sort_images - reading data from files".to_string());
let hash_map_modification = SystemTime::now();

let similarity: u64 = match self.similarity {
Similarity::VeryHigh => 0,
Similarity::High => 1,
@@ -389,7 +391,7 @@

self.similar_vectors = new_vector;

Common::print_time(hash_map_modification, SystemTime::now(), "sort_images".to_string());
Common::print_time(hash_map_modification, SystemTime::now(), "sort_images - selecting data from BtreeMap".to_string());
true
}

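In similar_images.rs the parallel section above produces an 8-byte perceptual hash per image, and the `Similarity` setting maps to a small numeric tolerance (VeryHigh = 0, High = 1, and so on) used when querying the BK-tree. Assuming a Hamming-style metric over those 8-byte hashes (the metric type itself is not shown in this diff), the similarity check amounts to the following sketch, with plain pairwise comparison standing in for the actual BK-tree lookup:

```rust
/// Number of differing bits between two 8-byte perceptual hashes.
fn hamming_distance(a: &[u8; 8], b: &[u8; 8]) -> u32 {
    a.iter().zip(b.iter()).map(|(x, y)| (x ^ y).count_ones()).sum()
}

/// Two images count as similar when their hash distance stays within the
/// tolerance derived from the Similarity setting (0 = VeryHigh, 1 = High, ...).
fn are_similar(a: &[u8; 8], b: &[u8; 8], max_distance: u32) -> bool {
    hamming_distance(a, b) <= max_distance
}
```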