
Desktop Motion Detection s GPU akcelerací
🎯 Účel programu
Tento nástroj sleduje vybrané okno na ploše, detekuje v něm pohyb a zobrazuje výsledky v reálném čase. Díky volitelné podpoře GPU akcelerace (NVIDIA CUDA) je zpracování obrazu rychlejší a plynulejší, zejména u vysokého rozlišení.
🚀 Hlavní vlastnosti
1. Detekce pohybu
Víceúrovňová analýza: program spočítá rozdíl mezi po sobě jdoucími snímky a zvýrazní oblasti pohybu
Filtrace šumu: práh detekce a minimální velikost oblasti lze dynamicky nastavit posuvníky
Výřez oblasti zájmu (ROI): možnost omezit detekci jen na vybranou část okna
2. GPU akcelerace (CUDA)
Automatické rozpoznání dostupnosti CUDA-GPU
Přepínání na CPU, pokud GPU není k dispozici nebo dojde k chybě
Předalokace paměti na GPU pro minimalizaci latence
Fallback mechanika: při selhání kteréhokoli GPU kroku se operace provádějí na CPU
3. Watchdog a odolnost proti zamrznutí
Watchdog vlákno hlídá aktivitu programu a volá obnovu, pokud není žádná interakce delší dobu
Časové limity (timeout) pro potenciálně blokující operace
Automatické restartování při kritických chybách nebo dlouhém výpadku
4. Více metod snímání okna
PrintWindow (různé příznaky)
PIL ImageGrab
Celoplošné zachycení a ořez jako poslední záchrana
Vše chráněno timeout dekorátorem, aby žádná metoda nezablokovala běh aplikace
5. Uživatelské ovládání
Okno “Motion Detection”: výstupní zobrazovač
Okno “Controls”: posuvníky pro pražící práh, rozmazání, zpoždění, zoom a další parametry
Klávesové zkratky:
q – ukončení programu
r – výběr nové oblasti (ROI)
z – přepnutí auto-zoomu
c – vymazání historie detekovaných oblastí
🔧 Monitorování a obnovování
Logování na konzoli s časovými razítky
Graceful shutdown při signálech SIGINT/SIGBREAK
Automatické obnovení po selhání až 5×, poté nutné manuální spuštění
📊 Rozšířené funkce
Auto-zoom: program sám vyhledá oblast s nejvíce detekcemi a plynule ji přiblíží
Smooth tracking: plynulé posouvání výřezu podle rychlosti pohybu a frekvence detekcí
Vizualizace historie: ukládání a resetování počtu detekcí pro jednotlivé oblasti
✅ Shrnutí
Spolehlivá detekce pohybu i ve výkonnostně náročných scénářích
Dynamické přepínání mezi GPU a CPU zpracováním
Robustnost díky watchdogu a automatickému restartu
Interaktivní ovládání prostřednictvím intuitivních posuvníků a klávesových zkratek
Poznámka: Pro plné využití GPU akcelerace je potřeba mít nainstalovanou OpenCV s podporou CUDA a odpovídající ovladače NVIDIA.
import os
import numpy as np
import time
import cv2
import datetime
import threading
import signal
import sys
from PIL import ImageGrab, Image, ImageDraw
"""
Desktop Motion Detection with GPU Acceleration
This script captures a selected window and performs motion detection.
When available, it uses GPU acceleration via NVIDIA CUDA to improve performance.
GPU acceleration benefits:
1. Faster image processing operations (color conversion, blurring, etc.)
2. Reduced CPU load, allowing for smoother operation
3. Better performance with high-resolution video or large windows
4. More responsive detection with lower latency
The script automatically falls back to CPU processing if:
- CUDA is not available in the OpenCV build
- No CUDA-capable GPU is detected
- Any GPU operation fails during execution
The script includes watchdog mechanisms to prevent freezing:
1. Operation timeouts for potentially blocking operations
2. Automatic recovery from unresponsive states
3. Graceful degradation when resources are constrained
4. Monitoring of critical operations to detect and recover from hangs
"""
# Global variables for watchdog mechanism
program_alive = True
last_activity_time = time.time()
class Watchdog:
"""
Watchdog class to monitor program activity and recover from hangs
"""
def __init__(self, timeout=30, callback=None):
"""
Initialize the watchdog
Args:
timeout (int): Maximum time in seconds before considering the program hung
callback (function): Function to call when a hang is detected
"""
self.timeout = timeout
self.callback = callback
self.is_running = False
self.thread = None
def start(self):
"""Start the watchdog thread"""
if self.is_running:
return
self.is_running = True
self.thread = threading.Thread(target=self._monitor, daemon=True)
self.thread.start()
print(f"[{get_timestamp()}] Watchdog started with timeout of {self.timeout} seconds")
def stop(self):
"""Stop the watchdog thread"""
self.is_running = False
if self.thread:
self.thread.join(1.0) # Wait for thread to finish with timeout
print(f"[{get_timestamp()}] Watchdog stopped")
def _monitor(self):
"""Monitor thread function"""
global last_activity_time, program_alive
while self.is_running and program_alive:
time.sleep(1) # Check every second
# Check if the program is still active
current_time = time.time()
if current_time - last_activity_time > self.timeout:
print(f"[{get_timestamp()}] WARNING: Program appears to be hung for {self.timeout} seconds")
if self.callback:
try:
self.callback()
except Exception as e:
print(f"[{get_timestamp()}] Error in watchdog callback: {e}")
# Reset the timer to prevent multiple callbacks
last_activity_time = time.time()
def update_activity():
"""Update the last activity timestamp to indicate the program is still alive"""
global last_activity_time
last_activity_time = time.time()
def timeout(seconds, default=None):
"""
Decorator to add a timeout to a function
Args:
seconds (int): Timeout in seconds
default: Default value to return if timeout occurs
Returns:
Decorated function
"""
def decorator(func):
def wrapper(*args, **kwargs):
result = [default]
exception = [None]
def target():
try:
result[0] = func(*args, **kwargs)
except Exception as e:
exception[0] = e
thread = threading.Thread(target=target)
thread.daemon = True
thread.start()
thread.join(seconds)
if thread.is_alive():
print(f"[{get_timestamp()}] WARNING: Function {func.__name__} timed out after {seconds} seconds")
return default
if exception[0]:
raise exception[0]
return result[0]
return wrapper
return decorator
def handle_hang():
"""Handle a detected program hang"""
global program_alive
print(f"[{get_timestamp()}] CRITICAL: Attempting to recover from program hang")
# Try to release OpenCV windows
try:
cv2.destroyAllWindows()
print(f"[{get_timestamp()}] Released OpenCV windows")
except Exception as e:
print(f"[{get_timestamp()}] Failed to release OpenCV windows: {e}")
# Set flag to terminate the program
program_alive = False
# Try to restart the program
print(f"[{get_timestamp()}] Initiating program restart...")
# This will be caught by the main exception handler
raise Exception("Watchdog detected program hang, forcing restart")
# Initialize the watchdog
watchdog = Watchdog(timeout=60, callback=handle_hang)
# Set up signal handlers for graceful termination
def signal_handler(sig, frame):
"""Handle termination signals"""
global program_alive
print(f"[{get_timestamp()}] Received termination signal, shutting down gracefully...")
program_alive = False
sys.exit(0)
# Register signal handlers
signal.signal(signal.SIGINT, signal_handler)
if hasattr(signal, 'SIGBREAK'): # Windows-specific signal
signal.signal(signal.SIGBREAK, signal_handler)
# Check if CUDA is available for GPU acceleration
try:
cv2.cuda.getCudaEnabledDeviceCount()
CUDA_AVAILABLE = cv2.cuda.getCudaEnabledDeviceCount() > 0
if CUDA_AVAILABLE:
print(f"CUDA is available. Found {cv2.cuda.getCudaEnabledDeviceCount()} GPU device(s).")
# Get device properties
for i in range(cv2.cuda.getCudaEnabledDeviceCount()):
props = cv2.cuda.getDevice(i)
print(f"GPU Device {i}: {cv2.cuda.getDeviceName(i)}")
else:
print("CUDA is not available. Using CPU for computations.")
except (AttributeError, cv2.error) as e:
print(f"CUDA is not available in this OpenCV build: {e}")
print("Using CPU for computations.")
CUDA_AVAILABLE = False
try:
import win32gui
import win32con
import win32api
import win32ui
from ctypes import windll
WIN32_AVAILABLE = True
except ImportError:
print("Warning: win32gui modules not found. Window-specific capture will not be available.")
print("To install win32gui: pip install pywin32")
WIN32_AVAILABLE = False
def get_window_list():
if not WIN32_AVAILABLE:
return []
windows = []
def enum_windows_callback(hwnd, results):
if win32gui.IsWindowVisible(hwnd):
window_text = win32gui.GetWindowText(hwnd)
if window_text and window_text != "Motion Detection":
results.append((hwnd, window_text))
return True
win32gui.EnumWindows(enum_windows_callback, windows)
return windows
def find_window_by_name(name):
if not WIN32_AVAILABLE:
return None
windows = get_window_list()
for hwnd, title in windows:
if name in title:
return hwnd
return None
def select_window():
if not WIN32_AVAILABLE:
print("Window selection is not available without win32gui modules.")
return None
windows = get_window_list()
if not windows:
print("No windows found to select.")
return None
window_name = "Select Window"
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
cv2.resizeWindow(window_name, 800, 600)
height = len(windows) * 30 + 60
img = np.zeros((height, 800, 3), dtype=np.uint8)
cv2.putText(img, "Click on a window to select it for motion detection:",
(20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
for i, (hwnd, title) in enumerate(windows):
y = 60 + i * 30
display_title = title[:50] + "..." if len(title) > 50 else title
cv2.putText(img, f"{i+1}. {display_title}",
(20, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
cv2.imshow(window_name, img)
selected_hwnd = [None]
def mouse_callback(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN:
index = (y - 60) // 30
if 0 <= index < len(windows):
selected_hwnd[0] = windows[index][0]
cv2.destroyWindow(window_name)
cv2.setMouseCallback(window_name, mouse_callback)
while selected_hwnd[0] is None:
try:
key = cv2.waitKey(100)
except (ValueError, TypeError) as e:
print(f"[{get_timestamp()}] Chyba při čekání v select_window: {e}. Použití výchozí hodnoty 100ms.")
key = cv2.waitKey(100)
if key == 27:
cv2.destroyWindow(window_name)
return None
if 49 <= key <= 57:
index = key - 49
if index < len(windows):
selected_hwnd[0] = windows[index][0]
cv2.destroyWindow(window_name)
if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) < 1:
return None
return selected_hwnd[0]
@timeout(5, default=None)
def _capture_window_method1(hwnd, width, height):
"""Capture window using PrintWindow with PW_RENDERFULLCONTENT (3)"""
update_activity()
hwnd_dc = None
mfc_dc = None
save_dc = None
save_bitmap = None
img = None
try:
# Get device context
hwnd_dc = win32gui.GetWindowDC(hwnd)
mfc_dc = win32ui.CreateDCFromHandle(hwnd_dc)
save_dc = mfc_dc.CreateCompatibleDC()
save_bitmap = win32ui.CreateBitmap()
save_bitmap.CreateCompatibleBitmap(mfc_dc, width, height)
save_dc.SelectObject(save_bitmap)
# Try with flag 3 (PW_RENDERFULLCONTENT) - best for modern Windows
result = windll.user32.PrintWindow(hwnd, save_dc.GetSafeHdc(), 3)
if result:
bmpinfo = save_bitmap.GetInfo()
bmpstr = save_bitmap.GetBitmapBits(True)
img = Image.frombuffer(
'RGB',
(bmpinfo['bmWidth'], bmpinfo['bmHeight']),
bmpstr, 'raw', 'BGRX', 0, 1)
except Exception as e:
print(f"[{get_timestamp()}] Method 1 (PrintWindow flag 3) failed: {e}")
finally:
# Clean up resources in reverse order of creation
try:
if save_bitmap:
win32gui.DeleteObject(save_bitmap.GetHandle())
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete bitmap: {e}")
try:
if save_dc:
save_dc.DeleteDC()
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete save_dc: {e}")
try:
if mfc_dc:
mfc_dc.DeleteDC()
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete mfc_dc: {e}")
try:
if hwnd_dc:
win32gui.ReleaseDC(hwnd, hwnd_dc)
except Exception as e:
print(f"[{get_timestamp()}] Failed to release hwnd_dc: {e}")
return img
@timeout(5, default=None)
def _capture_window_method2(hwnd, width, height):
"""Capture window using PrintWindow with PW_CLIENTONLY (2)"""
update_activity()
hwnd_dc = None
mfc_dc = None
save_dc = None
save_bitmap = None
img = None
try:
# Get device context
hwnd_dc = win32gui.GetWindowDC(hwnd)
mfc_dc = win32ui.CreateDCFromHandle(hwnd_dc)
save_dc = mfc_dc.CreateCompatibleDC()
save_bitmap = win32ui.CreateBitmap()
save_bitmap.CreateCompatibleBitmap(mfc_dc, width, height)
save_dc.SelectObject(save_bitmap)
# Try with flag 2 (PW_CLIENTONLY)
result = windll.user32.PrintWindow(hwnd, save_dc.GetSafeHdc(), 2)
if result:
bmpinfo = save_bitmap.GetInfo()
bmpstr = save_bitmap.GetBitmapBits(True)
img = Image.frombuffer(
'RGB',
(bmpinfo['bmWidth'], bmpinfo['bmHeight']),
bmpstr, 'raw', 'BGRX', 0, 1)
except Exception as e:
print(f"[{get_timestamp()}] Method 2 (PrintWindow flag 2) failed: {e}")
finally:
# Clean up resources in reverse order of creation
try:
if save_bitmap:
win32gui.DeleteObject(save_bitmap.GetHandle())
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete bitmap: {e}")
try:
if save_dc:
save_dc.DeleteDC()
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete save_dc: {e}")
try:
if mfc_dc:
mfc_dc.DeleteDC()
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete mfc_dc: {e}")
try:
if hwnd_dc:
win32gui.ReleaseDC(hwnd, hwnd_dc)
except Exception as e:
print(f"[{get_timestamp()}] Failed to release hwnd_dc: {e}")
return img
@timeout(5, default=None)
def _capture_window_method3(hwnd, width, height):
"""Capture window using PrintWindow with default flag (0)"""
update_activity()
hwnd_dc = None
mfc_dc = None
save_dc = None
save_bitmap = None
img = None
try:
# Get device context
hwnd_dc = win32gui.GetWindowDC(hwnd)
mfc_dc = win32ui.CreateDCFromHandle(hwnd_dc)
save_dc = mfc_dc.CreateCompatibleDC()
save_bitmap = win32ui.CreateBitmap()
save_bitmap.CreateCompatibleBitmap(mfc_dc, width, height)
save_dc.SelectObject(save_bitmap)
# Try with flag 0 (default)
result = windll.user32.PrintWindow(hwnd, save_dc.GetSafeHdc(), 0)
if result:
bmpinfo = save_bitmap.GetInfo()
bmpstr = save_bitmap.GetBitmapBits(True)
img = Image.frombuffer(
'RGB',
(bmpinfo['bmWidth'], bmpinfo['bmHeight']),
bmpstr, 'raw', 'BGRX', 0, 1)
except Exception as e:
print(f"[{get_timestamp()}] Method 3 (PrintWindow flag 0) failed: {e}")
finally:
# Clean up resources in reverse order of creation
try:
if save_bitmap:
win32gui.DeleteObject(save_bitmap.GetHandle())
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete bitmap: {e}")
try:
if save_dc:
save_dc.DeleteDC()
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete save_dc: {e}")
try:
if mfc_dc:
mfc_dc.DeleteDC()
except Exception as e:
print(f"[{get_timestamp()}] Failed to delete mfc_dc: {e}")
try:
if hwnd_dc:
win32gui.ReleaseDC(hwnd, hwnd_dc)
except Exception as e:
print(f"[{get_timestamp()}] Failed to release hwnd_dc: {e}")
return img
@timeout(5, default=None)
def _capture_window_method4(left, top, right, bottom, hwnd):
"""Capture window using PIL's ImageGrab"""
update_activity()
img = None
try:
# Try to bring window to foreground for ImageGrab
try:
# Check if window still exists
if not win32gui.IsWindow(hwnd):
print(f"[{get_timestamp()}] Window no longer exists for SetForegroundWindow")
return None
# Try to bring window to foreground
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.2) # Give window time to come to foreground
except Exception as e:
# Log but continue - we might still be able to capture the screen region
print(f"[{get_timestamp()}] SetForegroundWindow failed: {e}")
print(f"[{get_timestamp()}] Attempting to capture anyway...")
# Try direct screen capture even if SetForegroundWindow failed
try:
# Capture the region of the screen where the window is
img = ImageGrab.grab(bbox=(left, top, right, bottom))
print(f"[{get_timestamp()}] Used ImageGrab fallback method")
except Exception as e:
print(f"[{get_timestamp()}] ImageGrab.grab failed: {e}")
return None
return img
except Exception as e:
print(f"[{get_timestamp()}] Method 4 (ImageGrab) failed: {e}")
return None
@timeout(10, default=None)
def capture_window(hwnd):
"""
Capture a window with multiple fallback methods and timeout protection
Args:
hwnd: Window handle to capture
Returns:
PIL.Image or None if capture failed
"""
update_activity()
if not WIN32_AVAILABLE:
print(f"[{get_timestamp()}] Window capture not available without win32gui modules")
return None
# Try to get window dimensions with timeout protection
try:
# Check if window still exists
if not win32gui.IsWindow(hwnd):
print(f"[{get_timestamp()}] Window no longer exists")
return None
left, top, right, bottom = win32gui.GetWindowRect(hwnd)
width = right - left
height = bottom - top
# Check if window is minimized or has invalid dimensions
if width <= 0 or height <= 0:
print(f"[{get_timestamp()}] Window is minimized or has zero size: {width}x{height}")
# Try to restore the window if it's minimized
if win32gui.IsIconic(hwnd):
print(f"[{get_timestamp()}] Attempting to restore minimized window...")
win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
time.sleep(0.5) # Give the window time to restore
left, top, right, bottom = win32gui.GetWindowRect(hwnd)
width = right - left
height = bottom - top
if width <= 0 or height <= 0:
print(f"[{get_timestamp()}] Failed to restore window to valid size")
return None
else:
return None
except Exception as e:
print(f"[{get_timestamp()}] Error getting window dimensions: {e}")
return None
# Try to ensure the window is in a good state for capture
try:
# Check if window is minimized and restore it
if win32gui.IsIconic(hwnd):
print(f"[{get_timestamp()}] Window is minimized, attempting to restore...")
win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
time.sleep(0.5) # Give window time to restore
# Try to bring window to foreground
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.2) # Give window time to come to foreground
except Exception as e:
print(f"[{get_timestamp()}] Warning: Could not prepare window for capture: {e}")
# Try multiple capture methods with timeout protection
img = None
errors = []
# Method 1: PrintWindow with PW_RENDERFULLCONTENT (3)
if img is None:
img = _capture_window_method1(hwnd, width, height)
if img is None:
errors.append("Method 1 (PrintWindow flag 3) failed or timed out")
# Method 2: PrintWindow with PW_CLIENTONLY (2)
if img is None:
img = _capture_window_method2(hwnd, width, height)
if img is None:
errors.append("Method 2 (PrintWindow flag 2) failed or timed out")
# Method 3: PrintWindow with default flag (0)
if img is None:
img = _capture_window_method3(hwnd, width, height)
if img is None:
errors.append("Method 3 (PrintWindow flag 0) failed or timed out")
# Method 4: Fallback to PIL's ImageGrab
if img is None:
img = _capture_window_method4(left, top, right, bottom, hwnd)
if img is None:
errors.append("Method 4 (ImageGrab) failed or timed out")
# Method 5: Last resort - try to capture entire screen and crop
if img is None:
try:
print(f"[{get_timestamp()}] Attempting last resort capture method - full screen capture")
# Capture entire screen
full_screen = ImageGrab.grab()
screen_width, screen_height = full_screen.size
# Adjust crop dimensions to stay within screen boundaries
adjusted_left = max(0, left)
adjusted_top = max(0, top)
adjusted_right = min(screen_width, right)
adjusted_bottom = min(screen_height, bottom)
adjusted_width = adjusted_right - adjusted_left
adjusted_height = adjusted_bottom - adjusted_top
# Crop to window area if dimensions are valid
if adjusted_width > 0 and adjusted_height > 0:
img = full_screen.crop((adjusted_left, adjusted_top, adjusted_right, adjusted_bottom))
if adjusted_left != left or adjusted_top != top or adjusted_right != right or adjusted_bottom != bottom:
print(f"[{get_timestamp()}] Last resort method succeeded with adjusted dimensions - window was partially off-screen")
else:
print(f"[{get_timestamp()}] Last resort method succeeded - cropped full screen capture")
else:
print(f"[{get_timestamp()}] Last resort method failed - invalid crop dimensions after adjustment")
print(f"[{get_timestamp()}] Original: ({left}, {top}, {right}, {bottom}), Adjusted: ({adjusted_left}, {adjusted_top}, {adjusted_right}, {adjusted_bottom})")
errors.append("Method 5 (Full screen capture and crop) failed - invalid dimensions")
except Exception as e:
print(f"[{get_timestamp()}] Last resort capture method failed: {e}")
errors.append(f"Method 5 (Full screen capture and crop) failed: {e}")
if img is None and errors:
print(f"[{get_timestamp()}] All capture methods failed:")
for error in errors:
print(f"[{get_timestamp()}] - {error}")
update_activity()
return img
def get_timestamp():
"""Generate a formatted timestamp for logging"""
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def process_screen_motion(hwnd):
global program_alive, CUDA_AVAILABLE
# Start the watchdog
watchdog.start()
window_title = ""
if hwnd is not None:
try:
window_title = win32gui.GetWindowText(hwnd)
print(f"[{get_timestamp()}] Starting motion detection for window: {window_title}")
except Exception as e:
print(f"[{get_timestamp()}] Error getting window information: {e}")
print(f"[{get_timestamp()}] Aplikace se ukončí.")
watchdog.stop()
return
else:
print(f"[{get_timestamp()}] Window not found. Aplikace se ukončí.")
watchdog.stop()
return
# Update activity to indicate we're making progress
update_activity()
cv2.namedWindow("Motion Detection", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Motion Detection", 800, 600)
cv2.moveWindow("Motion Detection", 20, 20)
# Create a separate window for controls to make them more responsive
cv2.namedWindow("Controls", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Controls", 400, 480) # Increased height to accommodate additional information
cv2.moveWindow("Controls", 830, 20)
cv2.createTrackbar('Threshold', "Controls", 5, 100, lambda x: None)
cv2.createTrackbar('Min Area', "Controls", 110, 10000, lambda x: None)
cv2.createTrackbar('Blur', "Controls", 5, 51, lambda x: None)
cv2.createTrackbar('Delay (ms)', "Controls", 5, 1000, lambda x: None)
cv2.createTrackbar('Max Width', "Controls", 1280, 3840, lambda x: None) # Default 1280, max 4K
cv2.createTrackbar('Zoom Threshold', "Controls", 5, 20, lambda x: None) # Number of detections needed to trigger zoom
cv2.createTrackbar('Zoom Scale', "Controls", 10, 30, lambda x: None) # Zoom scale factor (lower values = less zoom)
cv2.createTrackbar('Area Expansion', "Controls", 120, 150, lambda x: None) # Area expansion factor (higher values = larger area)
cv2.createTrackbar('Movement Speed', "Controls", 90, 100, lambda x: None) # Speed of zoomed view movement (0-100)
# Memory optimization: Pre-allocate GPU matrices if CUDA is available
gpu_frame = None
gpu_gray = None
gpu_prev_gray = None
gpu_frame_diff = None
# Variables for auto-zoom feature
motion_area_history = {} # Dictionary to track motion areas and their detection counts
current_zoom_area = None # Currently zoomed area (x, y, w, h)
zoom_active = False # Flag to indicate if zoom is active
zoom_cooldown = 0 # Cooldown counter to prevent rapid zoom changes
zoom_cooldown_threshold = 10 # Number of frames to wait before allowing zoom change
# Variables for smooth zoomed view movement
current_view_center = None # Current center point of the zoomed view (x, y)
target_view_center = None # Target center point to move towards (x, y)
view_movement_speed = 0.5 # Speed factor for smooth movement (0-1, higher = faster)
last_view_reset_time = time.time() # Time of last view center reset
view_reset_interval = 5 # Reset view center every 5 seconds to prevent getting stuck
last_movement_time = time.time() # Time of last significant movement
movement_timeout = 1 # Force reset if no movement for 1 second
last_target_update_time = time.time() # Time of last target position update
target_update_timeout = 0.5 # Force target update if no update for 0.5 seconds
consecutive_resets = 0 # Counter for consecutive resets due to lack of movement
max_consecutive_resets = 3 # Threshold for warning about persistent tracking issues
force_update_counter = 0 # Counter for forcing updates even when no motion is detected
frame_count = 0 # Counter for frames processed
full_scan_interval = 100 # Perform a full frame scan every N frames
last_full_scan_time = time.time() # Time of last full frame scan
if CUDA_AVAILABLE:
try:
# Pre-allocate GPU matrices to avoid repeated allocation/deallocation
gpu_frame = cv2.cuda_GpuMat()
gpu_gray = cv2.cuda_GpuMat()
gpu_prev_gray = cv2.cuda_GpuMat()
gpu_frame_diff = cv2.cuda_GpuMat()
print(f"[{get_timestamp()}] Pre-allocated GPU matrices for better memory efficiency")
except Exception as e:
print(f"[{get_timestamp()}] Failed to pre-allocate GPU matrices: {e}")
CUDA_AVAILABLE = False
prev_gray = None
roi = None
selecting_roi = False
# Variables for capture failure handling
consecutive_failures = 0
max_consecutive_failures = 5
retry_delay = 1 # Initial retry delay in seconds
max_retry_delay = 10 # Maximum retry delay in seconds
last_successful_capture_time = time.time()
auto_restart_interval = 30 * 60 # Auto restart every 30 minutes (in seconds)
print("Stiskni 'q' pro ukončení, 'r' pro výběr oblasti (ROI).")
while True:
# Increment frame counter
frame_count += 1
# Perform a full frame scan periodically to ensure we don't miss any motion areas
current_time = time.time()
if current_time - last_full_scan_time > 10 or frame_count % full_scan_interval == 0:
print(f"[{get_timestamp()}] Performing full frame scan (frame {frame_count})")
# Clear motion area history to force a fresh scan
if len(motion_area_history) > 0:
# Save the most active area before clearing
most_active_area = None
most_active_count = 0
for area_key, (count, area_rect) in motion_area_history.items():
if count > most_active_count:
most_active_count = count
most_active_area = area_rect
# Clear the history but keep the most active area with a reduced count
motion_area_history.clear()
if most_active_area is not None:
# Create a key for this area
x, y, w, h = most_active_area
center_x, center_y = x + w//2, y + h//2
size_key = max(w, h) // 10
area_key = (center_x // 20, center_y // 20, size_key)
# Add it back with a reduced count to give it some history but allow new areas to take over
motion_area_history[area_key] = (max(1, most_active_count // 2), most_active_area)
print(f"[{get_timestamp()}] Kept most active area with reduced count: {max(1, most_active_count // 2)}")
# Reset tracking variables to force a fresh scan
current_view_center = None
last_full_scan_time = current_time
print(f"[{get_timestamp()}] Full frame scan complete")
# Get trackbar values
motion_threshold = cv2.getTrackbarPos('Threshold', "Controls")
min_contour_area = cv2.getTrackbarPos('Min Area', "Controls")
blur_size = cv2.getTrackbarPos('Blur', "Controls")
delay_ms = cv2.getTrackbarPos('Delay (ms)', "Controls")
zoom_count_threshold = cv2.getTrackbarPos('Zoom Threshold', "Controls")
zoom_scale_raw = cv2.getTrackbarPos('Zoom Scale', "Controls")
area_expansion_raw = cv2.getTrackbarPos('Area Expansion', "Controls")
movement_speed_raw = cv2.getTrackbarPos('Movement Speed', "Controls")
# Use a gentler formula for zoom scale: starts at 1.1 for value 1, increases more gradually
zoom_scale = 1.0 + (zoom_scale_raw * 0.05) # Gives values from 1.05 to 2.5 for trackbar range 1-30
# Convert area expansion from trackbar value (10-50) to actual multiplier (1.0-5.0)
area_expansion = max(1.0, area_expansion_raw / 10.0)
# Convert movement speed from trackbar value (0-100) to a factor (0.01-1.0)
view_movement_speed = max(0.01, movement_speed_raw / 100.0)
if delay_ms < 10:
delay_ms = 10
if blur_size % 2 == 0:
blur_size += 1
if blur_size < 3:
blur_size = 3
# Process UI events to keep controls responsive
cv2.waitKey(1)
# Check if we need to auto-restart based on time
current_time = time.time()
if current_time - last_successful_capture_time > auto_restart_interval:
print(f"[{get_timestamp()}] Automatický restart zachytávání po {auto_restart_interval/60:.1f} minutách...")
# Try to select the same window again
try:
if win32gui.IsWindow(hwnd):
# Bring window to foreground
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.5)
print(f"[{get_timestamp()}] Obnovuji zachytávání okna: {window_title}")
last_successful_capture_time = current_time
consecutive_failures = 0
else:
print(f"[{get_timestamp()}] Okno již neexistuje, vybírám nové...")
hwnd = select_window()
if hwnd is None:
print(f"[{get_timestamp()}] Okno nebylo vybráno. Konec.")
break
try:
window_title = win32gui.GetWindowText(hwnd)
print(f"[{get_timestamp()}] Přepínám na okno: {window_title}")
last_successful_capture_time = current_time
consecutive_failures = 0
except Exception as e:
print(f"[{get_timestamp()}] Chyba při získávání názvu okna: {e}")
window_title = "Neznámé okno"
except Exception as e:
print(f"[{get_timestamp()}] Chyba při automatickém restartu: {e}")
# Process UI events again before potentially time-consuming operation
cv2.waitKey(1)
screenshot = capture_window(hwnd)
if screenshot is None:
consecutive_failures += 1
print(f"[{get_timestamp()}] Chyba: nelze získat snímek okna. Pokus {consecutive_failures}/{max_consecutive_failures}")
frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(frame, "Nelze zachytit snímek okna.", (50, 200),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
# Show retry information
if consecutive_failures < max_consecutive_failures:
cv2.putText(frame, f"Automatický pokus o zachycení za {retry_delay} sekund...", (50, 230),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 1)
else:
cv2.putText(frame, "Automatický výběr nového okna...", (50, 230),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 1)
cv2.putText(frame, "Zkuste:", (50, 260),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
cv2.putText(frame, "1. Ujistit se, že okno není minimalizované", (50, 290),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(frame, "2. Vybrat jiné okno", (50, 320),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(frame, "3. Restartovat aplikaci", (50, 350),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(frame, "Stiskni 'q' pro ukončení nebo 'r' pro výběr jiného okna", (50, 390),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 1)
cv2.imshow("Motion Detection", frame)
# Wait for user input or timeout
try:
# Ensure delay is an integer
wait_time = int(retry_delay * 1000)
key = cv2.waitKey(wait_time) & 0xFF
except (ValueError, TypeError) as e:
print(f"[{get_timestamp()}] Chyba při čekání: {e}. Použití výchozí hodnoty 1000ms.")
key = cv2.waitKey(1000) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
hwnd = select_window()
if hwnd is None:
print(f"[{get_timestamp()}] Okno nebylo vybráno. Konec.")
break
try:
window_title = win32gui.GetWindowText(hwnd)
print(f"[{get_timestamp()}] Přepínám na okno: {window_title}")
consecutive_failures = 0
except Exception as e:
print(f"[{get_timestamp()}] Chyba při získávání názvu okna: {e}")
window_title = "Neznámé okno"
elif consecutive_failures >= max_consecutive_failures:
# Auto-select new window after max failures
print(f"[{get_timestamp()}] Dosažen maximální počet neúspěšných pokusů. Automatický výběr nového okna.")
hwnd = select_window()
if hwnd is None:
print(f"[{get_timestamp()}] Okno nebylo vybráno. Konec.")
break
try:
window_title = win32gui.GetWindowText(hwnd)
print(f"[{get_timestamp()}] Přepínám na okno: {window_title}")
consecutive_failures = 0
except Exception as e:
print(f"[{get_timestamp()}] Chyba při získávání názvu okna: {e}")
window_title = "Neznámé okno"
else:
# Increase retry delay for next attempt (with a maximum)
retry_delay = min(retry_delay * 1.5, max_retry_delay)
print(f"[{get_timestamp()}] Zvyšuji prodlevu mezi pokusy na {retry_delay:.1f} sekund")
continue
else:
# Reset counters on successful capture
if consecutive_failures > 0:
print(f"[{get_timestamp()}] Zachytávání obnoveno po {consecutive_failures} neúspěšných pokusech")
consecutive_failures = 0
retry_delay = 1 # Reset to initial delay
last_successful_capture_time = time.time()
if screenshot.mode != 'RGB':
screenshot = screenshot.convert('RGB')
# Process UI events to keep controls responsive
cv2.waitKey(1)
frame = np.array(screenshot)
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# Memory optimization: Resize large frames to reduce memory usage
max_width = cv2.getTrackbarPos('Max Width', "Controls")
if max_width < 100: # Ensure reasonable minimum
max_width = 1280
current_width = frame.shape[1]
if current_width > max_width:
# Calculate new height to maintain aspect ratio
scale_factor = max_width / current_width
new_height = int(frame.shape[0] * scale_factor)
# Resize the frame
frame = cv2.resize(frame, (max_width, new_height), interpolation=cv2.INTER_AREA)
print(f"[{get_timestamp()}] Resized frame from {current_width}x{frame.shape[0]} to {max_width}x{new_height} to reduce memory usage")
# We don't need to keep a full copy of the original frame anymore
# orig_frame = frame.copy() # This was using a lot of memory
if selecting_roi:
roi = cv2.selectROI("Motion Detection", frame, fromCenter=False, showCrosshair=True)
selecting_roi = False
if roi == (0, 0, 0, 0):
roi = None
if roi is not None:
x, y, w, h = roi
roi_frame = frame[y:y+h, x:x+w]
cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)
process_frame = roi_frame
else:
process_frame = frame
# Process image with GPU if available, otherwise use CPU
update_activity() # Update activity timestamp before processing
# Process UI events to keep controls responsive
cv2.waitKey(1)
# Define timeout for GPU operations
GPU_OPERATION_TIMEOUT = 3 # seconds
if CUDA_AVAILABLE:
try:
# Memory optimization: Reuse pre-allocated GPU matrices
# Wrap GPU operations in timeout to prevent hanging
@timeout(GPU_OPERATION_TIMEOUT, default=False)
def gpu_upload_frame():
gpu_frame.upload(process_frame)
return True
@timeout(GPU_OPERATION_TIMEOUT, default=None)
def gpu_convert_to_gray():
cv2.cuda.cvtColor(gpu_frame, cv2.COLOR_BGR2GRAY, gpu_gray)
return True
@timeout(GPU_OPERATION_TIMEOUT, default=None)
def gpu_apply_blur():
cv2.cuda.GaussianBlur(gpu_gray, (blur_size, blur_size), 0, gpu_gray)
return True
@timeout(GPU_OPERATION_TIMEOUT, default=None)
def gpu_download_gray():
return gpu_gray.download()
# Execute GPU operations with timeout protection
if not gpu_upload_frame():
raise Exception("GPU frame upload timed out")
if not gpu_convert_to_gray():
raise Exception("GPU grayscale conversion timed out")
if not gpu_apply_blur():
raise Exception("GPU blur operation timed out")
gray = gpu_download_gray()
if gray is None:
raise Exception("GPU gray download timed out")
update_activity() # Update activity after potentially slow operations
if prev_gray is None or prev_gray.shape != gray.shape:
prev_gray = gray.copy() # Need to copy here to avoid modifying the reference
cv2.imshow("Motion Detection", frame)
try:
if cv2.waitKey(int(delay_ms)) & 0xFF == ord('q'):
break
except (ValueError, TypeError) as e:
print(f"[{get_timestamp()}] Chyba při čekání: {e}. Použití výchozí hodnoty 10ms.")
if cv2.waitKey(10) & 0xFF == ord('q'):
break
continue
# Upload previous frame to GPU (reuse pre-allocated matrix)
@timeout(GPU_OPERATION_TIMEOUT, default=False)
def gpu_upload_prev_gray():
gpu_prev_gray.upload(prev_gray)
return True
@timeout(GPU_OPERATION_TIMEOUT, default=False)
def gpu_compute_diff():
cv2.cuda.absdiff(gpu_prev_gray, gpu_gray, gpu_frame_diff)
return True
@timeout(GPU_OPERATION_TIMEOUT, default=None)
def gpu_download_diff():
return gpu_frame_diff.download()
# Execute GPU operations with timeout protection
if not gpu_upload_prev_gray():
raise Exception("GPU prev_gray upload timed out")
if not gpu_compute_diff():
raise Exception("GPU absdiff operation timed out")
frame_diff = gpu_download_diff()
if frame_diff is None:
raise Exception("GPU frame_diff download timed out")
update_activity() # Update activity after potentially slow operations
# Try to use GPU for threshold and dilate if available
try:
@timeout(GPU_OPERATION_TIMEOUT, default=None)
def gpu_threshold_and_dilate():
# GPU thresholding
gpu_thresh = cv2.cuda.threshold(gpu_frame_diff, motion_threshold, 255, cv2.THRESH_BINARY)[1]
# GPU dilation (if available)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
gpu_thresh = cv2.cuda.dilate(gpu_thresh, kernel, iterations=2)
# Download result
return gpu_thresh.download()
thresh = gpu_threshold_and_dilate()
if thresh is None:
raise Exception("GPU threshold and dilate timed out")
except Exception as e:
# Fall back to CPU for these operations if GPU version fails
print(f"[{get_timestamp()}] GPU threshold/dilate failed: {e}. Using CPU instead.")
thresh = cv2.threshold(frame_diff, motion_threshold, 255, cv2.THRESH_BINARY)[1]
thresh = cv2.dilate(thresh, None, iterations=2)
# Find contours (CPU operation - no efficient GPU implementation in OpenCV)
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
update_activity() # Update activity after contour finding
except Exception as e:
print(f"[{get_timestamp()}] GPU processing error: {e}. Falling back to CPU.")
# Fall back to CPU processing
gray = cv2.cvtColor(process_frame, cv2.COLOR_BGR2GRAY)
gray = cv2.GaussianBlur(gray, (blur_size, blur_size), 0)
update_activity() # Update activity after CPU processing
if prev_gray is None or prev_gray.shape != gray.shape:
prev_gray = gray
cv2.imshow("Motion Detection", frame)
try:
if cv2.waitKey(int(delay_ms)) & 0xFF == ord('q'):
break
except (ValueError, TypeError) as e:
print(f"[{get_timestamp()}] Chyba při čekání: {e}. Použití výchozí hodnoty 10ms.")
if cv2.waitKey(10) & 0xFF == ord('q'):
break
continue
frame_diff = cv2.absdiff(prev_gray, gray)
thresh = cv2.threshold(frame_diff, motion_threshold, 255, cv2.THRESH_BINARY)[1]
thresh = cv2.dilate(thresh, None, iterations=2)
contours, _ = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
else:
# CPU processing
gray = cv2.cvtColor(process_frame, cv2.COLOR_BGR2GRAY)
gray = cv2.GaussianBlur(gray, (blur_size, blur_size), 0)
update_activity() # Update activity after CPU processing
if prev_gray is None or prev_gray.shape != gray.shape:
prev_gray = gray
cv2.imshow("Motion Detection", frame)
try:
if cv2.waitKey(int(delay_ms)) & 0xFF == ord('q'):
break
except (ValueError, TypeError) as e:
print(f"[{get_timestamp()}] Chyba při čekání: {e}. Použití výchozí hodnoty 10ms.")
if cv2.waitKey(10) & 0xFF == ord('q'):
break
continue
frame_diff = cv2.absdiff(prev_gray, gray)
thresh = cv2.threshold(frame_diff, motion_threshold, 255, cv2.THRESH_BINARY)[1]
thresh = cv2.dilate(thresh, None, iterations=2)
contours, _ = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# Process UI events to keep controls responsive
cv2.waitKey(1)
motion_detected = False
motion_areas = []
for contour in contours:
if cv2.contourArea(contour) < min_contour_area:
continue
motion_detected = True
(x_c, y_c, w_c, h_c) = cv2.boundingRect(contour)
area = cv2.contourArea(contour)
motion_areas.append((x_c, y_c, w_c, h_c, area))
if roi is not None:
x_c += roi[0]
y_c += roi[1]
cv2.rectangle(frame, (x_c, y_c), (x_c + w_c, y_c + h_c), (0, 255, 0), 2)
if motion_detected:
cv2.putText(frame, "Motion Detected", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
# Log motion detection information
print(f"[{get_timestamp()}] Pohyb detekován: {len(motion_areas)} oblastí")
# Auto-zoom feature: Update motion area history
for x, y, w, h, area in motion_areas:
# Create a key for this area (using center point and approximate size)
center_x, center_y = x + w//2, y + h//2
size_key = max(w, h) // 10 # Group similar sized areas
area_key = (center_x // 20, center_y // 20, size_key) # Discretize to allow for small movements
# Update count for this area
if area_key in motion_area_history:
motion_area_history[area_key] = (motion_area_history[area_key][0] + 1, (x, y, w, h))
else:
motion_area_history[area_key] = (1, (x, y, w, h))
# Decrease cooldown counter if it's active - moved outside motion_detected block
if zoom_cooldown > 0:
zoom_cooldown -= 1
# Find area with most detections - moved outside motion_detected block
most_detections = 0
most_detected_area = None
most_detected_area_key = None
for area_key, (count, area_rect) in motion_area_history.items():
if count > most_detections:
most_detections = count
most_detected_area = area_rect
most_detected_area_key = area_key
# Increment force update counter
force_update_counter += 1
# Force an update of the target position periodically, even if no motion is detected
# This helps prevent the zoomed view from getting stuck
if force_update_counter >= 10: # Force update every 10 frames
force_update_counter = 0
if most_detected_area is not None:
# Force update of target position
x, y, w, h = most_detected_area
center_x, center_y = x + w//2, y + h//2
target_view_center = (center_x, center_y)
last_target_update_time = time.time()
print(f"[{get_timestamp()}] FORCED UPDATE: Target position set to {target_view_center} (periodic force update)")
# If we haven't moved in a while, force a reset of the current view center
current_time = time.time()
if current_view_center is not None and current_time - last_movement_time > movement_timeout:
current_view_center = None
print(f"[{get_timestamp()}] FORCED RESET: View center reset due to lack of movement during forced update")
# Always update the current zoom area to the most active area, regardless of zoom status
if most_detected_area is not None:
# If we're already zoomed, check if we should change the zoom area
if zoom_active:
# Always change to the area with the most detections, even if it's not significantly different
if current_zoom_area is not None:
cx, cy, cw, ch = current_zoom_area
nx, ny, nw, nh = most_detected_area
# Calculate centers
current_center = (cx + cw//2, cy + ch//2)
new_center = (nx + nw//2, ny + nh//2)
# Calculate distance between centers
distance = ((current_center[0] - new_center[0])**2 +
(current_center[1] - new_center[1])**2)**0.5
# Check if the most active area has changed significantly
area_changed = (nx != cx or ny != cy or nw != cw or nh != ch)
# Always update the current zoom area to follow the most active area
current_zoom_area = most_detected_area
# Always update the target position to the center of the most active area
# This ensures continuous tracking even if the zoomed view is not being rendered
x, y, w, h = most_detected_area
center_x, center_y = x + w//2, y + h//2
target_view_center = (center_x, center_y)
last_target_update_time = time.time() # Update the last target update time
print(f"[{get_timestamp()}] Updating target position to {target_view_center} (zoom active)")
# Only reset cooldown and log if the area has changed significantly
if area_changed:
print(f"[{get_timestamp()}] Změna přiblížení na novou oblast s nejvíce detekcemi ({most_detections} detekcí)")
if zoom_cooldown == 0:
zoom_cooldown = zoom_cooldown_threshold
else:
# Check if we should activate zoom
if most_detections >= zoom_count_threshold:
# Activate zoom
zoom_active = True
current_zoom_area = most_detected_area
# Initialize current_view_center if it's the first time zoom is activated
# This will be set to the target position in the zoomed view calculation
zoom_cooldown = zoom_cooldown_threshold
print(f"[{get_timestamp()}] Aktivace přiblížení na oblast s nejvíce detekcemi")
else:
# Update current_zoom_area even if zoom is not active
# This ensures that when zoom is activated, it will use the most recent active area
current_zoom_area = most_detected_area
# Update target position even if zoom is not active
# This ensures continuous tracking of the most active area
x, y, w, h = most_detected_area
center_x, center_y = x + w//2, y + h//2
target_view_center = (center_x, center_y)
last_target_update_time = time.time() # Update the last target update time
print(f"[{get_timestamp()}] Updating target position to {target_view_center} (zoom inactive)")
cv2.putText(frame, f"Threshold: {motion_threshold}", (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
cv2.putText(frame, f"Min Area: {min_contour_area}", (10, 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
cv2.putText(frame, f"Blur: {blur_size}", (10, 100),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
cv2.putText(frame, f"Monitoring: {window_title}", (10, 120),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)
# Add instructions on two lines to avoid text going off screen
cv2.putText(frame, "Stiskni 'q' pro ukončení, 'r' pro ROI",
(10, frame.shape[0] - 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
cv2.putText(frame, "'z' pro přepnutí přiblížení, 'c' pro vymazání historie pohybu",
(10, frame.shape[0] - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
# Process UI events to keep controls responsive
cv2.waitKey(1)
# Create a control panel image to display current values
control_panel = np.zeros((480, 400, 3), dtype=np.uint8)
cv2.putText(control_panel, "Control Panel", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(control_panel, f"Threshold: {motion_threshold}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(control_panel, f"Min Area: {min_contour_area}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(control_panel, f"Blur: {blur_size}", (10, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(control_panel, f"Delay (ms): {delay_ms}", (10, 160), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(control_panel, f"Max Width: {max_width}", (10, 190), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# Add zoom settings to control panel
cv2.putText(control_panel, "Zoom Settings:", (10, 230), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 1)
cv2.putText(control_panel, f"Zoom Threshold: {zoom_count_threshold}", (10, 260), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(control_panel, f"Zoom Scale: {zoom_scale:.1f}x", (10, 290), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(control_panel, f"Area Expansion: {area_expansion:.1f}x", (10, 320), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
cv2.putText(control_panel, f"Movement Speed: {view_movement_speed:.2f}", (10, 350), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
# Show zoom status
zoom_status = "ACTIVE" if zoom_active else "Inactive"
zoom_status_color = (0, 255, 0) if zoom_active else (100, 100, 100)
cv2.putText(control_panel, f"Zoom Status: {zoom_status}", (10, 380), cv2.FONT_HERSHEY_SIMPLEX, 0.6, zoom_status_color, 1)
# Show most active area information - using values already calculated above
if most_detected_area_key is not None:
cv2.putText(control_panel, f"Most Active Area: {most_detections} detections",
(10, 410), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 1)
# Add a horizontal line to separate information from instructions
cv2.line(control_panel, (10, 420), (390, 420), (100, 100, 100), 1)
cv2.putText(control_panel, "Adjust sliders to change values", (10, 440), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
cv2.putText(control_panel, "Changes apply immediately", (10, 460), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
else:
cv2.putText(control_panel, "No motion areas detected yet",
(10, 410), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 1)
# Add a horizontal line to separate information from instructions
cv2.line(control_panel, (10, 420), (390, 420), (100, 100, 100), 1)
cv2.putText(control_panel, "Adjust sliders to change values", (10, 440), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
cv2.putText(control_panel, "Changes apply immediately", (10, 460), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
# Apply zoom if active
display_frame = frame.copy()
# Show the most active area with a special highlight, even if zoom is not active
if most_detected_area_key is not None:
# Get the area rectangle for the most active area
for area_key, (count, area_rect) in motion_area_history.items():
if area_key == most_detected_area_key:
mx, my, mw, mh = area_rect
# Draw a special rectangle around the most active area (dashed line in cyan)
# Create a dashed line effect
for i in range(0, mw, 5):
cv2.line(display_frame, (mx + i, my), (mx + min(i + 2, mw), my), (255, 255, 0), 2)
cv2.line(display_frame, (mx + i, my + mh), (mx + min(i + 2, mw), my + mh), (255, 255, 0), 2)
for i in range(0, mh, 5):
cv2.line(display_frame, (mx, my + i), (mx, my + min(i + 2, mh)), (255, 255, 0), 2)
cv2.line(display_frame, (mx + mw, my + i), (mx + mw, my + min(i + 2, mh)), (255, 255, 0), 2)
# Add text to indicate this is the most active area
cv2.putText(display_frame, f"Most Active: {count}", (mx, my - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 1)
break
if zoom_active and current_zoom_area is not None:
x, y, w, h = current_zoom_area
# Ensure the zoom area is within frame boundaries
x = max(0, x)
y = max(0, y)
w = min(w, frame.shape[1] - x)
h = min(h, frame.shape[0] - y)
# Extract the region of interest
zoom_region = frame[y:y+h, x:x+w]
if zoom_region.size > 0: # Check if region is valid
# Calculate new dimensions maintaining aspect ratio
orig_height, orig_width = frame.shape[:2]
zoom_height, zoom_width = zoom_region.shape[:2]
# Resize the zoomed region to fit the original frame size
try:
# Initialize zoomed variable to avoid "cannot access local variable" error
zoomed = None
# Calculate dimensions for the zoomed view based on zoom_scale
# First, calculate the center of the region
center_x, center_y = x + w//2, y + h//2
# Expand the area around the detection point before zooming
# Use the area_expansion multiplier from the trackbar
expanded_w = int(w * area_expansion)
expanded_h = int(h * area_expansion)
# Calculate new dimensions for the zoomed area
# The higher the zoom_scale, the smaller the area we take (more zoom)
# But we're starting with a larger area due to expansion
new_w = int(expanded_w / zoom_scale)
new_h = int(expanded_h / zoom_scale)
# Set the target center to the center of the most active area
# Always update the target position, even if it hasn't changed
# Note: This is now redundant as we update the target position earlier,
# but we'll keep it for consistency and as a fallback
target_view_center = (center_x, center_y)
last_target_update_time = time.time() # Update the last target update time
# Log the target position update for debugging
print(f"[{get_timestamp()}] Updating target position to {target_view_center} (in zoomed view)")
# Log the current position for debugging
if current_view_center is not None:
print(f"[{get_timestamp()}] Current position: {current_view_center}")
# Check if the target position hasn't been updated for too long
current_time = time.time()
if target_view_center is not None and current_time - last_target_update_time > target_update_timeout:
print(f"[{get_timestamp()}] WARNING: Target position hasn't been updated for {target_update_timeout} seconds")
# Force a target update using the most active area if available
if most_detected_area is not None:
x, y, w, h = most_detected_area
center_x, center_y = x + w//2, y + h//2
target_view_center = (center_x, center_y)
last_target_update_time = current_time
print(f"[{get_timestamp()}] Forced target position update to {target_view_center}")
else:
# If no active area is available, scan the entire frame for any motion
print(f"[{get_timestamp()}] CRITICAL: No active area available for target update")
# Force a reset of the current view center to trigger a full scan
current_view_center = None
print(f"[{get_timestamp()}] Forced view center reset to trigger full frame scan")
# Check if there's been no movement for too long
if current_view_center is not None and target_view_center is not None:
if current_time - last_movement_time > movement_timeout:
print(f"[{get_timestamp()}] WARNING: No significant movement for {movement_timeout} seconds")
# Force a reset of the current view center
current_view_center = None
last_movement_time = current_time
consecutive_resets += 1
if consecutive_resets >= max_consecutive_resets:
print(f"[{get_timestamp()}] CRITICAL WARNING: Zoomed view has been reset {consecutive_resets} times in a row!")
print(f"[{get_timestamp()}] This indicates a persistent tracking issue that may cause the zoomed view to stop moving.")
# Take more drastic action after multiple resets
# Force zoom to deactivate and reactivate
if zoom_active:
print(f"[{get_timestamp()}] EMERGENCY RECOVERY: Toggling zoom off and on")
zoom_active = False
# Wait a few frames
for _ in range(5):
cv2.waitKey(1)
zoom_active = True
consecutive_resets = 0
print(f"[{get_timestamp()}] Forced view center reset due to lack of movement (reset count: {consecutive_resets})")
# Check if it's time for a periodic reset to prevent getting stuck
if current_time - last_view_reset_time > view_reset_interval:
print(f"[{get_timestamp()}] Periodic view center reset to prevent getting stuck")
current_view_center = None
last_view_reset_time = current_time
# Don't increment consecutive_resets for periodic resets
print(f"[{get_timestamp()}] This is a scheduled reset, not due to tracking issues")
# Unpack target_view_center to get target_x and target_y before using them
target_x, target_y = target_view_center
# Initialize new_current_x and new_current_y with current values to avoid "cannot access local variable" error
if current_view_center is not None:
current_x, current_y = current_view_center
new_current_x, new_current_y = current_x, current_y
else:
# If current_view_center is None, use target values
new_current_x, new_current_y = target_x, target_y
# Initialize new_x and new_y with default values to avoid "cannot access local variable" error
# These will be properly set later based on conditions
new_x = 0
new_y = 0
# Initialize current_view_center if it's None
if current_view_center is None:
current_view_center = target_view_center
last_movement_time = current_time # Reset the movement timer
print(f"[{get_timestamp()}] Initialized view center to {current_view_center}")
# Implement smooth movement towards the target
# Calculate new current center by moving towards target
current_x, current_y = current_view_center
# Move current center towards target center with smooth interpolation
# Calculate distance to target
distance_x = target_x - current_x
distance_y = target_y - current_y
total_distance = (distance_x**2 + distance_y**2)**0.5
# Use a dynamic speed that increases as we get closer to the target
# This creates a more natural, accelerating movement
if total_distance > 0:
# Base speed from trackbar - use a higher value for more responsive tracking
# Add a minimum speed to ensure movement always happens
speed = max(0.1, view_movement_speed)
# Add a boost factor for large distances to make movement more responsive
if total_distance > 50:
speed = min(1.0, speed * 1.5) # Boost speed for large distances, but cap at 1.0
print(f"[{get_timestamp()}] Speed boosted to {speed:.2f} for large distance {total_distance:.2f}")
# Accelerate transitions when moving to an area with more detections
# Find detection count for current area
current_area_count = 0
if current_zoom_area is not None:
cx, cy, cw, ch = current_zoom_area
current_center_x, current_center_y = cx + cw//2, cy + ch//2
current_size_key = max(cw, ch) // 10
current_area_key = (current_center_x // 20, current_center_y // 20, current_size_key)
if current_area_key in motion_area_history:
current_area_count = motion_area_history[current_area_key][0]
# If target area has more detections than current area, boost speed
if most_detections > current_area_count:
# Calculate boost factor based on the ratio of detections
detection_ratio = min(5.0, most_detections / max(1, current_area_count))
speed_boost = min(3.0, 1.0 + (detection_ratio - 1) * 0.5) # Increased multiplier and max boost
# Allow speed to go higher than 1.0 when transitioning to areas with more detections
# This enables faster transitions to important areas
speed = min(2.0, speed * speed_boost) # Higher cap for areas with more detections
print(f"[{get_timestamp()}] Speed boosted to {speed:.2f} for area with more detections (current: {current_area_count}, target: {most_detections})")
# Apply the speed to the current position
new_current_x = current_x + distance_x * speed
new_current_y = current_y + distance_y * speed
# Check if we've moved a significant distance
movement_distance = ((new_current_x - current_x)**2 + (new_current_y - current_y)**2)**0.5
if movement_distance > 0.5: # Lower threshold to 0.5 pixels to detect smaller movements
last_movement_time = current_time # Update the last movement time
if consecutive_resets > 0:
consecutive_resets = 0 # Reset the counter when significant movement is detected
print(f"[{get_timestamp()}] Tracking resumed: significant movement detected after resets")
# Only log significant movements to avoid log spam
if movement_distance > 2.0:
print(f"[{get_timestamp()}] Significant movement detected: {movement_distance:.2f} pixels")
# If we're very close to the target, just snap to it
if abs(distance_x) < 0.5 and abs(distance_y) < 0.5: # Lower threshold to 0.5 pixels
new_current_x = target_x
new_current_y = target_y
print(f"[{get_timestamp()}] Snapped to target position {target_view_center} (was very close)")
else:
# Already at target
new_current_x = target_x
new_current_y = target_y
# Even if we're at the target, log this occasionally to confirm we're still tracking
# Use modulo on frame count to avoid log spam
if frame_count % 30 == 0: # Log every 30 frames
print(f"[{get_timestamp()}] Still at target position {target_view_center}")
# Check if the target hasn't been updated for too long
if current_time - last_target_update_time > target_update_timeout:
print(f"[{get_timestamp()}] WARNING: At target but target hasn't been updated for {target_update_timeout} seconds")
# Force a target update using the most active area if available
if most_detected_area is not None:
x, y, w, h = most_detected_area
center_x, center_y = x + w//2, y + h//2
# Add a small random offset to force movement even if the target is the same
# This helps prevent getting stuck in the same position
center_x += np.random.uniform(-5, 5)
center_y += np.random.uniform(-5, 5)
target_view_center = (center_x, center_y)
last_target_update_time = current_time
print(f"[{get_timestamp()}] Forced target position update to {target_view_center} with random offset while at target")
# Update current view center
current_view_center = (new_current_x, new_current_y)
# Log the updated position for debugging
print(f"[{get_timestamp()}] Updated position to {current_view_center} (distance: {total_distance:.2f}, speed: {view_movement_speed:.2f})")
# Calculate new top-left corner based on current view center
ideal_x = int(new_current_x - new_w//2)
ideal_y = int(new_current_y - new_h//2)
# Check if this would go out of bounds
if ideal_x < 0:
# If we would go out of bounds on the left, adjust to 0
new_x = 0
elif ideal_x + new_w > frame.shape[1]:
# If we would go out of bounds on the right, adjust to stay within frame
new_x = frame.shape[1] - new_w
else:
# If we're within bounds, use the ideal position
new_x = ideal_x
if ideal_y < 0:
# If we would go out of bounds on the top, adjust to 0
new_y = 0
elif ideal_y + new_h > frame.shape[0]:
# If we would go out of bounds on the bottom, adjust to stay within frame
new_y = frame.shape[0] - new_h
else:
# If we're within bounds, use the ideal position
new_y = ideal_y
# Extract the zoomed region
zoom_region = frame[new_y:new_y+new_h, new_x:new_x+new_w]
# Resize to original dimensions
zoomed = cv2.resize(zoom_region, (orig_width, orig_height), interpolation=cv2.INTER_LINEAR)
# Add visual indicators in the zoomed view to show tracking status
tracking_time = int(current_time - last_movement_time)
tracking_color = (0, 255, 255) # Default yellow
if tracking_time > 3:
tracking_color = (0, 165, 255) # Orange if no movement for 3+ seconds
if tracking_time > 5:
tracking_color = (0, 0, 255) # Red if no movement for 5+ seconds
cv2.putText(zoomed, f"Tracking: {tracking_time}s", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, tracking_color, 1)
# Add target position indicator
cv2.putText(zoomed, f"Target: {int(target_x)},{int(target_y)}", (10, 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
# Add current position indicator
cv2.putText(zoomed, f"Current: {int(new_current_x)},{int(new_current_y)}", (10, 70),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
# Draw a small crosshair at the target position if it's within the visible area
target_visible_x = int(target_x - new_x)
target_visible_y = int(target_y - new_y)
if 0 <= target_visible_x < zoomed.shape[1] and 0 <= target_visible_y < zoomed.shape[0]:
# Draw crosshair
cv2.line(zoomed, (target_visible_x - 10, target_visible_y), (target_visible_x + 10, target_visible_y), (0, 255, 255), 1)
cv2.line(zoomed, (target_visible_x, target_visible_y - 10), (target_visible_x, target_visible_y + 10), (0, 255, 255), 1)
cv2.circle(zoomed, (target_visible_x, target_visible_y), 5, (0, 255, 255), 1)
# Add visual indicators for zoom mode
cv2.putText(zoomed, "ZOOM ACTIVE", (10, orig_height - 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
# Show the area being zoomed in the original frame
cv2.rectangle(display_frame, (x, y), (x + w, y + h), (0, 255, 255), 2)
cv2.putText(display_frame, "ZOOMED AREA", (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
# Display both the original frame with highlighted zoom area and the zoomed view
cv2.imshow("Motion Detection", display_frame)
cv2.imshow("Zoomed View", zoomed)
except Exception as e:
print(f"[{get_timestamp()}] Error creating zoomed view: {e}")
zoom_active = False
cv2.imshow("Motion Detection", display_frame)
else:
print(f"[{get_timestamp()}] Invalid zoom region")
zoom_active = False
cv2.imshow("Motion Detection", display_frame)
else:
# If zoom is not active, just show the regular frame
try:
# Check if the Zoomed View window exists and destroy it if it does
if cv2.getWindowProperty("Zoomed View", cv2.WND_PROP_VISIBLE) >= 0:
cv2.destroyWindow("Zoomed View")
except:
pass # Window doesn't exist, which is fine
cv2.imshow("Motion Detection", display_frame)
cv2.imshow("Controls", control_panel)
prev_gray = gray
try:
key = cv2.waitKey(int(delay_ms)) & 0xFF
except (ValueError, TypeError) as e:
print(f"[{get_timestamp()}] Chyba při čekání: {e}. Použití výchozí hodnoty 10ms.")
key = cv2.waitKey(10) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
selecting_roi = True
elif key == ord('z'):
# Toggle zoom on/off
zoom_active = not zoom_active
if zoom_active:
# Reset current_view_center to force immediate jump to the area when manually activating
current_view_center = None
print(f"[{get_timestamp()}] Zoom manually activated")
else:
print(f"[{get_timestamp()}] Zoom manually deactivated")
# Reset current_view_center when zoom is deactivated
current_view_center = None
try:
# Try to close the zoomed view window if it exists
if cv2.getWindowProperty("Zoomed View", cv2.WND_PROP_VISIBLE) >= 0:
cv2.destroyWindow("Zoomed View")
except:
pass
elif key == ord('c'):
# Clear motion area history
motion_area_history.clear()
current_zoom_area = None
zoom_active = False
# Reset current_view_center when history is cleared
current_view_center = None
print(f"[{get_timestamp()}] Motion area history cleared")
# Clean up GPU resources if they were allocated
if CUDA_AVAILABLE:
try:
# Release GPU matrices to free up memory
if gpu_frame is not None:
gpu_frame.release()
if gpu_gray is not None:
gpu_gray.release()
if gpu_prev_gray is not None:
gpu_prev_gray.release()
if gpu_frame_diff is not None:
gpu_frame_diff.release()
print(f"[{get_timestamp()}] Released GPU resources")
except Exception as e:
print(f"[{get_timestamp()}] Error releasing GPU resources: {e}")
cv2.destroyAllWindows()
print(f"[{get_timestamp()}] Motion detection stopped.")
def main_loop():
"""
Main application loop with error handling and automatic restart
"""
global program_alive
# Initialize restart counter
restart_count = 0
max_restarts = 5
# Memory optimization: Force garbage collection
try:
import gc
gc.enable()
except ImportError:
print(f"[{get_timestamp()}] Warning: Garbage collection module not available")
# Main loop with restart capability
while program_alive and restart_count <= max_restarts:
try:
# Reset activity timestamp at the start of each iteration
update_activity()
# Try to find the window
window_name = "Centrum monitorování" # Změňte dle názvu svého okna!
hwnd = find_window_by_name(window_name)
if hwnd is None:
print(f"[{get_timestamp()}] Okno '{window_name}' nebylo nalezeno.")
print(f"[{get_timestamp()}] Vyber okno ručně.")
hwnd = select_window()
if hwnd is None:
print(f"[{get_timestamp()}] Okno nebylo vybráno. Konec.")
break
print(f"[{get_timestamp()}] Detekce pohybu poběží pouze v okně: {window_name}")
# Process screen motion (this is the main processing function)
process_screen_motion(hwnd)
# If we get here normally (not through an exception), break the loop
print(f"[{get_timestamp()}] Aplikace ukončena normálně.")
break
except Exception as e:
# Handle critical errors
print(f"[{get_timestamp()}] Kritická chyba v aplikaci: {e}")
# Try to clean up resources
try:
cv2.destroyAllWindows()
print(f"[{get_timestamp()}] Uvolněny prostředky OpenCV")
except Exception as cleanup_error:
print(f"[{get_timestamp()}] Chyba při uvolňování prostředků: {cleanup_error}")
# Force garbage collection to free memory
try:
gc.collect()
print(f"[{get_timestamp()}] Vynucen garbage collection")
except:
pass
# Check if we should restart
if restart_count < max_restarts:
restart_count += 1
print(f"[{get_timestamp()}] Pokus o obnovení... (pokus {restart_count}/{max_restarts})")
# Wait a moment before trying to restart
time.sleep(2)
print(f"[{get_timestamp()}] Restartuji aplikaci...")
# Continue to next iteration (non-recursive restart)
else:
print(f"[{get_timestamp()}] Dosažen maximální počet pokusů o restart ({max_restarts}).")
print(f"[{get_timestamp()}] Aplikace bude ukončena. Prosím, spusťte ji znovu ručně.")
# Keep the console window open for a moment
time.sleep(5)
break
# Final cleanup
try:
watchdog.stop()
cv2.destroyAllWindows()
print(f"[{get_timestamp()}] Aplikace ukončena.")
except Exception as e:
print(f"[{get_timestamp()}] Chyba při ukončování aplikace: {e}")
def main():
"""
Entry point with signal handling and exception catching
"""
try:
print(f"[{get_timestamp()}] Spouštím aplikaci pro detekci pohybu...")
main_loop()
except KeyboardInterrupt:
print(f"[{get_timestamp()}] Aplikace přerušena uživatelem (Ctrl+C)")
except Exception as e:
print(f"[{get_timestamp()}] Neočekávaná chyba: {e}")
finally:
print(f"[{get_timestamp()}] Konec aplikace")
if __name__ == "__main__":
main()