From 61d669d83aa8acc0f95668955fb32015abe57f03 Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 4 Apr 2025 14:10:27 +0200 Subject: [PATCH] Mejorado de ping y funcion nueva de Scan range con obtencion de host --- ip_config.json | 2 +- menu-ip-change.py | 852 ++++++++++++++++++++++++++++++++++++++-------- ping_targets.json | 2 +- 3 files changed, 717 insertions(+), 139 deletions(-) diff --git a/ip_config.json b/ip_config.json index 91cf91a..d64ed28 100644 --- a/ip_config.json +++ b/ip_config.json @@ -1 +1 @@ -{"last_interface": "Ethernet", "last_ip_prefix": "10.1.22", "previous_config": {"name": "Ethernet", "ip_address": "10.1.22.249", "subnet_mask": "255.224.0.0", "gateway": "10.1.22.1", "dhcp_enabled": false}} \ No newline at end of file +{"last_interface": "Ethernet", "last_ip_prefix": "10.1.20", "previous_config": {"name": "Ethernet", "ip_address": "10.1.20.249", "subnet_mask": "255.255.255.0", "gateway": "10.1.20.1", "dhcp_enabled": false}} \ No newline at end of file diff --git a/menu-ip-change.py b/menu-ip-change.py index ac1cf1b..3ccd6c4 100644 --- a/menu-ip-change.py +++ b/menu-ip-change.py @@ -12,6 +12,8 @@ from dataclasses import dataclass import threading import time import socket +import queue +import struct # Add libraries for ping and SNMP functionality try: @@ -152,6 +154,19 @@ class IPChangerApp: self.interfaces: List[NetworkInterface] = [] self.ip_history: List[str] = [] + # Variables for continuous ping + self.continuous_ping = tk.BooleanVar(value=False) + self.ping_running = False + self.ping_stop_event = threading.Event() + + # Variables for IP scanning + self.scan_results = [] + self.scan_running = False + self.scan_stop_event = threading.Event() + self.scan_queue = queue.Queue() + self.scan_threads = [] + self.scan_progress_var = tk.IntVar(value=0) + # Inicializar WMI si está disponible if WMI_AVAILABLE: self.wmi_client = wmi.WMI() @@ -159,7 +174,7 @@ class IPChangerApp: # Cargar datos guardados antes de crear widgets self.load_ip_history() - # Crear la interfaz + # Crear la interfaz con pestañas self.create_widgets() # Initialize trace variables @@ -203,9 +218,57 @@ class IPChangerApp: return ip def create_widgets(self): - # Panel superior para controles - self.control_frame = ttk.Frame(self.master, padding="10") + # Create a notebook (tabbed interface) + self.notebook = ttk.Notebook(self.master) + self.notebook.grid(row=0, column=0, sticky="nsew", padx=10, pady=10) + + # Create frames for each tab + self.ip_setup_tab = ttk.Frame(self.notebook, padding="10") + self.tools_tab = ttk.Frame(self.notebook, padding="10") + + # Add tabs to the notebook + self.notebook.add(self.ip_setup_tab, text="IP Setup") + self.notebook.add(self.tools_tab, text="Tools") + + # Create IP Setup tab content (moved from original layout) + self.create_ip_setup_tab() + + # Create Tools tab content + self.create_tools_tab() + + # Log frame - shared between tabs + self.log_frame = ttk.LabelFrame(self.master, text="Operation Log", padding="5") + self.log_frame.grid(row=1, column=0, sticky="nsew", padx=10, pady=(0, 10)) + + self.log_text = tk.Text(self.log_frame, wrap="none", height=10) + self.log_scrollbar_y = ttk.Scrollbar( + self.log_frame, orient="vertical", command=self.log_text.yview + ) + self.log_scrollbar_x = ttk.Scrollbar( + self.log_frame, orient="horizontal", command=self.log_text.xview + ) + + self.log_text.configure( + yscrollcommand=self.log_scrollbar_y.set, + xscrollcommand=self.log_scrollbar_x.set, + ) + + self.log_text.grid(row=0, column=0, sticky="nsew") + self.log_scrollbar_y.grid(row=0, column=1, sticky="ns") + self.log_scrollbar_x.grid(row=1, column=0, sticky="ew") + + ttk.Button(self.log_frame, text="Clear Log", command=self.clear_log).grid( + row=2, column=0, columnspan=2, pady=5 + ) + + self.log_frame.grid_columnconfigure(0, weight=1) + self.log_frame.grid_rowconfigure(0, weight=1) + + def create_ip_setup_tab(self): + # Control frame for IP Setup tab + self.control_frame = ttk.Frame(self.ip_setup_tab) self.control_frame.grid(row=0, column=0, sticky="new") + self.control_frame.columnconfigure(0, weight=1) # Sección de interfaces self.interfaces_frame = ttk.LabelFrame( @@ -240,18 +303,29 @@ class IPChangerApp: ttk.Label(self.current_frame, textvariable=self.current_ip_var).grid( row=0, column=1, sticky="w" ) + # Add copy button for IP + ttk.Button( + self.current_frame, text="Copy", width=5, command=self.copy_current_ip + ).grid(row=0, column=2, padx=5) + ttk.Label(self.current_frame, text="Subnet Mask:").grid( row=1, column=0, sticky="w", padx=5 ) ttk.Label(self.current_frame, textvariable=self.current_mask_var).grid( row=1, column=1, sticky="w" ) + ttk.Label(self.current_frame, text="Gateway:").grid( row=2, column=0, sticky="w", padx=5 ) ttk.Label(self.current_frame, textvariable=self.current_gateway_var).grid( row=2, column=1, sticky="w" ) + # Add copy button for Gateway + ttk.Button( + self.current_frame, text="Copy", width=5, command=self.copy_current_gateway + ).grid(row=2, column=2, padx=5) + ttk.Label(self.current_frame, text="DHCP Status:").grid( row=3, column=0, sticky="w", padx=5 ) @@ -298,8 +372,6 @@ class IPChangerApp: self.cidr_entry = ttk.Entry(self.ip_frame, textvariable=self.cidr_bits, width=5) self.cidr_entry.grid(row=1, column=3, sticky="w", padx=5) - # Don't add traces here, we'll do it after widget creation is complete - # Sección de historial self.history_frame = ttk.LabelFrame( self.control_frame, text="IP History", padding="5" @@ -338,11 +410,17 @@ class IPChangerApp: command=self.refresh_interfaces, ).grid(row=0, column=3, padx=5) - # Sección de Ping - add initial value and bind event + def create_tools_tab(self): + # Control frame for Tools tab + self.tools_control_frame = ttk.Frame(self.tools_tab) + self.tools_control_frame.grid(row=0, column=0, sticky="new") + self.tools_control_frame.columnconfigure(0, weight=1) + + # Network Tools section (moved from original layout) self.ping_frame = ttk.LabelFrame( - self.control_frame, text="Network Tools", padding="5" + self.tools_control_frame, text="Ping Tool", padding="5" ) - self.ping_frame.grid(row=5, column=0, sticky="ew", pady=(0, 10)) + self.ping_frame.grid(row=0, column=0, sticky="ew", pady=(0, 10)) ttk.Label(self.ping_frame, text="Target IP/Host:").grid( row=0, column=0, sticky="w", padx=5 @@ -354,37 +432,116 @@ class IPChangerApp: self.ping_entry.grid(row=0, column=1, sticky="w", padx=5) # Bind the FocusOut event to save the ping target self.ping_entry.bind("", self.save_current_ping_target) + + # Add continuous ping checkbox + self.continuous_ping_check = ttk.Checkbutton( + self.ping_frame, text="Continuous Ping", variable=self.continuous_ping + ) + self.continuous_ping_check.grid(row=0, column=2, padx=5) + ttk.Button(self.ping_frame, text="Ping", command=self.do_ping).grid( - row=0, column=2, padx=5 + row=0, column=3, padx=5 + ) + ttk.Button(self.ping_frame, text="Stop", command=self.stop_ping).grid( + row=0, column=4, padx=5 ) - # Log en la parte inferior - self.log_frame = ttk.LabelFrame(self.master, text="Operation Log", padding="5") - self.log_frame.grid(row=1, column=0, sticky="nsew", padx=10, pady=(0, 10)) - - self.log_text = tk.Text(self.log_frame, wrap="none", height=10) - self.log_scrollbar_y = ttk.Scrollbar( - self.log_frame, orient="vertical", command=self.log_text.yview + # Network Scan section + self.scan_frame = ttk.LabelFrame( + self.tools_control_frame, text="Network Scan", padding="5" ) - self.log_scrollbar_x = ttk.Scrollbar( - self.log_frame, orient="horizontal", command=self.log_text.xview + self.scan_frame.grid(row=1, column=0, sticky="ew", pady=(0, 10)) + + # Start IP + ttk.Label(self.scan_frame, text="Start IP:").grid( + row=0, column=0, sticky="w", padx=5 + ) + self.scan_start_ip = tk.StringVar() + self.scan_start_entry = ttk.Entry( + self.scan_frame, textvariable=self.scan_start_ip, width=15 + ) + self.scan_start_entry.grid(row=0, column=1, sticky="w", padx=5) + + # End IP + ttk.Label(self.scan_frame, text="End IP:").grid( + row=0, column=2, sticky="w", padx=5 + ) + self.scan_end_ip = tk.StringVar() + self.scan_end_entry = ttk.Entry( + self.scan_frame, textvariable=self.scan_end_ip, width=15 + ) + self.scan_end_entry.grid(row=0, column=3, sticky="w", padx=5) + + # Scan buttons + self.scan_buttons_frame = ttk.Frame(self.scan_frame) + self.scan_buttons_frame.grid(row=0, column=4, columnspan=2, padx=5) + + ttk.Button(self.scan_buttons_frame, text="Start Scan", command=self.start_scan).grid( + row=0, column=0, padx=5 + ) + ttk.Button(self.scan_buttons_frame, text="Stop Scan", command=self.stop_scan).grid( + row=0, column=1, padx=5 + ) + ttk.Button(self.scan_buttons_frame, text="Get Host Info", command=self.gather_host_information).grid( + row=1, column=0, columnspan=2, padx=5, pady=3 ) - self.log_text.configure( - yscrollcommand=self.log_scrollbar_y.set, - xscrollcommand=self.log_scrollbar_x.set, + # Scan progress + self.scan_progress = ttk.Progressbar( + self.scan_frame, + orient="horizontal", + length=300, + mode="determinate", + variable=self.scan_progress_var, + ) + self.scan_progress.grid( + row=1, column=0, columnspan=6, sticky="ew", padx=5, pady=5 ) - self.log_text.grid(row=0, column=0, sticky="nsew") - self.log_scrollbar_y.grid(row=0, column=1, sticky="ns") - self.log_scrollbar_x.grid(row=1, column=0, sticky="ew") - - ttk.Button(self.log_frame, text="Clear Log", command=self.clear_log).grid( - row=2, column=0, columnspan=2, pady=5 + # Scan results + ttk.Label(self.scan_frame, text="Scan Results:").grid( + row=2, column=0, sticky="w", padx=5 ) - self.log_frame.grid_columnconfigure(0, weight=1) - self.log_frame.grid_rowconfigure(0, weight=1) + # Frame for results list and scrollbar + self.results_frame = ttk.Frame(self.scan_frame) + self.results_frame.grid( + row=3, column=0, columnspan=6, sticky="nsew", padx=5, pady=5 + ) + self.results_frame.columnconfigure(0, weight=1) + self.results_frame.rowconfigure(0, weight=1) + + # Replace Listbox with Treeview that has columns + self.scan_results_tree = ttk.Treeview( + self.results_frame, + columns=("ip", "hostname", "mac"), + show="headings", + height=10 + ) + + # Define columns + self.scan_results_tree.heading("ip", text="IP Address") + self.scan_results_tree.heading("hostname", text="Hostname") + self.scan_results_tree.heading("mac", text="MAC Address") + + # Set column widths + self.scan_results_tree.column("ip", width=120, anchor="w") + self.scan_results_tree.column("hostname", width=200, anchor="w") + self.scan_results_tree.column("mac", width=150, anchor="w") + + # Add scrollbar + self.scan_results_scrollbar = ttk.Scrollbar( + self.results_frame, orient="vertical", command=self.scan_results_tree.yview + ) + self.scan_results_tree.configure(yscrollcommand=self.scan_results_scrollbar.set) + + self.scan_results_tree.grid(row=0, column=0, sticky="nsew") + self.scan_results_scrollbar.grid(row=0, column=1, sticky="ns") + + # Configure scan frame to expand + self.scan_frame.columnconfigure(1, weight=1) + self.scan_frame.columnconfigure(3, weight=1) + self.scan_frame.rowconfigure(3, weight=1) def clear_log(self): self.log_text.delete("1.0", tk.END) @@ -554,7 +711,7 @@ class IPChangerApp: ) # Actualizar IP prefix si hay una IP válida - if interface.ip_address: + if (interface.ip_address): prefix = ".".join(interface.ip_address.split(".")[:3]) self.ip_prefix.set(prefix) @@ -567,6 +724,10 @@ class IPChangerApp: self.log_message(f"Selected interface: {selected}") + # Update scan IP range based on current IP and subnet mask + if interface.ip_address and interface.subnet_mask: + self.update_scan_ip_range(interface.ip_address, interface.subnet_mask) + def validate_ip_input(self) -> tuple[bool, str]: try: prefix = self.ip_prefix.get().strip() @@ -748,149 +909,427 @@ class IPChangerApp: # Save the ping target when used self.save_current_ping_target() - self.log_message(f"Pinging {target}...") + # Check if continuous ping is enabled + if self.continuous_ping.get(): + self.log_message(f"Starting continuous ping to {target}...") - # Crear un hilo para el ping para no bloquear la interfaz - threading.Thread(target=self._execute_ping, args=(target,), daemon=True).start() + # Reset the stop event + self.ping_stop_event.clear() + self.ping_running = True - def _execute_ping(self, target: str): - """Ejecuta el ping usando bibliotecas de Python en lugar del comando del sistema""" + # Start ping in a separate thread + threading.Thread( + target=self._execute_continuous_ping, args=(target,), daemon=True + ).start() + else: + self.log_message(f"Pinging {target}...") + + # Standard ping (original behavior) + threading.Thread( + target=self._execute_ping, args=(target,), daemon=True + ).start() + + def stop_ping(self): + """Stops any ongoing continuous ping""" + if self.ping_running: + self.log_message("Stopping continuous ping...") + self.ping_stop_event.set() + self.ping_running = False + + def _execute_continuous_ping(self, target: str): + """Execute continuous ping until stopped""" try: - if PING_LIBRARY_AVAILABLE: - # Usar PythonPing que es multiplataforma y no requiere privilegios - self.log_message( - f"Sending 4 ICMP echo requests to {target} using PythonPing..." - ) + self.log_message( + f"Continuous ping to {target} started. Press Stop to terminate." + ) + count = 0 - # Ejecutar el ping - response = pythonping(target, count=4, timeout=1) + while not self.ping_stop_event.is_set(): + count += 1 + if PING_LIBRARY_AVAILABLE: + try: + # Send a single ping + response = pythonping(target, count=1, timeout=1) - # Comprobar si el ping tuvo éxito - if response.success(): - min_rtt = min(r.time_elapsed_ms for r in response) - max_rtt = max(r.time_elapsed_ms for r in response) - avg_rtt = sum(r.time_elapsed_ms for r in response) / len(response) - - # Mostrar resultados individuales - for i, reply in enumerate(response): - self.log_message( - f"Reply from {target}: time={reply.time_elapsed_ms:.2f}ms" - ) - - # Resumen - success_count = len([r for r in response if r.success]) - self.log_message(f"Ping statistics for {target}:") - self.log_message( - f" Packets: Sent = 4, Received = {success_count}, Lost = {4 - success_count}" - ) - self.log_message(f"Approximate round trip times in milliseconds:") - self.log_message( - f" Minimum = {min_rtt:.2f}ms, Maximum = {max_rtt:.2f}ms, Average = {avg_rtt:.2f}ms" - ) + # Check if any reply was successful + if any(reply.success for reply in response): + # Get the first successful reply + for reply in response: + if reply.success: + rtt = reply.time_elapsed_ms + self.log_message( + f"Reply from {target}: time={rtt:.2f}ms (seq={count})" + ) + break + else: + self.log_message(f"Request timed out (seq={count})") + except Exception as ping_error: + self.log_message(f"Ping error: {str(ping_error)} (seq={count})") else: - self.log_message(f"Could not reach host {target}") - else: - # Usar socket para hacer un ping básico (solo comprueba si el host está activo) - self.log_message( - f"PythonPing not available, checking if host {target} is reachable..." - ) - self.log_message( - "Note: This is not a true ICMP ping, just a TCP connection test" - ) - - for port in [80, 443, 22, 21]: + # Fallback to socket try: start_time = time.time() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(1) - result = s.connect_ex((target, port)) + result = s.connect_ex((target, 80)) elapsed_time = (time.time() - start_time) * 1000 # ms s.close() if result == 0: self.log_message( - f"Port {port} on {target} is open (TCP connect time: {elapsed_time:.2f}ms)" + f"Connected to {target}:80 in {elapsed_time:.2f}ms (seq={count})" ) else: self.log_message( - f"Port {port} on {target} is not responding" + f"Failed to connect to {target}:80 (seq={count})" ) - except socket.gaierror: - self.log_message(f"Could not resolve hostname {target}") + except Exception as socket_error: + self.log_message( + f"Connection error: {str(socket_error)} (seq={count})" + ) + + # Wait 1 second before next ping + for _ in range( + 10 + ): # Check for stop every 100ms for more responsive stopping + if self.ping_stop_event.is_set(): break - except socket.error as e: - self.log_message(f"Error connecting to {target}: {str(e)}") - except AttributeError as e: - # Handle specifically the attribute error - self.log_message(f"Error with PythonPing API: {str(e)}") + time.sleep(0.1) - # Fallback to alternative ping implementation - self.log_message("Falling back to socket-based connectivity test...") + self.log_message( + f"Continuous ping to {target} stopped after {count} pings." + ) - # Implement a simple socket-based ping - success = False - try: - # Try to connect to the host - this just checks if it's reachable - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(2) - start_time = time.time() - s.connect((target, 80)) # Try to connect to port 80 - response_time = (time.time() - start_time) * 1000 - s.close() + except Exception as e: + self.log_message(f"Error in continuous ping: {str(e)}") + finally: + self.ping_running = False + + def _execute_ping(self, target: str): + """Execute a single ping and display the results""" + try: + self.log_message(f"Pinging {target} with 4 echo requests...") + + if PING_LIBRARY_AVAILABLE: + # Use PythonPing library + response = pythonping(target, count=4, timeout=1) + + # Display individual replies + for i, reply in enumerate(response): + if reply.success: + rtt = reply.time_elapsed_ms + self.log_message(f"Reply from {target}: time={rtt:.2f}ms") + else: + self.log_message(f"Request timed out (seq={i+1})") + + # Display statistics + success_count = sum(1 for r in response if r.success) + loss_percentage = (4 - success_count) * 25 + + self.log_message(f"\nPing statistics for {target}:") self.log_message( - f"Host {target} is reachable. Response time: {response_time:.2f}ms" + f" Packets: Sent = 4, Received = {success_count}, Lost = {4 - success_count} ({loss_percentage}% loss)" ) - success = True - except socket.error as se: - self.log_message(f"Could not connect to {target}: {str(se)}") - # Try to at least resolve the hostname - try: - ip = socket.gethostbyname(target) - self.log_message( - f"Hostname {target} resolves to {ip}, but connection failed" + if success_count > 0: + min_rtt = min(r.time_elapsed_ms for r in response if r.success) + max_rtt = max(r.time_elapsed_ms for r in response if r.success) + avg_rtt = ( + sum(r.time_elapsed_ms for r in response if r.success) + / success_count ) - except socket.gaierror: - self.log_message(f"Could not resolve hostname {target}") - # Alternative implementation using subprocess directly as last resort - try: - self.log_message("Attempting to ping using system command...") - # Use subprocess to run ping directly (won't work if command not available) - use_count = "-n" if sys.platform.lower() == "win32" else "-c" - cmd = ["ping", use_count, "4", target] - process = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE + self.log_message(f"Approximate round trip times in milliseconds:") + self.log_message( + f" Minimum = {min_rtt:.2f}ms, Maximum = {max_rtt:.2f}ms, Average = {avg_rtt:.2f}ms" + ) + else: + # Fallback to socket if PythonPing is not available + self.log_message("Using socket connection test (not a true ICMP ping)") + + success_count = 0 + times = [] + + for i in range(4): + try: + start_time = time.time() + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(1) + result = s.connect_ex((target, 80)) # Try to connect to port 80 + elapsed_time = ( + time.time() - start_time + ) * 1000 # Convert to ms + s.close() + + if result == 0: + success_count += 1 + times.append(elapsed_time) + self.log_message( + f"Connected to {target}:80 in {elapsed_time:.2f}ms" + ) + else: + self.log_message( + f"Failed to connect to {target}:80 (seq={i+1})" + ) + except Exception as e: + self.log_message(f"Connection error: {str(e)} (seq={i+1})") + + # Display statistics + loss_percentage = (4 - success_count) * 25 + self.log_message(f"\nConnection statistics for {target}:") + self.log_message( + f" Attempts: Sent = 4, Successful = {success_count}, Failed = {4 - success_count} ({loss_percentage}% loss)" ) - stdout, stderr = process.communicate() - if process.returncode == 0: - # Process and display output - output = stdout.decode("utf-8", errors="replace") - for line in output.splitlines(): - if line.strip(): - self.log_message(line) - else: + if success_count > 0: + self.log_message(f"Approximate connection times in milliseconds:") self.log_message( - f"System ping command failed: {stderr.decode('utf-8', errors='replace')}" + f" Minimum = {min(times):.2f}ms, Maximum = {max(times):.2f}ms, Average = {sum(times)/len(times):.2f}ms" ) - except Exception as subproc_e: - self.log_message(f"System ping command error: {str(subproc_e)}") + + # Try to get hostname and MAC address + hostname = self.get_hostname(target) + mac_address = self.get_mac_address(target) + + if hostname: + self.log_message(f"Hostname: {hostname}") + if mac_address: + self.log_message(f"MAC Address: {mac_address}") except Exception as e: self.log_message(f"Error executing ping: {str(e)}") + + def start_scan(self): + """Start network scanning between IP range""" + start_ip = self.scan_start_ip.get().strip() + end_ip = self.scan_end_ip.get().strip() + + # Validate IPs + try: + start = ipaddress.IPv4Address(start_ip) + end = ipaddress.IPv4Address(end_ip) + + if start > end: + self.show_error("Start IP must be lower than or equal to End IP") + return + + # Clear previous results - FIXED: use the treeview method instead of the old listbox + for item in self.scan_results_tree.get_children(): + self.scan_results_tree.delete(item) + + self.scan_results = [] + + # Calculate total IPs to scan for progress bar + ip_range = int(end) - int(start) + 1 + self.scan_progress_var.set(0) + self.scan_progress["maximum"] = ip_range + self.log_message( - "PythonPing may need to be reinstalled or updated. Run: pip install -U pythonping" + f"Starting scan from {start_ip} to {end_ip} ({ip_range} addresses)..." ) - # Try to give more information - try: - import pkg_resources + # Reset stop event and set running flag + self.scan_stop_event.clear() + self.scan_running = True - version = pkg_resources.get_distribution("pythonping").version - self.log_message(f"Installed PythonPing version: {version}") - except Exception: - self.log_message("Could not determine PythonPing version") + # Start scan in a separate thread + threading.Thread( + target=self._execute_scan, args=(start, end, ip_range), daemon=True + ).start() + + except ValueError as e: + self.show_error(f"Invalid IP address: {str(e)}") + + def stop_scan(self): + """Stop an ongoing network scan""" + if self.scan_running: + self.log_message("Stopping network scan...") + self.scan_stop_event.set() + + # Wait for all scan threads to finish + for thread in self.scan_threads: + if thread.is_alive(): + thread.join(0.5) + + self.scan_running = False + self.log_message("Network scan stopped.") + + def _execute_scan(self, start_ip, end_ip, ip_range): + """Execute the network scan - now only performs ping scan""" + try: + # Empty the queue + while not self.scan_queue.empty(): + self.scan_queue.get_nowait() + + # Clear threads list + self.scan_threads = [] + + # Reset counter for finished IPs + self.scan_completed = 0 + + # Clear previous scan results tree + for item in self.scan_results_tree.get_children(): + self.scan_results_tree.delete(item) + + # Fill queue with all IPs in range + for ip_int in range(int(start_ip), int(end_ip) + 1): + self.scan_queue.put(ipaddress.IPv4Address(ip_int)) + + # Start worker threads for parallel scanning + max_threads = min(20, ip_range) # Limit to 20 threads or fewer if range is small + for i in range(max_threads): + thread = threading.Thread(target=self._scan_worker, daemon=True) + thread.start() + self.scan_threads.append(thread) + + # Wait for threads to complete or be stopped + while self.scan_running and any(t.is_alive() for t in self.scan_threads): + time.sleep(0.1) + + if not self.scan_stop_event.is_set(): + self.log_message( + f"Scan completed. Found {len(self.scan_results)} active hosts. Use 'Get Host Info' to lookup hostnames and MAC addresses." + ) + + except Exception as e: + self.log_message(f"Error in network scan: {str(e)}") + finally: + self.scan_running = False + + def _scan_worker(self): + """Worker thread function to scan IPs from the queue - now only performs ping scan""" + while not self.scan_stop_event.is_set(): + try: + # Get next IP from queue with timeout + try: + ip_address = self.scan_queue.get(timeout=0.1) + except queue.Empty: + # No more IPs to scan + break + + # Ping the IP + is_active = self._ping_host(str(ip_address)) + + # If active, add to results - but don't get host info yet + if is_active: + ip_str = str(ip_address) + self.log_message(f"Host discovered: {ip_str}") + self.scan_results.append(ip_str) + + # Add to results tree with placeholder values + self.master.after( + 0, + lambda ip=ip_str: self.add_scan_result(ip, "", "") + ) + + # Update progress + self.scan_completed = self.scan_completed + 1 + self.scan_progress_var.set(self.scan_completed) + + # Mark task as done + self.scan_queue.task_done() + + except Exception as e: + self.log_message(f"Error in scan worker: {str(e)}") + + def gather_host_information(self): + """Gather hostname and MAC address information for discovered hosts""" + if not self.scan_results: + self.show_info("No hosts discovered yet. Run a scan first.") + return + + # Start the information gathering in a separate thread + threading.Thread( + target=self._execute_host_gathering, + daemon=True + ).start() + + def _execute_host_gathering(self): + """Perform the actual host information gathering""" + try: + self.log_message(f"Gathering information for {len(self.scan_results)} hosts...") + + # Reset progress bar for this operation + total_hosts = len(self.scan_results) + self.scan_progress_var.set(0) + self.scan_progress["maximum"] = total_hosts + + # Get information for each host + for i, ip in enumerate(self.scan_results): + if self.scan_stop_event.is_set(): + self.log_message("Host information gathering stopped.") + break + + self.log_message(f"Getting information for host {ip} ({i+1}/{total_hosts})...") + + # Get hostname and MAC + hostname = self.get_hostname(ip) + mac_address = self.get_mac_address(ip) + + # Log what we found + if hostname: + self.log_message(f" Hostname: {hostname}") + else: + self.log_message(f" Hostname: Not resolved") + + if mac_address: + self.log_message(f" MAC Address: {mac_address}") + else: + self.log_message(f" MAC Address: Not found") + + # Update the tree + self.update_host_in_tree(ip, hostname, mac_address) + + # Update progress + self.scan_progress_var.set(i + 1) + + self.log_message("Host information gathering completed.") + + except Exception as e: + self.log_message(f"Error gathering host information: {str(e)}") + + def update_host_in_tree(self, ip, hostname, mac): + """Update an existing host in the treeview with obtained information""" + try: + # Find the item with this IP + for item_id in self.scan_results_tree.get_children(): + values = self.scan_results_tree.item(item_id, 'values') + if values and values[0] == ip: + # Update the values + hostname_str = hostname if hostname else "Not resolved" + mac_str = mac if mac else "Not found" + self.scan_results_tree.item(item_id, values=(ip, hostname_str, mac_str)) + break + except Exception as e: + self.log_message(f"Error updating host in treeview: {str(e)}") + + def add_scan_result(self, ip, hostname, mac): + """Helper method to add scan result to the treeview""" + try: + # Insert the item with all values, even if some are empty + hostname = hostname if hostname else "Not resolved" + mac = mac if mac else "Not found" + + self.scan_results_tree.insert("", "end", values=(ip, hostname, mac)) + except Exception as e: + self.log_message(f"Error adding scan result to UI: {str(e)}") + + def _ping_host(self, host: str) -> bool: + """Ping a host to check if it's active, returns True if host responds""" + try: + if PING_LIBRARY_AVAILABLE: + # Use PythonPing with shorter timeout for scanning + response = pythonping(host, count=1, timeout=0.5) + return response.success() + else: + # Fallback using socket + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(0.5) + result = s.connect_ex((host, 80)) # Try common port + s.close() + return result == 0 + except: + return False def set_static_ip(self): interface = self.if_var.get() @@ -1015,7 +1454,7 @@ class IPChangerApp: # Add new methods for subnet mask conversion def on_subnet_mask_changed(self, *args): - """Update CIDR bits when subnet mask changes""" + """Update CIDR bits when subnet mask changes and update scan IP range""" # Use a static variable to prevent recursion if hasattr(self, "_updating_mask") and self._updating_mask: return @@ -1028,13 +1467,19 @@ class IPChangerApp: # Convert mask to bits bits = self.subnet_mask_to_cidr(mask) self.cidr_bits.set(str(bits)) + + # Update scan IP range if we have a current IP + current_ip = self.current_ip_var.get() + if current_ip: + self.update_scan_ip_range(current_ip, mask) + except Exception as e: self.log_message(f"Error updating CIDR bits: {str(e)}") finally: self._updating_mask = False def on_cidr_bits_changed(self, *args): - """Update subnet mask when CIDR bits change""" + """Update subnet mask when CIDR bits change and update scan IP range""" # Use a static variable to prevent recursion if hasattr(self, "_updating_bits") and self._updating_bits: return @@ -1049,6 +1494,12 @@ class IPChangerApp: # Convert bits to mask mask = self.cidr_to_subnet_mask(bits) self.subnet_mask.set(mask) + + # Update scan IP range if we have a current IP + current_ip = self.current_ip_var.get() + if current_ip: + self.update_scan_ip_range(current_ip, mask) + except Exception as e: self.log_message(f"Error updating subnet mask: {str(e)}") finally: @@ -1118,6 +1569,133 @@ class IPChangerApp: self.config.set_ping_target(ip_prefix, target) self.log_message(f"Saved ping target {target} for IP prefix {ip_prefix}") + def copy_to_clipboard(self, value): + """Copy the given value to clipboard""" + self.master.clipboard_clear() + self.master.clipboard_append(value) + self.log_message(f"Copied to clipboard: {value}") + + def copy_current_ip(self): + """Copy the current IP to clipboard""" + ip = self.current_ip_var.get() + if ip: + self.copy_to_clipboard(ip) + else: + self.show_info("No IP address available to copy") + + def copy_current_gateway(self): + """Copy the current gateway to clipboard""" + gateway = self.current_gateway_var.get() + if gateway: + self.copy_to_clipboard(gateway) + else: + self.show_info("No gateway address available to copy") + + def update_scan_ip_range(self, ip_address, subnet_mask): + """Calculate and update scan IP range based on current IP and subnet mask""" + try: + # Calculate network address and broadcast address + ip_obj = ipaddress.IPv4Address(ip_address) + mask_obj = ipaddress.IPv4Address(subnet_mask) + + # Convert mask to binary string + mask_bits = bin(int(mask_obj))[2:].zfill(32) + network_bits = mask_bits.count("1") + + # Create network object + network = ipaddress.IPv4Network( + f"{ip_address}/{network_bits}", strict=False + ) + + # Get first and last usable host addresses + if network.num_addresses <= 2: + # For point-to-point links or /31 masks + start_ip = network.network_address + end_ip = network.broadcast_address + else: + # Skip network address and broadcast address + start_ip = network.network_address + 1 + end_ip = network.broadcast_address - 1 + + # Update the scan fields + self.scan_start_ip.set(str(start_ip)) + self.scan_end_ip.set(str(end_ip)) + + self.log_message(f"Updated scan range: {start_ip} - {end_ip}") + + except Exception as e: + self.log_message(f"Error calculating IP range: {str(e)}") + + def get_hostname(self, ip_address): + """Try to resolve hostname for an IP address with better error handling""" + try: + self.log_message(f"Resolving hostname for {ip_address}...") + hostname, _, _ = socket.gethostbyaddr(ip_address) + return hostname + except socket.herror as e: + self.log_message(f"Hostname resolution error for {ip_address}: {e}") + return "" + except socket.gaierror as e: + self.log_message(f"Address-related error for {ip_address}: {e}") + return "" + except socket.timeout: + self.log_message(f"Timeout resolving hostname for {ip_address}") + return "" + except Exception as e: + self.log_message(f"Unknown error resolving hostname for {ip_address}: {str(e)}") + return "" + + def get_mac_address(self, ip_address): + """Get MAC address for an IP using ARP on Windows with improved parsing""" + try: + # Run the ARP command with timeout to avoid hanging + result = subprocess.run( + f"arp -a {ip_address}", + shell=True, + capture_output=True, + text=True, + timeout=2 + ) + + if result.returncode == 0 and result.stdout: + # Log the raw output for debugging + self.log_message(f"ARP output for {ip_address}: {result.stdout.strip()}") + + # Parse the output to find the MAC address + # First try the typical format with hyphens or colons + mac_match = re.search(r"([0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2}[-:][0-9A-F]{2})", + result.stdout, + re.IGNORECASE) + if mac_match: + return mac_match.group(0) + + # Try an alternative format with spaces (Windows format) + mac_match = re.search(r"([0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2}[\s-][0-9a-f]{2})", + result.stdout, + re.IGNORECASE) + if mac_match: + return mac_match.group(0) + + # Try yet another approach - look for any string of hex digits with separators + lines = result.stdout.strip().split('\n') + for line in lines: + if ip_address in line: + parts = line.split() + # Usually the MAC address is the second column in the output + if len(parts) >= 2: + # Check if the second part looks like a MAC address + if re.match(r"([0-9a-f]{2}[^\w]){5}[0-9a-f]{2}", parts[1], re.IGNORECASE): + return parts[1] + + self.log_message(f"No MAC address found for {ip_address}") + return "" + except subprocess.TimeoutExpired: + self.log_message(f"Timeout running ARP command for {ip_address}") + return "" + except Exception as e: + self.log_message(f"Error getting MAC address for {ip_address}: {str(e)}") + return "" + def main(): root = tk.Tk() diff --git a/ping_targets.json b/ping_targets.json index 3900d64..acd0e5c 100644 --- a/ping_targets.json +++ b/ping_targets.json @@ -1 +1 @@ -{"10.1.22": "10.1.20.11"} \ No newline at end of file +{"10.1.22": "10.1.22.11", "10.138.182": "10.138.182.94", "10.1.20": "10.1.20.11"} \ No newline at end of file