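"""
file_recovery.py

Signature-based ("file carving") recovery of deleted files from a disk image
or block device: scan for known magic numbers, then delimit each hit using
trailer signatures or simple content heuristics.
"""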
import os
import sys
import argparse
import struct
import time
import logging
import subprocess
import signal
from datetime import datetime, timedelta
from pathlib import Path
import binascii
# File signatures (magic numbers) for common file types
FILE_SIGNATURES = {
'jpg': [bytes([0xFF, 0xD8, 0xFF, 0xE0]), bytes([0xFF, 0xD8, 0xFF, 0xE1])],
'png': [bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A])],
'gif': [bytes([0x47, 0x49, 0x46, 0x38, 0x37, 0x61]), bytes([0x47, 0x49, 0x46, 0x38, 0x39, 0x61])],
'pdf': [bytes([0x25, 0x50, 0x44, 0x46])],
'zip': [bytes([0x50, 0x4B, 0x03, 0x04])],
'docx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature
'xlsx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature
'pptx': [bytes([0x50, 0x4B, 0x03, 0x04, 0x14, 0x00, 0x06, 0x00])], # More specific signature
'mp3': [bytes([0x49, 0x44, 0x33])],
'mp4': [bytes([0x00, 0x00, 0x00, 0x18, 0x66, 0x74, 0x79, 0x70])],
'avi': [bytes([0x52, 0x49, 0x46, 0x46])],
}
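# Note: these signatures cover common variants only. JPEG files, for example,
# may begin with other 0xFF 0xD8 0xFF 0x?? markers (0xDB, 0xEE, ...), and the
# leading size byte of the MP4 'ftyp' box (0x18 here) varies between encoders,
# so some valid files will not be matched.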
# Additional validation patterns to check after finding the signature
# This helps reduce false positives
VALIDATION_PATTERNS = {
'docx': [b'word/', b'[Content_Types].xml'],
'xlsx': [b'xl/', b'[Content_Types].xml'],
'pptx': [b'ppt/', b'[Content_Types].xml'],
'zip': [b'PK\x01\x02'], # Central directory header
'pdf': [b'obj', b'endobj'],
}
# File endings (trailer signatures) for some file types
FILE_TRAILERS = {
'jpg': bytes([0xFF, 0xD9]),
'png': bytes([0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82]),
'gif': bytes([0x00, 0x3B]),
'pdf': bytes([0x25, 0x25, 0x45, 0x4F, 0x46]),
}
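# ZIP-based formats (zip/docx/xlsx/pptx) end with an end-of-central-directory
# record (signature b'PK\x05\x06'), but no trailer is registered for them
# here, so they are sized by _read_file_heuristic() instead.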
# Maximum file sizes to prevent recovering corrupted files
MAX_FILE_SIZES = {
'jpg': 30 * 1024 * 1024, # 30MB
'png': 50 * 1024 * 1024, # 50MB
'gif': 20 * 1024 * 1024, # 20MB
'pdf': 100 * 1024 * 1024, # 100MB
'zip': 200 * 1024 * 1024, # 200MB
'docx': 50 * 1024 * 1024, # 50MB
'xlsx': 50 * 1024 * 1024, # 50MB
'pptx': 100 * 1024 * 1024, # 100MB
'mp3': 50 * 1024 * 1024, # 50MB
'mp4': 1024 * 1024 * 1024, # 1GB
'avi': 1024 * 1024 * 1024, # 1GB
}
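# When a cap is hit, the readers below return whatever has been read so far,
# so an over-sized file is recovered truncated rather than skipped.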
class FileRecoveryTool:
def __init__(self, source, output_dir, file_types=None, deep_scan=False,
block_size=512, log_level=logging.INFO, skip_existing=True,
max_scan_size=None, timeout_minutes=None):
"""
Initialize the file recovery tool
Args:
source (str): Path to the source device or directory
output_dir (str): Directory to save recovered files
file_types (list): List of file types to recover
deep_scan (bool): Whether to perform a deep scan
block_size (int): Block size for reading data
log_level (int): Logging level
skip_existing (bool): Skip existing files in output directory
max_scan_size (int): Maximum number of bytes to scan
timeout_minutes (int): Timeout in minutes
"""
self.source = source
self.output_dir = Path(output_dir)
self.file_types = file_types if file_types else list(FILE_SIGNATURES.keys())
self.deep_scan = deep_scan
self.block_size = block_size
self.skip_existing = skip_existing
self.max_scan_size = max_scan_size
self.timeout_minutes = timeout_minutes
self.timeout_reached = False
# Setup logging
self.setup_logging(log_level)
# Create output directory if it doesn't exist
self.output_dir.mkdir(parents=True, exist_ok=True)
# Statistics
self.stats = {
'total_files_recovered': 0,
'recovered_by_type': {},
'start_time': time.time(),
'bytes_scanned': 0,
'false_positives': 0
}
for file_type in self.file_types:
self.stats['recovered_by_type'][file_type] = 0
def setup_logging(self, log_level):
"""Set up logging configuration"""
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler(f"recovery_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
]
)
self.logger = logging.getLogger('file_recovery')
def _setup_timeout(self):
"""Set up a timeout handler"""
if self.timeout_minutes:
def timeout_handler(signum, frame):
self.logger.warning(f"Timeout of {self.timeout_minutes} minutes reached!")
self.timeout_reached = True
# Set the timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(int(self.timeout_minutes * 60))
def get_device_size(self):
"""Get the size of the device or file"""
if os.path.isfile(self.source):
# Regular file
return os.path.getsize(self.source)
else:
# Block device
try:
# Try using blockdev command (Linux)
result = subprocess.run(['blockdev', '--getsize64', self.source],
capture_output=True, text=True, check=True)
return int(result.stdout.strip())
except (subprocess.SubprocessError, FileNotFoundError):
try:
# Try using ioctl (requires root)
import fcntl
with open(self.source, 'rb') as fd:
                        # BLKGETSIZE64 = 0x80081272 (yields an unsigned 64-bit size)
                        buf = bytearray(8)
                        fcntl.ioctl(fd, 0x80081272, buf)
                        return struct.unpack('Q', buf)[0]
                except (OSError, ImportError):
                    # Last resort: try to seek to the end
                    try:
                        with open(self.source, 'rb') as fd:
                            fd.seek(0, os.SEEK_END)
                            return fd.tell()
                    except OSError:
                        self.logger.warning("Could not determine device size. Using fallback size.")
                        # Fallback to a reasonable size for testing
                        return 1024 * 1024 * 1024  # 1GB
def scan_device(self):
"""Scan the device for deleted files"""
self.logger.info(f"Starting scan of {self.source}")
self.logger.info(f"Looking for file types: {', '.join(self.file_types)}")
try:
# Get device size
device_size = self.get_device_size()
self.logger.info(f"Device size: {self._format_size(device_size)}")
# Set up timeout if specified
if self.timeout_minutes:
self._setup_timeout()
self.logger.info(f"Timeout set for {self.timeout_minutes} minutes")
with open(self.source, 'rb', buffering=0) as device: # buffering=0 for direct I/O
self._scan_device_data(device, device_size)
except (IOError, OSError) as e:
self.logger.error(f"Error accessing source: {e}")
return False
self._print_summary()
return True
def _scan_device_data(self, device, device_size):
"""Scan the device data for file signatures"""
position = 0
# Limit scan size if specified
if self.max_scan_size and self.max_scan_size < device_size:
self.logger.info(f"Limiting scan to first {self._format_size(self.max_scan_size)} of device")
device_size = self.max_scan_size
# Create subdirectories for each file type
for file_type in self.file_types:
(self.output_dir / file_type).mkdir(exist_ok=True)
scan_start_time = time.time()
last_progress_time = scan_start_time
# Read the device in blocks
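        # Known limitation of this block-by-block scan: a signature that
        # straddles a block boundary is missed, and only the first match of
        # each signature within a block is handled. Overlapping reads would
        # address both, at the cost of extra I/O.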
while position < device_size:
# Check if timeout reached
if self.timeout_reached:
self.logger.warning("Stopping scan due to timeout")
break
try:
# Seek to position first
device.seek(position)
# Read a block of data
data = device.read(self.block_size)
if not data:
break
self.stats['bytes_scanned'] += len(data)
# Check for file signatures in this block
for file_type in self.file_types:
signatures = FILE_SIGNATURES.get(file_type, [])
for signature in signatures:
sig_pos = data.find(signature)
if sig_pos != -1:
# Found a file signature, try to recover the file
absolute_pos = position + sig_pos
device.seek(absolute_pos)
self.logger.debug(f"Found {file_type} signature at position {absolute_pos}")
# Recover the file
if self._recover_file(device, file_type, absolute_pos):
self.stats['total_files_recovered'] += 1
self.stats['recovered_by_type'][file_type] += 1
else:
self.stats['false_positives'] += 1
                            # Reset position to continue scanning (defensive;
                            # the next iteration re-seeks to the scan position)
                            device.seek(position + self.block_size)
# Update position and show progress
position += self.block_size
current_time = time.time()
                # Show progress every 5 MiB (the modulo check only fires when
                # block_size divides 5 MiB evenly) or every 10 seconds,
                # whichever comes first
if (position % (5 * 1024 * 1024) == 0) or (current_time - last_progress_time >= 10):
percent = (position / device_size) * 100 if device_size > 0 else 0
elapsed = current_time - self.stats['start_time']
# Calculate estimated time remaining
if position > 0 and device_size > 0:
bytes_per_second = position / elapsed if elapsed > 0 else 0
remaining_bytes = device_size - position
eta_seconds = remaining_bytes / bytes_per_second if bytes_per_second > 0 else 0
eta_str = str(timedelta(seconds=int(eta_seconds)))
else:
eta_str = "unknown"
self.logger.info(f"Progress: {percent:.2f}% ({self._format_size(position)} / {self._format_size(device_size)}) - "
f"{self.stats['total_files_recovered']} files recovered - "
f"Elapsed: {timedelta(seconds=int(elapsed))} - ETA: {eta_str}")
last_progress_time = current_time
except Exception as e:
self.logger.error(f"Error reading at position {position}: {e}")
position += self.block_size # Skip this block and continue
def _validate_file_content(self, data, file_type):
"""
Additional validation to reduce false positives
Args:
data: File data to validate
file_type: Type of file to validate
Returns:
bool: True if file content appears valid
"""
# Check minimum size
if len(data) < 100:
return False
# Check for validation patterns
patterns = VALIDATION_PATTERNS.get(file_type, [])
if patterns:
for pattern in patterns:
if pattern in data:
return True
return False # None of the patterns were found
# For file types without specific validation patterns
return True
def _recover_file(self, device, file_type, start_position):
"""
Recover a file of the given type starting at the given position
Args:
device: Open file handle to the device
file_type: Type of file to recover
start_position: Starting position of the file
Returns:
bool: True if file was recovered successfully
"""
max_size = MAX_FILE_SIZES.get(file_type, 10 * 1024 * 1024) # Default to 10MB
trailer = FILE_TRAILERS.get(file_type)
        # Generate a unique filename (position, timestamp, and a random suffix
        # make collisions with existing output effectively impossible)
filename = f"{file_type}_{start_position}_{int(time.time())}_{binascii.hexlify(os.urandom(4)).decode()}.{file_type}"
output_path = self.output_dir / file_type / filename
if self.skip_existing and output_path.exists():
self.logger.debug(f"Skipping existing file: {output_path}")
return False
# Save the current position to restore later
current_pos = device.tell()
try:
# Seek to the start of the file
device.seek(start_position)
# Read the file data
if trailer and self.deep_scan:
# If we know the trailer and deep scan is enabled, read until trailer
file_data = self._read_until_trailer(device, trailer, max_size)
else:
# Otherwise, use heuristics to determine file size
file_data = self._read_file_heuristic(device, file_type, max_size)
if not file_data or len(file_data) < 100: # Ignore very small files
return False
# Additional validation to reduce false positives
if not self._validate_file_content(file_data, file_type):
self.logger.debug(f"Skipping invalid {file_type} file at position {start_position}")
return False
# Write the recovered file
with open(output_path, 'wb') as f:
f.write(file_data)
self.logger.info(f"Recovered {file_type} file: {filename} ({self._format_size(len(file_data))})")
return True
except Exception as e:
self.logger.error(f"Error recovering file at position {start_position}: {e}")
return False
        finally:
            # Restore the original position
            try:
                device.seek(current_pos)
            except OSError:
                pass  # Ignore seek errors during cleanup
def _read_until_trailer(self, device, trailer, max_size):
"""Read data until a trailer signature is found or max size is reached"""
buffer = bytearray()
chunk_size = 4096
while len(buffer) < max_size:
try:
chunk = device.read(chunk_size)
if not chunk:
break
buffer.extend(chunk)
                # Check whether the trailer ends in the newly read data; searching
                # only the buffer's tail (chunk plus trailer-length overlap)
                # avoids rescanning the whole buffer on every iteration
                trailer_pos = buffer.find(trailer, max(0, len(buffer) - len(trailer) - chunk_size))
if trailer_pos != -1:
# Found trailer, return data up to and including the trailer
return buffer[:trailer_pos + len(trailer)]
except Exception as e:
self.logger.error(f"Error reading chunk: {e}")
break
# If we reached max size without finding a trailer, return what we have
return buffer if len(buffer) > 100 else None
def _read_file_heuristic(self, device, file_type, max_size):
"""
Use heuristics to determine file size when trailer is unknown
This is a simplified approach - real tools use more sophisticated methods
"""
buffer = bytearray()
chunk_size = 4096
valid_chunks = 0
invalid_chunks = 0
# For Office documents and ZIP files, read a larger initial chunk to validate
initial_chunk_size = 16384 if file_type in ['docx', 'xlsx', 'pptx', 'zip'] else chunk_size
# Read initial chunk for validation
initial_chunk = device.read(initial_chunk_size)
if not initial_chunk:
return None
buffer.extend(initial_chunk)
# For Office documents, check if it contains required elements
if file_type in ['docx', 'xlsx', 'pptx', 'zip']:
# Basic validation for Office Open XML files
if file_type == 'docx' and b'word/' not in initial_chunk:
return None
if file_type == 'xlsx' and b'xl/' not in initial_chunk:
return None
if file_type == 'pptx' and b'ppt/' not in initial_chunk:
return None
if file_type == 'zip' and b'PK\x01\x02' not in initial_chunk:
return None
# Continue reading chunks
while len(buffer) < max_size:
try:
chunk = device.read(chunk_size)
if not chunk:
break
buffer.extend(chunk)
                # Simple heuristic: decide whether each chunk still looks like
                # part of the file. This is a very basic approach and would need
                # to be refined for real-world use.
                if file_type in ['jpg', 'png', 'gif', 'pdf', 'zip', 'docx', 'xlsx', 'pptx', 'mp3', 'mp4', 'avi']:
                    # For binary files, continue reading until max size or end of device
                    valid_chunks += 1
                    # For ZIP-based formats, watch for signs of running past the
                    # end of the archive: after several chunks, the absence of
                    # further 'PK' record signatures is suspicious
                    if file_type in ['zip', 'docx', 'xlsx', 'pptx']:
                        if b'PK' not in chunk and valid_chunks > 10:
                            invalid_chunks += 1
                        else:
                            invalid_chunks = 0
                else:
                    # For text-like data, check the ratio of printable characters
                    printable_ratio = sum(32 <= b <= 126 or b in (9, 10, 13) for b in chunk) / len(chunk)
                    if printable_ratio < 0.7:  # Less than 70% printable characters
                        invalid_chunks += 1
                    else:
                        valid_chunks += 1
                        invalid_chunks = 0
                # If too many suspicious chunks in a row, stop and trim them off
                if invalid_chunks > 3:
                    return buffer[:len(buffer) - (invalid_chunks * chunk_size)]
except Exception as e:
self.logger.error(f"Error reading chunk in heuristic: {e}")
break
return buffer
def _format_size(self, size_bytes):
"""Format size in bytes to a human-readable string"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024 or unit == 'TB':
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024
def _print_summary(self):
"""Print a summary of the recovery operation"""
elapsed = time.time() - self.stats['start_time']
self.logger.info("=" * 50)
self.logger.info("Recovery Summary")
self.logger.info("=" * 50)
self.logger.info(f"Total files recovered: {self.stats['total_files_recovered']}")
self.logger.info(f"False positives detected and skipped: {self.stats['false_positives']}")
self.logger.info(f"Total data scanned: {self._format_size(self.stats['bytes_scanned'])}")
self.logger.info(f"Time elapsed: {timedelta(seconds=int(elapsed))}")
self.logger.info("Files recovered by type:")
for file_type, count in self.stats['recovered_by_type'].items():
if count > 0:
self.logger.info(f" - {file_type}: {count}")
if self.timeout_reached:
self.logger.info("Note: Scan was stopped due to timeout")
self.logger.info("=" * 50)
def main():
"""Main function to parse arguments and run the recovery tool"""
parser = argparse.ArgumentParser(description='File Recovery Tool - Recover deleted files from storage devices')
parser.add_argument('source', help='Source device or directory to recover files from (e.g., /dev/sdb, /media/usb)')
parser.add_argument('output', help='Directory to save recovered files')
parser.add_argument('-t', '--types', nargs='+', choices=FILE_SIGNATURES.keys(), default=None,
help='File types to recover (default: all supported types)')
parser.add_argument('-d', '--deep-scan', action='store_true',
help='Perform a deep scan (slower but more thorough)')
parser.add_argument('-b', '--block-size', type=int, default=512,
help='Block size for reading data (default: 512 bytes)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output')
parser.add_argument('-q', '--quiet', action='store_true',
help='Suppress all output except errors')
parser.add_argument('--no-skip', action='store_true',
help='Do not skip existing files in output directory')
parser.add_argument('--max-size', type=int,
help='Maximum size to scan in MB (e.g., 1024 for 1GB)')
parser.add_argument('--timeout', type=int, default=None,
help='Stop scanning after specified minutes')
args = parser.parse_args()
# Set logging level based on verbosity
if args.quiet:
log_level = logging.ERROR
elif args.verbose:
log_level = logging.DEBUG
else:
log_level = logging.INFO
# Convert max size from MB to bytes if specified
max_scan_size = args.max_size * 1024 * 1024 if args.max_size else None
# Create and run the recovery tool
recovery_tool = FileRecoveryTool(
source=args.source,
output_dir=args.output,
file_types=args.types,
deep_scan=args.deep_scan,
block_size=args.block_size,
log_level=log_level,
skip_existing=not args.no_skip,
max_scan_size=max_scan_size,
timeout_minutes=args.timeout
)
try:
recovery_tool.scan_device()
except KeyboardInterrupt:
print("\nRecovery process interrupted by user.")
recovery_tool._print_summary()
sys.exit(1)
if __name__ == "__main__":
main()
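# Example command-line invocations (paths are illustrative; reading a raw
# block device usually requires root):
#   sudo python file_recovery.py /dev/sdb1 ./recovered -t jpg png pdf
#   python file_recovery.py disk.img ./recovered -d --max-size 1024 --timeout 30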